/*
 * Decompiled with CFR 0.152.
 */
package kafka.consumer;

import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.consumer.ConsoleConsumer;
import kafka.consumer.ConsoleConsumer$;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaMessageStream;
import kafka.utils.Utils$;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.log4j.Logger;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.mutable.Iterable$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public final class ConsoleConsumer$
implements ScalaObject {
    public static final ConsoleConsumer$ MODULE$;
    private final Logger kafka$consumer$ConsoleConsumer$$logger;

    static {
        new ConsoleConsumer$();
    }

    public final Logger kafka$consumer$ConsoleConsumer$$logger() {
        return this.kafka$consumer$ConsoleConsumer$$logger;
    }

    public void main(String[] args) {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.").withRequiredArg().describedAs("topic").ofType(String.class);
        ArgumentAcceptingOptionSpec zkConnectOpt$1 = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.").withRequiredArg().describedAs("urls").ofType(String.class);
        ArgumentAcceptingOptionSpec groupIdOpt$1 = parser.accepts("group", "The group id to consume on.").withRequiredArg().describedAs("gid").defaultsTo((Object)new StringBuilder().append((Object)"console-consumer-").append((Object)BoxesRunTime.boxToInteger((int)new Random().nextInt(100000))).toString(), (Object[])new String[0]).ofType(String.class);
        ArgumentAcceptingOptionSpec fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0x100000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0x200000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.").withRequiredArg().describedAs("class").ofType(String.class).defaultsTo((Object)ConsoleConsumer.NewlineMessageFormatter.class.getName(), (Object[])new String[0]);
        ArgumentAcceptingOptionSpec messageFormatterArgOpt = parser.accepts("property").withRequiredArg().describedAs("prop").ofType(String.class);
        OptionSpecBuilder resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.");
        ArgumentAcceptingOptionSpec autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(10000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.").withRequiredArg().describedAs("num_messages").ofType(Integer.class);
        OptionSpecBuilder skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, skip it instead of halt.");
        OptionSet options$1 = this.tryParse(parser, args);
        this.checkRequiredArgs(parser, options$1, (Seq<OptionSpec<?>>)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{topicIdOpt, zkConnectOpt$1}));
        Properties props = new Properties();
        props.put("groupid", options$1.valueOf((OptionSpec)groupIdOpt$1));
        props.put("socket.buffer.size", ((Integer)options$1.valueOf((OptionSpec)socketBufferSizeOpt)).toString());
        props.put("fetch.size", ((Integer)options$1.valueOf((OptionSpec)fetchSizeOpt)).toString());
        props.put("auto.commit", "true");
        props.put("autocommit.interval.ms", ((Integer)options$1.valueOf((OptionSpec)autoCommitIntervalOpt)).toString());
        props.put("autooffset.reset", options$1.has((OptionSpec)resetBeginningOpt) ? "smallest" : "largest");
        props.put("zk.connect", options$1.valueOf((OptionSpec)zkConnectOpt$1));
        ConsumerConfig config = new ConsumerConfig(props);
        boolean skipMessageOnError$1 = options$1.has((OptionSpec)skipMessageOnErrorOpt);
        String topic = (String)options$1.valueOf((OptionSpec)topicIdOpt);
        Class<?> messageFormatterClass = Class.forName((String)options$1.valueOf((OptionSpec)messageFormatterOpt));
        Properties formatterArgs = this.tryParseFormatterArgs((scala.collection.mutable.Iterable<String>)JavaConversions$.MODULE$.asBuffer(options$1.valuesOf((OptionSpec)messageFormatterArgOpt)));
        int maxMessages = options$1.has((OptionSpec)maxMessagesOpt) ? (Integer)options$1.valueOf((OptionSpec)maxMessagesOpt) : -1;
        ConsumerConnector connector$1 = Consumer$.MODULE$.create(config);
        Runtime.getRuntime().addShutdownHook(new anon.1(zkConnectOpt$1, groupIdOpt$1, options$1, connector$1));
        KafkaMessageStream stream = (KafkaMessageStream)JavaConversions$.MODULE$.asList((Seq)connector$1.createMessageStreams((Map<String, Integer>)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc((Object)topic).$minus$greater((Object)BoxesRunTime.boxToInteger((int)1))})), connector$1.createMessageStreams$default$2()).get((Object)topic).get()).get(0);
        KafkaMessageStream iter = maxMessages >= 0 ? (Iterable)stream.slice(0, maxMessages) : stream;
        ConsoleConsumer.MessageFormatter formatter$1 = (ConsoleConsumer.MessageFormatter)messageFormatterClass.newInstance();
        formatter$1.init(formatterArgs);
        try {
            iter.foreach((Function1)new anonfun.main.1(skipMessageOnError$1, connector$1, formatter$1));
        }
        catch (Throwable throwable) {
            this.kafka$consumer$ConsoleConsumer$$logger().error((Object)"Error processing message, stopping consumer: ", throwable);
        }
        System.out.flush();
        formatter$1.close();
        connector$1.shutdown();
    }

    /*
     * WARNING - void declaration
     */
    public OptionSet tryParse(OptionParser parser, String[] args) {
        void var3_3;
        OptionSet exceptionResult1 = null;
        try {
            exceptionResult1 = parser.parse(args);
        }
        catch (OptionException optionException) {
            Utils$.MODULE$.croak(optionException.getMessage());
            exceptionResult1 = null;
        }
        return var3_3;
    }

    public void checkRequiredArgs(OptionParser parser$1, OptionSet options$2, Seq<OptionSpec<?>> required) {
        required.foreach((Function1)new anonfun.checkRequiredArgs.1(parser$1, options$2));
    }

    /*
     * WARNING - void declaration
     */
    public Properties tryParseFormatterArgs(scala.collection.mutable.Iterable<String> args) {
        void var3_3;
        scala.collection.mutable.Iterable splits = (scala.collection.mutable.Iterable)((TraversableLike)((TraversableLike)args.map((Function1)new anonfun.1(), Iterable$.MODULE$.canBuildFrom())).filterNot((Function1)new anonfun.2())).filterNot((Function1)new anonfun.3());
        if (!splits.forall((Function1)new anonfun.tryParseFormatterArgs.1())) {
            System.err.println(new StringBuilder().append((Object)"Invalid parser arguments: ").append((Object)args.mkString(" ")).toString());
            System.exit(1);
        }
        Properties props$1 = new Properties();
        splits.foreach((Function1)new anonfun.tryParseFormatterArgs.2(props$1));
        return var3_3;
    }

    public void tryCleanupZookeeper(String zkUrl, String groupId) {
        try {
            String dir = new StringBuilder().append((Object)"/consumers/").append((Object)groupId).toString();
            this.kafka$consumer$ConsoleConsumer$$logger().info((Object)new StringBuilder().append((Object)"Cleaning up temporary zookeeper data under ").append((Object)dir).append((Object)".").toString());
            ZkClient zk = new ZkClient(zkUrl, 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
            zk.deleteRecursive(dir);
            zk.close();
        }
        catch (Throwable throwable) {}
    }

    private ConsoleConsumer$() {
        MODULE$ = this;
        this.kafka$consumer$ConsoleConsumer$$logger = Logger.getLogger(this.getClass());
    }
}

