package kafka.network;

import java.io.EOFException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.utils.Time;
import kafka.utils.Utils$;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.ScalaObject;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SocketServer.scala */
@ScalaSignature(bytes = "\u0006\u0001}4\u0011\"\u0001\u0002\u0005\u0002\u0003\u0005\t\u0001\u0002\u0004\u0003\u0013A\u0013xnY3tg>\u0014(BA\u0002\u0005\u0003\u001dqW\r^<pe.T\u0011!B\u0001\u0006W\u000647.Y\n\u0004\u0001\u001dY\u0001C\u0001\u0005\n\u001b\u0005\u0011\u0011B\u0001\u0006\u0003\u0005Q\t%m\u001d;sC\u000e$8+\u001a:wKJ$\u0006N]3bIB\u0011AbD\u0007\u0002\u001b)\ta\"A\u0003tG\u0006d\u0017-\u0003\u0002\u0011\u001b\tY1kY1mC>\u0013'.Z2u\u0011!\u0011\u0002A!b\u0001\n\u0003!\u0012A\u00045b]\u0012dWM]'baBLgnZ\u0002\u0001+\u0005)\u0002C\u0001\f\u001a\u001d\tAq#\u0003\u0002\u0019\u0005\u00059\u0001*\u00198eY\u0016\u0014\u0018B\u0001\u000e\u001c\u00059A\u0015M\u001c3mKJl\u0015\r\u001d9j]\u001eT!\u0001\u0007\u0002\t\u0011u\u0001!\u0011!Q\u0001\nU\tq\u0002[1oI2,'/T1qa&tw\r\t\u0005\t?\u0001\u0011)\u0019!C\u0001A\u0005!A/[7f+\u0005\t\u0003C\u0001\u0012&\u001b\u0005\u0019#B\u0001\u0013\u0005\u0003\u0015)H/\u001b7t\u0013\t13E\u0001\u0003US6,\u0007\u0002\u0003\u0015\u0001\u0005\u0003\u0005\u000b\u0011B\u0011\u0002\u000bQLW.\u001a\u0011\t\u0011)\u0002!Q1A\u0005\u0002-\nQa\u001d;biN,\u0012\u0001\f\t\u0003\u00115J!A\f\u0002\u0003#M{7m[3u'\u0016\u0014h/\u001a:Ti\u0006$8\u000f\u0003\u00051\u0001\t\u0005\t\u0015!\u0003-\u0003\u0019\u0019H/\u0019;tA!)!\u0007\u0001C\u0001g\u00051A(\u001b8jiz\"B\u0001N\u001b7oA\u0011\u0001\u0002\u0001\u0005\u0006%E\u0002\r!\u0006\u0005\u0006?E\u0002\r!\t\u0005\u0006UE\u0002\r\u0001\f\u0005\bs\u0001\u0011\r\u0011\"\u0003;\u00039qWm^\"p]:,7\r^5p]N,\u0012a\u000f\t\u0004y\r+U\"A\u001f\u000b\u0005yz\u0014AC2p]\u000e,(O]3oi*\u0011\u0001)Q\u0001\u0005kRLGNC\u0001C\u0003\u0011Q\u0017M^1\n\u0005\u0011k$!F\"p]\u000e,(O]3oi2Kgn[3e#V,W/\u001a\t\u0003\r.k\u0011a\u0012\u0006\u0003\u0011&\u000b\u0001b\u00195b]:,Gn\u001d\u0006\u0003\u0015\u0006\u000b1A\\5p\u0013\tauIA\u0007T_\u000e\\W\r^\"iC:tW\r\u001c\u0005\u0007\u001d\u0002\u0001\u000b\u0011B\u001e\u0002\u001f9,woQ8o]\u0016\u001cG/[8og\u0002BQ\u0001\u0015\u0001\u0005BE\u000b1A];o)\u0005\u0011\u0006C\u0001\u0007T\u001
3\t!VB\u0001\u0003V]&$\b\"\u0002,\u0001\t\u00139\u0016!B2m_N,GC\u0001*Y\u0011\u0015IV\u000b1\u0001[\u0003\rYW-\u001f\t\u0003\rnK!\u0001X$\u0003\u0019M+G.Z2uS>t7*Z=\t\u000by\u0003A\u0011A0\u0002\r\u0005\u001c7-\u001a9u)\t\u0011\u0006\rC\u0003b;\u0002\u0007Q)A\u0007t_\u000e\\W\r^\"iC:tW\r\u001c\u0005\u0006G\u0002!I!U\u0001\u0018G>tg-[4ve\u0016tUm^\"p]:,7\r^5p]NDQ!\u001a\u0001\u0005\n\u0019\fa\u0001[1oI2,GcA4n]B\u0019A\u0002\u001b6\n\u0005%l!AB(qi&|g\u000e\u0005\u0002\tW&\u0011AN\u0001\u0002\u0005'\u0016tG\rC\u0003ZI\u0002\u0007!\fC\u0003pI\u0002\u0007\u0001/A\u0004sKF,Xm\u001d;\u0011\u0005!\t\u0018B\u0001:\u0003\u0005\u001d\u0011VmY3jm\u0016DQ\u0001\u001e\u0001\u0005\u0002U\fAA]3bIR\u0011!K\u001e\u0005\u00063N\u0004\rA\u0017\u0005\u0006q\u0002!\t!_\u0001\u0006oJLG/\u001a\u000b\u0003%jDQ!W<A\u0002iCQ\u0001 \u0001\u0005\nu\f!b\u00195b]:,GNR8s)\t)e\u0010C\u0003Zw\u0002\u0007!\f")
/* loaded from: input_file:kafka/network/Processor.class */
/**
 * Server thread that services the socket channels handed to it via {@link #accept(SocketChannel)}.
 *
 * <p>Each pass of {@link #run()} registers any newly queued channels with this thread's selector,
 * waits for ready keys, and dispatches each key to {@link #read(SelectionKey)} or
 * {@link #write(SelectionKey)}. The key attachment carries the in-progress {@code Receive} while
 * a request is being read and the pending {@code Send} while a response is being written.
 *
 * <p>NOTE(review): this file is decompiled from Scala (SocketServer.scala), where checked
 * exceptions are not enforced; several calls below can throw checked I/O exceptions that are
 * not declared on the enclosing methods.
 */
