package io.divolte.kafka.consumer;

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/divolte/kafka/consumer/DivolteKafkaConsumer.class */
public final class DivolteKafkaConsumer<T extends SpecificRecord> {
    private static final Logger logger = LoggerFactory.getLogger(DivolteKafkaConsumer.class);
    public static final long DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 400;
    public static final long DEFAULT_ZOOKEEPER_SYNC_TIMEOUT = 200;
    public static final long DEFAULT_AUTO_COMMIT_INTERVAL = 1000;
    private final int numThreads;
    private final ConsumerConnector consumer;
    private final ExecutorService executorService;
    private final String topic;
    private final Supplier<EventHandler<T>> handlerSupplier;
    private final Schema schema;

    /* loaded from: input_file:io/divolte/kafka/consumer/DivolteKafkaConsumer$EventHandler.class */
    public interface EventHandler<T> {
        void handle(T t) throws Exception;

        void setup() throws Exception;

        void shutdown() throws Exception;
    }

    /* loaded from: input_file:io/divolte/kafka/consumer/DivolteKafkaConsumer$SimpleEventHandler.class */
    public interface SimpleEventHandler<T> {
        void handle(T t) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/divolte/kafka/consumer/DivolteKafkaConsumer$SimpleEventHandlerWrapper.class */
    public static final class SimpleEventHandlerWrapper<T> implements EventHandler<T> {
        private final SimpleEventHandler<T> wrapped;

        SimpleEventHandlerWrapper(SimpleEventHandler<T> simpleEventHandler) {
            this.wrapped = simpleEventHandler;
        }

        @Override // io.divolte.kafka.consumer.DivolteKafkaConsumer.EventHandler
        public void handle(T t) throws Exception {
            this.wrapped.handle(t);
        }

        @Override // io.divolte.kafka.consumer.DivolteKafkaConsumer.EventHandler
        public void setup() throws Exception {
        }

        @Override // io.divolte.kafka.consumer.DivolteKafkaConsumer.EventHandler
        public void shutdown() throws Exception {
        }
    }

    public static <T extends SpecificRecord> DivolteKafkaConsumer<T> createConsumer(String str, String str2, String str3, int i, Supplier<EventHandler<T>> supplier, Schema schema, long j, long j2, long j3) {
        return new DivolteKafkaConsumer<>(str, str2, str3, i, supplier, schema, j, j2, j3);
    }

    public static <T extends SpecificRecord> DivolteKafkaConsumer<T> createConsumer(String str, String str2, String str3, int i, Supplier<EventHandler<T>> supplier, Schema schema) {
        return new DivolteKafkaConsumer<>(str, str2, str3, i, supplier, schema, 400L, 200L, 1000L);
    }

    public static <T extends SpecificRecord> DivolteKafkaConsumer<T> createConsumerWithSimpleHandler(String str, String str2, String str3, int i, final Supplier<SimpleEventHandler<T>> supplier, Schema schema) {
        return createConsumer(str, str2, str3, i, new Supplier<EventHandler<T>>() { // from class: io.divolte.kafka.consumer.DivolteKafkaConsumer.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public EventHandler<T> m1get() {
                return new SimpleEventHandlerWrapper((SimpleEventHandler) supplier.get());
            }
        }, schema);
    }

    public static <T extends SpecificRecord> DivolteKafkaConsumer<T> createConsumerWithSimpleHandler(String str, String str2, String str3, int i, final Supplier<SimpleEventHandler<T>> supplier, Schema schema, long j, long j2, long j3) {
        return createConsumer(str, str2, str3, i, new Supplier<EventHandler<T>>() { // from class: io.divolte.kafka.consumer.DivolteKafkaConsumer.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public EventHandler<T> m2get() {
                return new SimpleEventHandlerWrapper((SimpleEventHandler) supplier.get());
            }
        }, schema, j, j2, j3);
    }

    public void startConsumer() {
        Iterator it = ((List) this.consumer.createMessageStreams(ImmutableMap.of(Objects.requireNonNull(this.topic), Integer.valueOf(this.numThreads))).get(this.topic)).iterator();
        while (it.hasNext()) {
            scheduleReader((KafkaStream) it.next(), DecoderFactory.get().binaryDecoder(new byte[0], (BinaryDecoder) null), new SpecificDatumReader<>(this.schema));
        }
    }

    public void shutdownConsumer() {
        this.consumer.shutdown();
        this.executorService.shutdown();
    }

    private DivolteKafkaConsumer(String str, String str2, String str3, int i, Supplier<EventHandler<T>> supplier, Schema schema, long j, long j2, long j3) {
        this.topic = (String) Objects.requireNonNull(str);
        this.schema = (Schema) Objects.requireNonNull(schema);
        this.handlerSupplier = (Supplier) Objects.requireNonNull(supplier);
        this.consumer = Consumer.createJavaConsumerConnector(config((String) Objects.requireNonNull(str2), (String) Objects.requireNonNull(str3), j, j2, j3));
        this.numThreads = i;
        this.executorService = Executors.newFixedThreadPool(i, createThreadFactory(new ThreadGroup("Consumer threads [" + str3 + "]"), "Consumer [" + str3 + "] - %d"));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleReader(final KafkaStream<byte[], byte[]> kafkaStream, final BinaryDecoder binaryDecoder, final SpecificDatumReader<T> specificDatumReader) {
        this.executorService.submit(new Runnable() { // from class: io.divolte.kafka.consumer.DivolteKafkaConsumer.3
            @Override // java.lang.Runnable
            public void run() {
                EventHandler eventHandler = (EventHandler) DivolteKafkaConsumer.this.handlerSupplier.get();
                try {
                    eventHandler.setup();
                    ConsumerIterator it = kafkaStream.iterator();
                    while (it.hasNext()) {
                        DecoderFactory.get().binaryDecoder((byte[]) it.next().message(), binaryDecoder);
                        eventHandler.handle(specificDatumReader.read((Object) null, binaryDecoder));
                    }
                } catch (Exception e) {
                    DivolteKafkaConsumer.logger.warn("Exception in event handler. Re-scheduling.", e);
                    DivolteKafkaConsumer.this.scheduleReader(kafkaStream, binaryDecoder, specificDatumReader);
                }
                try {
                    eventHandler.shutdown();
                } catch (Exception e2) {
                    DivolteKafkaConsumer.logger.warn("Exception in event handler shutdown.", e2);
                }
            }
        });
    }

    private static ConsumerConfig config(String str, String str2, long j, long j2, long j3) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", str);
        properties.put("group.id", str2);
        properties.put("zookeeper.session.timeout.ms", Long.toString(j));
        properties.put("zookeeper.sync.time.ms", Long.toString(j2));
        properties.put("auto.commit.interval.ms", Long.toString(j3));
        return new ConsumerConfig(properties);
    }

    private static ThreadFactory createThreadFactory(final ThreadGroup threadGroup, String str) {
        return new ThreadFactoryBuilder().setNameFormat(str).setThreadFactory(new ThreadFactory() { // from class: io.divolte.kafka.consumer.DivolteKafkaConsumer.4
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(threadGroup, runnable);
            }
        }).build();
    }
}
