package kafka.tools;

import java.util.concurrent.atomic.AtomicLong;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaMessageStream;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: ConsumerPerformance.scala */
/* loaded from: input_file:kafka/tools/ConsumerPerformance$.class */
/**
 * Singleton holder for the consumer performance tool (decompiled from the
 * Scala object {@code ConsumerPerformance}).
 *
 * <p>{@link #main(String[])} parses the command line, creates a consumer
 * connector and its message streams for one topic, waits for a configurable
 * delay, starts the worker threads and registers a shutdown hook that prints
 * the accumulated totals.
 */
public final class ConsumerPerformance$ implements ScalaObject {
    /**
     * The single instance of this class. Initialised here rather than from
     * inside the constructor, because a {@code static final} field cannot be
     * assigned from a constructor in Java source.
     */
    public static final ConsumerPerformance$ MODULE$ = new ConsumerPerformance$();

    /** log4j logger obtained for this class in the constructor. */
    private final Logger kafka$tools$ConsumerPerformance$$logger;

    /**
     * Name-mangled accessor for the logger.
     *
     * @return the logger created in the constructor
     */
    public final Logger kafka$tools$ConsumerPerformance$$logger() {
        return this.kafka$tools$ConsumerPerformance$$logger;
    }

    /**
     * Entry point of the tool.
     *
     * <p>Recognised options: {@code --topic} and {@code --props} (both described
     * as REQUIRED), {@code --threads} (default 10), {@code --reporting-interval}
     * (default 100000) and {@code --sleep} in seconds (default 5).
     *
     * @param strArr raw command-line arguments handed to the option parser
     */
    public void main(String[] strArr) {
        OptionParser optionParser = new OptionParser();
        ArgumentAcceptingOptionSpec topicOpt = optionParser.accepts("topic", "REQUIRED: The topic to consume from.").withRequiredArg().describedAs("topic").ofType(String.class);
        ArgumentAcceptingOptionSpec propsOpt = optionParser.accepts("props", "REQUIRED: Properties file with the consumer properties.").withRequiredArg().describedAs("properties").ofType(String.class);
        ArgumentAcceptingOptionSpec threadsOpt = optionParser.accepts("threads", "Number of processing threads.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(10), new Integer[0]);
        ArgumentAcceptingOptionSpec reportingIntervalOpt = optionParser.accepts("reporting-interval", "Interval at which to print progress info.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(100000), new Integer[0]);
        ArgumentAcceptingOptionSpec sleepOpt = optionParser.accepts("sleep", "Initial interval to wait before connecting.").withRequiredArg().describedAs("secs").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(5), new Integer[0]);
        OptionSet options = optionParser.parse(strArr);

        // Presumably validates that the two REQUIRED options were supplied; the
        // closure body is not visible here -- TODO confirm.
        List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new ArgumentAcceptingOptionSpec[]{topicOpt, propsOpt})).foreach(new ConsumerPerformance$$anonfun$main$1(optionParser, options));

        int numThreads = ((Integer) options.valueOf(threadsOpt)).intValue();
        // Read once; the original read this option twice and discarded the first result.
        int reportingInterval = ((Integer) options.valueOf(reportingIntervalOpt)).intValue();
        String propsFile = (String) options.valueOf(propsOpt);
        String topic = (String) options.valueOf(topicOpt);
        int sleepSecs = ((Integer) options.valueOf(sleepOpt)).intValue();
        // Widen before multiplying so a large --sleep value cannot overflow int.
        long sleepMillis = sleepSecs * 1000L;

        Predef$.MODULE$.println("Starting consumer...");
        final ObjectRef totalMessages = new ObjectRef(new AtomicLong(0L));
        final ObjectRef totalBytes = new ObjectRef(new AtomicLong(0L));
        final ConsumerConnector connector = Consumer$.MODULE$.create(new ConsumerConfig(Utils$.MODULE$.loadProps(propsFile)));
        // Request numThreads streams for the single topic.
        Map<String, List<KafkaMessageStream>> topicStreams = connector.createMessageStreams(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc(topic).$minus$greater(BoxesRunTime.boxToInteger(numThreads))})));

        // Starts as the empty list; presumably filled with one worker per stream
        // by the closure below (body not visible here) -- TODO confirm.
        final ObjectRef workers = new ObjectRef(Nil$.MODULE$);
        topicStreams.foreach(new ConsumerPerformance$$anonfun$main$2(reportingInterval, totalMessages, totalBytes, workers));

        kafka$tools$ConsumerPerformance$$logger().info(new StringBuilder().append("Sleeping for ").append(BoxesRunTime.boxToInteger(sleepSecs)).append(" seconds.").toString());
        // NOTE(review): Thread.sleep declares the checked InterruptedException, which
        // this method neither catches nor declares. Left untouched so the signature
        // of main stays as it was.
        Thread.sleep(sleepMillis);
        kafka$tools$ConsumerPerformance$$logger().info("starting threads");
        ((List) workers.elem).foreach(new ConsumerPerformance$$anonfun$main$3());

        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: kafka.tools.ConsumerPerformance$$anon$2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ((List) workers.elem).foreach(new ConsumerPerformance$$anon$2$$anonfun$run$2(this));
                try {
                    connector.shutdown();
                } catch (Throwable t) {
                    // Best-effort shutdown: report the failure instead of silently
                    // dropping it, and still print the totals below.
                    ConsumerPerformance$.this.kafka$tools$ConsumerPerformance$$logger().warn("Error shutting down consumer connector", t);
                }
                Predef$.MODULE$.println(new StringBuilder().append("total nMsgs: ").append((AtomicLong) totalMessages.elem).toString());
                Predef$.MODULE$.println(new StringBuilder().append("totalBytesRead ").append((AtomicLong) totalBytes.elem).toString());
            }
        });
    }

    /** Private: the only instance is {@link #MODULE$}. */
    private ConsumerPerformance$() {
        this.kafka$tools$ConsumerPerformance$$logger = Logger.getLogger(getClass());
    }
}