public class Processor extends AbstractServerThread implements ScalaObject {
    /** Maps a request type id and the request to the function that produces the optional response. */
    private final Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping;
    /** Clock used to time request handling. */
    private final Time time;
    /** Sink for byte counts and per-request timings. */
    private final SocketServerStats stats;
    /** Channels queued by {@link #accept(SocketChannel)} and not yet registered with the selector. */
    private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

    /** @return the handler lookup function supplied at construction */
    public Function2<Short, Receive, Function1<Receive, Option<Send>>> handlerMapping() {
        return this.handlerMapping;
    }

    /** @return the clock supplied at construction */
    public Time time() {
        return this.time;
    }

    /** @return the statistics recorder supplied at construction */
    public SocketServerStats stats() {
        return this.stats;
    }

    private ConcurrentLinkedQueue<SocketChannel> newConnections() {
        return this.newConnections;
    }

    /**
     * Main loop: registers queued connections, selects ready keys and services them until
     * {@code isRunning()} turns false, then runs the selector shutdown step and signals
     * {@code shutdownComplete()}.
     *
     * <p>A failure while servicing a single key is logged and that key is closed; the loop then
     * continues with the remaining keys.
     */
    @Override // java.lang.Runnable
    public void run() {
        startupComplete();
        while (isRunning()) {
            configureNewConnections();
            if (selector().select(500L) <= 0) {
                // Nothing became ready within the timeout; go round again so that newly queued
                // connections and the running flag are re-checked.
                continue;
            }
            Iterator<SelectionKey> it = selector().selectedKeys().iterator();
            while (it.hasNext() && isRunning()) {
                // Fetch the key before entering the try block so the handlers below never see a
                // null key; a failure of next() itself now propagates instead of being masked by
                // a NullPointerException raised inside the catch clause.
                SelectionKey key = it.next();
                try {
                    it.remove();
                    if (key.isReadable()) {
                        read(key);
                    } else if (key.isWritable()) {
                        write(key);
                    } else if (!key.isValid()) {
                        close(key);
                    } else {
                        throw new IllegalStateException("Unrecognized key state for processor thread.");
                    }
                } catch (EOFException e) {
                    logger().info("Closing socket for " + channelFor(key).socket().getInetAddress() + ".");
                    close(key);
                } catch (Throwable th) {
                    logger().info("Closing socket for " + channelFor(key).socket().getInetAddress() + " because of error");
                    logger().error(th, th);
                    close(key);
                }
            }
        }
        logger().debug("Closing selector.");
        // NOTE(review): the two closures are compiler-generated and not visible here; going by the
        // log message above they presumably close the selector, with failures swallowed.
        Utils$.MODULE$.swallow(new Processor$$anonfun$run$6(this), new Processor$$anonfun$run$3(this));
        shutdownComplete();
    }

    /**
     * Tears down the connection behind {@code selectionKey} and clears the key's attachment.
     *
     * <p>NOTE(review): the three swallowed closures are compiler-generated and not visible here;
     * the first two receive the channel and the last receives the key, so they presumably close
     * the socket, close the channel and cancel the key - confirm against the Scala source.
     *
     * @param selectionKey key whose channel is being closed
     */
    private void close(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (logger().isDebugEnabled()) {
            logger().debug("Closing connection from " + channel.socket().getRemoteSocketAddress());
        }
        Utils$.MODULE$.swallow(new Processor$$anonfun$close$4(this), new Processor$$anonfun$close$1(this, channel));
        Utils$.MODULE$.swallow(new Processor$$anonfun$close$5(this), new Processor$$anonfun$close$2(this, channel));
        selectionKey.attach(null);
        Utils$.MODULE$.swallow(new Processor$$anonfun$close$6(this), new Processor$$anonfun$close$3(this, selectionKey));
    }

