/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.concurrent.SharedExecutorPool;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ExceptionHandlers;
import org.apache.cassandra.transport.Flusher;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.ServerConnection;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;

public class Dispatcher {
    private static final LocalAwareExecutorService requestExecutor = SharedExecutorPool.SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), DatabaseDescriptor::setNativeTransportMaxThreads, "transport", "Native-Transport-Requests");
    private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<EventLoop, Flusher>();
    private final boolean useLegacyFlusher;

    public Dispatcher(boolean useLegacyFlusher) {
        this.useLegacyFlusher = useLegacyFlusher;
    }

    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher) {
        requestExecutor.submit(() -> this.processRequest(channel, request, forFlusher));
    }

    static Message.Response processRequest(ServerConnection connection, Message.Request request) {
        long queryStartNanoTime = System.nanoTime();
        if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) {
            ClientWarn.instance.captureWarnings();
        }
        QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion());
        Message.logger.trace("Received: {}, v={}", (Object)request, (Object)connection.getVersion());
        connection.requests.inc();
        Message.Response response = request.execute(qstate, queryStartNanoTime);
        response.setStreamId(request.getStreamId());
        response.setWarnings(ClientWarn.instance.getWarnings());
        response.attach(connection);
        connection.applyStateTransition(request.type, response.type);
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher) {
        Flusher.FlushItem<?> toFlush;
        try {
            assert (request.connection() instanceof ServerConnection);
            ServerConnection connection = (ServerConnection)request.connection();
            Message.Response response = Dispatcher.processRequest(connection, request);
            toFlush = forFlusher.toFlushItem(channel, request, response);
            Message.logger.trace("Responding: {}, v={}", (Object)response, (Object)connection.getVersion());
        }
        catch (Throwable t) {
            JVMStabilityInspector.inspectThrowable(t);
            ExceptionHandlers.UnexpectedChannelExceptionHandler handler = new ExceptionHandlers.UnexpectedChannelExceptionHandler(channel, true);
            ErrorMessage error = ErrorMessage.fromException(t, handler);
            error.setStreamId(request.getStreamId());
            toFlush = forFlusher.toFlushItem(channel, request, error);
        }
        finally {
            ClientWarn.instance.resetWarnings();
        }
        this.flush(toFlush);
    }

    private void flush(Flusher.FlushItem<?> item) {
        Flusher created;
        Flusher alt;
        EventLoop loop = item.channel.eventLoop();
        Flusher flusher = (Flusher)flusherLookup.get(loop);
        if (flusher == null && (alt = flusherLookup.putIfAbsent(loop, flusher = (created = this.useLegacyFlusher ? Flusher.legacy(loop) : Flusher.immediate(loop)))) != null) {
            flusher = alt;
        }
        flusher.enqueue(item);
        flusher.start();
    }

    public static void shutdown() {
        if (requestExecutor != null) {
            requestExecutor.shutdown();
        }
    }

    static interface FlushItemConverter {
        public Flusher.FlushItem<?> toFlushItem(Channel var1, Message.Request var2, Message.Response var3);
    }
}

