/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer.async;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import kafka.producer.SyncProducer;
import kafka.producer.async.CallbackHandler;
import kafka.producer.async.EventHandler;
import kafka.producer.async.IllegalQueueStateException;
import kafka.producer.async.ProducerSendThread$;
import kafka.producer.async.QueueItem;
import kafka.serializer.Encoder;
import kafka.utils.SystemTime$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.StringBuilder;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005-e!B\u0001\u0003\u0001\tA!A\u0005)s_\u0012,8-\u001a:TK:$G\u000b\u001b:fC\u0012T!a\u0001\u0003\u0002\u000b\u0005\u001c\u0018P\\2\u000b\u0005\u00151\u0011\u0001\u00039s_\u0012,8-\u001a:\u000b\u0003\u001d\tQa[1gW\u0006,\"!\u0003\u001c\u0014\u0007\u0001Q!\u0003\u0005\u0002\f!5\tAB\u0003\u0002\u000e\u001d\u0005!A.\u00198h\u0015\u0005y\u0011\u0001\u00026bm\u0006L!!\u0005\u0007\u0003\rQC'/Z1e!\t\u0019b#D\u0001\u0015\u0015\u0005)\u0012!B:dC2\f\u0017BA\f\u0015\u0005-\u00196-\u00197b\u001f\nTWm\u0019;\t\u0011e\u0001!Q1A\u0005\u0002m\t!\u0002\u001e5sK\u0006$g*Y7f\u0007\u0001)\u0012\u0001\b\t\u0003;\u0001r!a\u0005\u0010\n\u0005}!\u0012A\u0002)sK\u0012,g-\u0003\u0002\"E\t11\u000b\u001e:j]\u001eT!a\b\u000b\t\u0011\u0011\u0002!\u0011!Q\u0001\nq\t1\u0002\u001e5sK\u0006$g*Y7fA!Aa\u0005\u0001BC\u0002\u0013\u0005q%A\u0003rk\u0016,X-F\u0001)!\rIc\u0006M\u0007\u0002U)\u00111\u0006L\u0001\u000bG>t7-\u001e:sK:$(BA\u0017\u000f\u0003\u0011)H/\u001b7\n\u0005=R#!\u0004\"m_\u000e\\\u0017N\\4Rk\u0016,X\rE\u00022eQj\u0011AA\u0005\u0003g\t\u0011\u0011\"U;fk\u0016LE/Z7\u0011\u0005U2D\u0002\u0001\u0003\to\u0001!\t\u0011!b\u0001q\t\tA+\u0005\u0002:yA\u00111CO\u0005\u0003wQ\u0011qAT8uQ&tw\r\u0005\u0002\u0014{%\u0011a\b\u0006\u0002\u0004\u0003:L\b\u0002\u0003!\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\rE,X-^3!\u0011!\u0011\u0005A!b\u0001\n\u0003\u0019\u0015AC:fe&\fG.\u001b>feV\tA\tE\u0002F\u000fRj\u0011A\u0012\u0006\u0003\u0005\u001aI!\u0001\u0013$\u0003\u000f\u0015s7m\u001c3fe\"A!\n\u0001B\u0001B\u0003%A)A\u0006tKJL\u0017\r\\5{KJ\u0004\u0003\u0002\u0003'\u0001\u0005\u000b\u0007I\u0011A'\u0002%UtG-\u001a:ms&tw\r\u0015:pIV\u001cWM]\u000b\u0002\u001dB\u0011q\nU\u0007\u0002\t%\u0011\u0011\u000b\u0002\u0002\r'ft7\r\u0015:pIV\u001cWM\u001d\u0005\t'\u0002\u0011\t\u0011)A\u0005\u001d\u0006\u0019RO\u001c3fe2L\u0018N\\4Qe>$WoY3sA!AQ\u000b\u0001BC\u0002\u0013\u0005a+A\u0004iC:$G.\u001a:\u0016\u0003]\u00032!\r-5\u0013\tI&A\u0001\u0007Fm\u0016tG\u000fS1oI2,'\u000f\u0003\u0005\\\u0001\t\u0005\t\u0015!\u0003X\u0003!A\u0017M\u001c3mKJ\u0004\u0003\u0002C/\u0001\u0005\u000b\u0007I\u0011\u00010\u0002\u0015\r\u00147\u000eS1oI2,'/F\u0001`!\r\t\u0004\rN\u0005\u0003C\n\u0011qbQ1mY\n\f7m\u001b%b]\u0012dWM\u001d\u0005\tG\u0002\u0011\t\u0011)A\u0005?\u0006Y1MY6IC:$G.\u001a:!\u0011!)\u0007A!b\u0001\n\u00031\u0017!C9vKV,G+[7f+\u00059\u0007CA\ni\u0013\tIGC\u0001\u0003M_:<\u0007\u0002C6\u0001\u0005\u0003\u0005\u000b\u0011B4\u0002\u0015E,X-^3US6,\u0007\u0005\u0003\u0005n\u0001\t\u0015\r\u0011\"\u0001o\u0003%\u0011\u0017\r^2i'&TX-F\u0001p!\t\u0019\u0002/\u0003\u0002r)\t\u0019\u0011J\u001c;\t\u0011M\u0004!\u0011!Q\u0001\n=\f!BY1uG\"\u001c\u0016N_3!\u0011!)\bA!b\u0001\n\u00031\u0018aD:ikR$wn\u001e8D_6l\u0017M\u001c3\u0016\u0003qB\u0001\u0002\u001f\u0001\u0003\u0002\u0003\u0006I\u0001P\u0001\u0011g\",H\u000fZ8x]\u000e{W.\\1oI\u0002BQA\u001f\u0001\u0005\u0002m\fa\u0001P5oSRtD\u0003\u0005?~}~\f\t!a\u0001\u0002\u0006\u0005\u001d\u0011\u0011BA\u0006!\r\t\u0004\u0001\u000e\u0005\u00063e\u0004\r\u0001\b\u0005\u0006Me\u0004\r\u0001\u000b\u0005\u0006\u0005f\u0004\r\u0001\u0012\u0005\u0006\u0019f\u0004\rA\u0014\u0005\u0006+f\u0004\ra\u0016\u0005\u0006;f\u0004\ra\u0018\u0005\u0006Kf\u0004\ra\u001a\u0005\u0006[f\u0004\ra\u001c\u0005\u0006kf\u0004\r\u0001\u0010\u0005\n\u0003\u001f\u0001!\u0019!C\u0005\u0003#\ta\u0001\\8hO\u0016\u0014XCAA\n!\u0011\t)\"a\t\u000e\u0005\u0005]!\u0002BA\r\u00037\tQ\u0001\\8hi)TA!!\b\u0002 \u00051\u0011\r]1dQ\u0016T!!!\t\u0002\u0007=\u0014x-\u0003\u0003\u0002&\u0005]!A\u0002'pO\u001e,'\u000f\u0003\u0005\u0002*\u0001\u0001\u000b\u0011BA\n\u0003\u001dawnZ4fe\u0002B\u0011\"!\f\u0001\u0005\u0004%I!a\f\u0002\u001bMDW\u000f\u001e3po:d\u0015\r^2i+\t\t\t\u0004E\u0002*\u0003gI1!!\u000e+\u00059\u0019u.\u001e8u\t><h\u000eT1uG\"D\u0001\"!\u000f\u0001A\u0003%\u0011\u0011G\u0001\u000fg\",H\u000fZ8x]2\u000bGo\u00195!\u0011\u001d\ti\u0004\u0001C!\u0003\u007f\t1A];o)\t\t\t\u0005E\u0002\u0014\u0003\u0007J1!!\u0012\u0015\u0005\u0011)f.\u001b;\t\u000f\u0005%\u0003\u0001\"\u0001\u0002L\u0005i\u0011m^1jiNCW\u000f\u001e3po:,\"!!\u0011\t\u000f\u0005=\u0003\u0001\"\u0001\u0002L\u0005A1\u000f[;uI><h\u000eC\u0004\u0002T\u0001!I!!\u0016\u0002\u001bA\u0014xnY3tg\u00163XM\u001c;t)\t\t9\u0006E\u0003\u0002Z\u0005%\u0004G\u0004\u0003\u0002\\\u0005\u0015d\u0002BA/\u0003Gj!!a\u0018\u000b\u0007\u0005\u0005$$\u0001\u0004=e>|GOP\u0005\u0002+%\u0019\u0011q\r\u000b\u0002\u000fA\f7m[1hK&!\u00111NA7\u0005\r\u0019V-\u001d\u0006\u0004\u0003O\"\u0002bBA9\u0001\u0011\u0005\u00111O\u0001\fiJLHk\u001c%b]\u0012dW\r\u0006\u0003\u0002B\u0005U\u0004\u0002CA<\u0003_\u0002\r!a\u0016\u0002\r\u00154XM\u001c;t\u0011\u001d\tY\b\u0001C\u0005\u0003{\n\u0011\u0002\\8h\u000bZ,g\u000e^:\u0015\r\u0005\u0005\u0013qPAB\u0011\u001d\t\t)!\u001fA\u0002q\t1\u0001^1h\u0011!\t9(!\u001fA\u0002\u0005\u0015\u0005#BA-\u0003\u000f\u0003\u0014\u0002BAE\u0003[\u0012\u0001\"\u0013;fe\u0006\u0014G.\u001a")
public class ProducerSendThread<T>
extends Thread
implements ScalaObject {
    private final String threadName;
    private final BlockingQueue<QueueItem<T>> queue;
    private final Encoder<T> serializer;
    private final SyncProducer underlyingProducer;
    private final EventHandler<T> handler;
    private final CallbackHandler<T> cbkHandler;
    private final long queueTime;
    private final int batchSize;
    private final Object shutdownCommand;
    private final Logger kafka$producer$async$ProducerSendThread$$logger;
    private final CountDownLatch shutdownLatch;

    public String threadName() {
        return this.threadName;
    }

    public BlockingQueue<QueueItem<T>> queue() {
        return this.queue;
    }

    public Encoder<T> serializer() {
        return this.serializer;
    }

    public SyncProducer underlyingProducer() {
        return this.underlyingProducer;
    }

    public EventHandler<T> handler() {
        return this.handler;
    }

    public CallbackHandler<T> cbkHandler() {
        return this.cbkHandler;
    }

    public long queueTime() {
        return this.queueTime;
    }

    public int batchSize() {
        return this.batchSize;
    }

    public Object shutdownCommand() {
        return this.shutdownCommand;
    }

    public final Logger kafka$producer$async$ProducerSendThread$$logger() {
        return this.kafka$producer$async$ProducerSendThread$$logger;
    }

    private CountDownLatch shutdownLatch() {
        return this.shutdownLatch;
    }

    @Override
    public void run() {
        try {
            Seq<QueueItem<T>> remainingEvents = this.processEvents();
            if (this.kafka$producer$async$ProducerSendThread$$logger().isDebugEnabled()) {
                this.kafka$producer$async$ProducerSendThread$$logger().debug((Object)new StringBuilder().append((Object)"Remaining events = ").append((Object)BoxesRunTime.boxToInteger((int)remainingEvents.size())).toString());
            }
            if (remainingEvents.size() > 0) {
                if (this.kafka$producer$async$ProducerSendThread$$logger().isDebugEnabled()) {
                    this.kafka$producer$async$ProducerSendThread$$logger().debug((Object)Predef$.MODULE$.augmentString("Dispatching last batch of %d events to the event handler").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)remainingEvents.size())})));
                }
                this.tryToHandle(remainingEvents);
            }
        }
        catch (Exception exception) {
            this.kafka$producer$async$ProducerSendThread$$logger().error((Object)"Error in sending events: ", (Throwable)exception);
        }
        return;
        finally {
            this.shutdownLatch().countDown();
        }
    }

    public void awaitShutdown() {
        this.shutdownLatch().await();
    }

    public void shutdown() {
        this.handler().close();
        this.kafka$producer$async$ProducerSendThread$$logger().info((Object)"Shutdown thread complete");
    }

    private Seq<QueueItem<T>> processEvents() {
        LongRef lastSend$1 = new LongRef(SystemTime$.MODULE$.milliseconds());
        ObjectRef events$1 = new ObjectRef((Object)new ListBuffer());
        BooleanRef full$1 = new BooleanRef(false);
        package$.MODULE$.Stream().continually((Function0)new $anonfun$processEvents$1(this, lastSend$1)).takeWhile((Function1)new $anonfun$processEvents$2(this)).foreach((Function1)new $anonfun$processEvents$3(this, lastSend$1, events$1, full$1));
        if (this.queue().size() > 0) {
            throw new IllegalQueueStateException(Predef$.MODULE$.augmentString("Invalid queue state! After queue shutdown, %d remaining items in the queue").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.queue().size())})));
        }
        if (this.cbkHandler() != null) {
            this.kafka$producer$async$ProducerSendThread$$logger().info((Object)Predef$.MODULE$.augmentString("Invoking the callback handler before handling the last batch of %d events").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)((ListBuffer)events$1.elem).size())})));
            scala.collection.mutable.Seq<QueueItem<T>> addedEvents = this.cbkHandler().lastBatchBeforeClose();
            this.logEvents("last batch before close", (Iterable<QueueItem<T>>)addedEvents);
            events$1.elem = (ListBuffer)((ListBuffer)events$1.elem).$plus$plus(addedEvents);
        }
        return (ListBuffer)events$1.elem;
    }

    public void tryToHandle(Seq<QueueItem<T>> events) {
        try {
            if (this.kafka$producer$async$ProducerSendThread$$logger().isDebugEnabled()) {
                this.kafka$producer$async$ProducerSendThread$$logger().debug((Object)new StringBuilder().append((Object)"Handling ").append((Object)BoxesRunTime.boxToInteger((int)events.size())).append((Object)" events").toString());
            }
            if (events.size() > 0) {
                this.handler().handle(events, this.underlyingProducer(), this.serializer());
            }
        }
        catch (Exception exception) {
            this.kafka$producer$async$ProducerSendThread$$logger().error((Object)new StringBuilder().append((Object)"Error in handling batch of ").append((Object)BoxesRunTime.boxToInteger((int)events.size())).append((Object)" events").toString(), (Throwable)exception);
        }
    }

    private void logEvents(String tag, Iterable<QueueItem<T>> events) {
        if (this.kafka$producer$async$ProducerSendThread$$logger().isTraceEnabled()) {
            this.kafka$producer$async$ProducerSendThread$$logger().trace((Object)new StringBuilder().append((Object)"events for ").append((Object)tag).append((Object)":").toString());
            events.foreach((Function1)new $anonfun$logEvents$1(this));
        }
    }

    public ProducerSendThread(String threadName, BlockingQueue<QueueItem<T>> queue, Encoder<T> serializer, SyncProducer underlyingProducer, EventHandler<T> handler, CallbackHandler<T> cbkHandler, long queueTime, int batchSize, Object shutdownCommand) {
        this.threadName = threadName;
        this.queue = queue;
        this.serializer = serializer;
        this.underlyingProducer = underlyingProducer;
        this.handler = handler;
        this.cbkHandler = cbkHandler;
        this.queueTime = queueTime;
        this.batchSize = batchSize;
        this.shutdownCommand = shutdownCommand;
        super(threadName);
        this.kafka$producer$async$ProducerSendThread$$logger = Logger.getLogger(ProducerSendThread.class);
        this.shutdownLatch = new CountDownLatch(1);
    }
}