    /**
     * Queues a new connection for this processor and wakes the selector so the run loop can
     * register it.
     *
     * @param socketChannel the channel to hand over to this processor
     */
    public void accept(SocketChannel socketChannel) {
        newConnections().add(socketChannel);
        selector().wakeup();
    }

    /**
     * Drains the queue of pending connections, registering each channel with the selector for
     * read readiness.
     */
    private void configureNewConnections() {
        // Poll until empty rather than testing size(): ConcurrentLinkedQueue.size() traverses the
        // whole queue on every call.
        SocketChannel channel;
        while ((channel = newConnections().poll()) != null) {
            if (logger().isDebugEnabled()) {
                logger().debug("Listening to new connection from " + channel.socket().getRemoteSocketAddress());
            }
            channel.register(selector(), SelectionKey.OP_READ);
        }
    }

    /**
     * Looks up the handler for a fully received request, invokes it and records how long the
     * handler took.
     *
     * @param selectionKey key of the connection the request arrived on (used for trace logging)
     * @param receive the completed request; a short is consumed from its buffer as the request type id
     * @return the handler's result, possibly empty
     * @throws InvalidRequestException if the handler mapping returns {@code null} for the request
     */
    private Option<Send> handle(SelectionKey selectionKey, Receive receive) {
        if (logger().isTraceEnabled()) {
            logger().trace("Handling request from " + channelFor(selectionKey).socket().getRemoteSocketAddress());
        }
        short requestTypeId = receive.buffer().getShort();
        Function1<Receive, Option<Send>> handler =
                handlerMapping().apply(BoxesRunTime.boxToShort(requestTypeId), receive);
        if (handler == null) {
            throw new InvalidRequestException("No handler found for request");
        }
        long startNs = time().nanoseconds();
        Option<Send> response = handler.apply(receive);
        stats().recordRequest(requestTypeId, time().nanoseconds() - startNs);
        return response;
    }

    /**
     * Reads from the key's channel into the {@code Receive} attached to the key, creating and
     * attaching a new one if none is present. When the request is complete it is handled; if
     * the handler yields a response, the response is attached and the key is switched to write
     * interest.
     *
     * @param selectionKey a key whose channel is ready for reading
     */
    public void read(SelectionKey selectionKey) {
        SocketChannel channel = channelFor(selectionKey);
        Receive request = (Receive) selectionKey.attachment();
        if (request == null) {
            // No read in progress on this connection: start a new request.
            request = new BoundedByteBufferReceive();
            selectionKey.attach(request);
        }
        int bytesRead = request.readFrom(channel);
        stats().recordBytesRead(bytesRead);
        if (logger().isTraceEnabled()) {
            logger().trace(bytesRead + " bytes read from " + channel.socket().getRemoteSocketAddress());
        }
        if (bytesRead < 0) {
            close(selectionKey);
            return;
        }
        if (!request.complete()) {
            // More of the request is still to come; keep waiting for read readiness.
            selectionKey.interestOps(SelectionKey.OP_READ);
            selector().wakeup();
            return;
        }
        Option<Send> response = handle(selectionKey, request);
        selectionKey.attach(null);
        if (response.isDefined()) {
            selectionKey.attach(response.get());
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    }

    /**
     * Writes the {@code Send} attached to the key to the key's channel. Once the send is
     * complete the attachment is cleared and the key returns to read interest; otherwise the key
     * stays on write interest.
     *
     * @param selectionKey a key whose channel is ready for writing and whose attachment is the pending {@code Send}
     */
    public void write(SelectionKey selectionKey) {
        Send response = (Send) selectionKey.attachment();
        SocketChannel channel = channelFor(selectionKey);
        int bytesWritten = response.writeTo(channel);
        stats().recordBytesWritten(bytesWritten);
        if (logger().isTraceEnabled()) {
            logger().trace(bytesWritten + " bytes written to " + channel.socket().getRemoteSocketAddress());
        }
        if (response.complete()) {
            selectionKey.attach(null);
            selectionKey.interestOps(SelectionKey.OP_READ);
        } else {
            selectionKey.interestOps(SelectionKey.OP_WRITE);
            selector().wakeup();
        }
    }

    /** @return the key's channel, cast to {@link SocketChannel} */
    private SocketChannel channelFor(SelectionKey selectionKey) {
        return (SocketChannel) selectionKey.channel();
    }

    /**
     * @param function2 handler lookup: request type id and request to response-producing function
     * @param time clock used to time request handling
     * @param socketServerStats recorder for byte counts and request timings
     */
    public Processor(Function2<Short, Receive, Function1<Receive, Option<Send>>> function2, Time time, SocketServerStats socketServerStats) {
        this.handlerMapping = function2;
        this.time = time;
        this.stats = socketServerStats;
    }
}
