/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.kafka;

import com.hazelcast.jet.IMapJet;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.InstanceConfig;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import scala.Option;

public class KafkaSink {
    private static final int MESSAGE_COUNT = 50000;
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String AUTO_OFFSET_RESET = "earliest";
    private static final String SOURCE_NAME = "source";
    private static final String SINK_TOPIC_NAME = "t1";
    private EmbeddedZookeeper zkServer;
    private ZkUtils zkUtils;
    private KafkaServer kafkaServer;
    private KafkaConsumer kafkaConsumer;

    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.map((String)SOURCE_NAME)).drainTo(KafkaSinks.kafka((Properties)KafkaSink.props("bootstrap.servers", BOOTSTRAP_SERVERS, "key.serializer", StringSerializer.class.getCanonicalName(), "value.serializer", IntegerSerializer.class.getCanonicalName()), (String)SINK_TOPIC_NAME));
        return p;
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("hazelcast.logging.type", "log4j");
        new KafkaSink().run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void run() throws Exception {
        JetConfig cfg = new JetConfig();
        cfg.setInstanceConfig(new InstanceConfig().setCooperativeThreadCount(Math.max(1, Runtime.getRuntime().availableProcessors() / 2)));
        try {
            this.createKafkaCluster();
            JetInstance instance = Jet.newJetInstance((JetConfig)cfg);
            Jet.newJetInstance((JetConfig)cfg);
            IMapJet sourceMap = instance.getMap(SOURCE_NAME);
            this.fillIMap((IMapJet<String, Integer>)sourceMap);
            Pipeline p = KafkaSink.buildPipeline();
            long start = System.nanoTime();
            Job job = instance.newJob(p);
            System.out.println("Consuming Topics");
            this.kafkaConsumer = TestUtils.createConsumer((String)BOOTSTRAP_SERVERS, (String)"verification-consumer", (String)AUTO_OFFSET_RESET, (boolean)true, (boolean)true, (int)4096, (SecurityProtocol)SecurityProtocol.PLAINTEXT, (Option)Option.empty(), (Option)Option.empty(), (Deserializer)new StringDeserializer(), (Deserializer)new IntegerDeserializer());
            this.kafkaConsumer.subscribe(Collections.singleton(SINK_TOPIC_NAME));
            int totalMessagesSeen = 0;
            while (true) {
                ConsumerRecords records = this.kafkaConsumer.poll(10000L);
                System.out.format("Received %d entries in %d milliseconds.%n", totalMessagesSeen += records.count(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                if (totalMessagesSeen == 50000) {
                    job.cancel();
                    break;
                }
                Thread.sleep(100L);
            }
        }
        finally {
            Jet.shutdownAll();
            this.shutdownKafkaCluster();
        }
    }

    private void createKafkaCluster() throws IOException {
        this.zkServer = new EmbeddedZookeeper();
        String zkConnect = "localhost:" + this.zkServer.port();
        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        this.zkUtils = ZkUtils.apply((ZkClient)zkClient, (boolean)false);
        KafkaConfig config = new KafkaConfig((Map)KafkaSink.props("zookeeper.connect", zkConnect, "broker.id", "0", "log.dirs", Files.createTempDirectory("kafka-", new FileAttribute[0]).toAbsolutePath().toString(), "offsets.topic.replication.factor", "1", "listeners", "PLAINTEXT://localhost:9092"));
        MockTime mock = new MockTime();
        this.kafkaServer = TestUtils.createServer((KafkaConfig)config, (Time)mock);
    }

    private void fillIMap(IMapJet<String, Integer> sourceMap) {
        System.out.println("Filling IMap");
        for (int i = 1; i <= 50000; ++i) {
            sourceMap.put((Object)("t1-" + i), (Object)i);
        }
        System.out.println("Published 50000 messages to IMap -> source");
    }

    private void shutdownKafkaCluster() {
        this.kafkaServer.shutdown();
        this.kafkaConsumer.close();
        this.zkUtils.close();
        this.zkServer.shutdown();
    }

    private static Properties props(String ... kvs) {
        Properties props = new Properties();
        int i = 0;
        while (i < kvs.length) {
            props.setProperty(kvs[i++], kvs[i++]);
        }
        return props;
    }
}

