/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Function;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ComparisonChain;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class KafkaIO {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    public static Read<byte[], byte[]> readBytes() {
        return new AutoValue_KafkaIO_Read.Builder().setTopics(new ArrayList<String>()).setTopicPartitions(new ArrayList<TopicPartition>()).setKeyDeserializer(ByteArrayDeserializer.class).setValueDeserializer(ByteArrayDeserializer.class).setConsumerFactoryFn((SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)Read.KAFKA_CONSUMER_FACTORY_FN).setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES).setMaxNumRecords(Long.MAX_VALUE).build();
    }

    public static <K, V> Read<K, V> read() {
        return new AutoValue_KafkaIO_Read.Builder().setTopics(new ArrayList<String>()).setTopicPartitions(new ArrayList<TopicPartition>()).setConsumerFactoryFn((SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)Read.KAFKA_CONSUMER_FACTORY_FN).setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES).setMaxNumRecords(Long.MAX_VALUE).build();
    }

    public static <K, V> Write<K, V> write() {
        return new AutoValue_KafkaIO_Write.Builder().setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES).build();
    }

    private static Map<String, Object> updateKafkaProperties(Map<String, Object> currentConfig, Map<String, String> ignoredProperties, Map<String, Object> updates) {
        for (String key : updates.keySet()) {
            Preconditions.checkArgument(!ignoredProperties.containsKey(key), "No need to configure '%s'. %s", (Object)key, (Object)ignoredProperties.get(key));
        }
        HashMap<String, Object> config = new HashMap<String, Object>(currentConfig);
        config.putAll(updates);
        return config;
    }

    private KafkaIO() {
    }

    @VisibleForTesting
    static <T> NullableCoder<T> inferCoder(CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
        Preconditions.checkNotNull(deserializer);
        for (Type type : deserializer.getGenericInterfaces()) {
            ParameterizedType parameterizedType;
            if (!(type instanceof ParameterizedType) || (parameterizedType = (ParameterizedType)type).getRawType() != Deserializer.class) continue;
            Type parameter = parameterizedType.getActualTypeArguments()[0];
            Class clazz = (Class)parameter;
            try {
                return NullableCoder.of((Coder)coderRegistry.getCoder(clazz));
            }
            catch (CannotProvideCoderException e) {
                throw new RuntimeException(String.format("Unable to automatically infer a Coder for the Kafka Deserializer %s: no coder registered for type %s", deserializer, clazz));
            }
        }
        throw new RuntimeException(String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
    }

    private static class KafkaWriter<K, V>
    extends DoFn<KV<K, V>, Void> {
        private final Write<K, V> spec;
        private final Map<String, Object> producerConfig;
        private transient Producer<K, V> producer = null;
        private transient Exception sendException = null;
        private transient long numSendFailures = 0L;
        private final Counter elementsWritten = SinkMetrics.elementsWritten();

        @DoFn.Setup
        public void setup() {
            this.producer = this.spec.getProducerFactoryFn() != null ? (Producer)this.spec.getProducerFactoryFn().apply(this.producerConfig) : new KafkaProducer(this.producerConfig);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctx) throws Exception {
            this.checkForFailures();
            KV kv = (KV)ctx.element();
            this.producer.send(new ProducerRecord(this.spec.getTopic(), kv.getKey(), kv.getValue()), (Callback)new SendCallback());
            this.elementsWritten.inc();
        }

        @DoFn.FinishBundle
        public void finishBundle() throws IOException {
            this.producer.flush();
            this.checkForFailures();
        }

        @DoFn.Teardown
        public void teardown() {
            this.producer.close();
        }

        KafkaWriter(Write<K, V> spec) {
            this.spec = spec;
            this.producerConfig = new HashMap<String, Object>(spec.getProducerConfig());
            this.producerConfig.put("key.serializer", spec.getKeySerializer());
            this.producerConfig.put("value.serializer", spec.getValueSerializer());
        }

        private synchronized void checkForFailures() throws IOException {
            if (this.numSendFailures == 0L) {
                return;
            }
            String msg = String.format("KafkaWriter : failed to send %d records (since last report)", this.numSendFailures);
            Exception e = this.sendException;
            this.sendException = null;
            this.numSendFailures = 0L;
            LOG.warn(msg);
            throw new IOException(msg, e);
        }

        private class SendCallback
        implements Callback {
            private SendCallback() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    return;
                }
                KafkaWriter kafkaWriter = KafkaWriter.this;
                synchronized (kafkaWriter) {
                    if (KafkaWriter.this.sendException == null) {
                        KafkaWriter.this.sendException = exception;
                    }
                    KafkaWriter.this.numSendFailures++;
                }
                LOG.warn("KafkaWriter send failed : '{}'", (Object)exception.getMessage());
            }
        }
    }

    private static class NullOnlyCoder<T>
    extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        public void encode(T value, OutputStream outStream) {
            Preconditions.checkArgument(value == null, "Can only encode nulls");
        }

        public T decode(InputStream inStream) {
            return null;
        }
    }

    private static class KafkaValueWrite<K, V>
    extends PTransform<PCollection<V>, PDone> {
        private final Write<K, V> kvWriteTransform;

        private KafkaValueWrite(Write<K, V> kvWriteTransform) {
            this.kvWriteTransform = kvWriteTransform;
        }

        public PDone expand(PCollection<V> input) {
            return (PDone)((PCollection)input.apply("Kafka values with default key", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<V, KV<K, V>>(){

                public KV<K, V> apply(V element) {
                    return KV.of(null, element);
                }
            }))).setCoder((Coder)KvCoder.of(new NullOnlyCoder(), (Coder)input.getCoder())).apply(this.kvWriteTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.kvWriteTransform.populateDisplayData(builder);
        }
    }

    public static abstract class Write<K, V>
    extends PTransform<PCollection<KV<K, V>>, PDone> {
        private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of("retries", 3);
        private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of("key.serializer", "Use withKeySerializer instead", "value.serializer", "Use withValueSerializer instead");

        @Nullable
        abstract String getTopic();

        abstract Map<String, Object> getProducerConfig();

        @Nullable
        abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();

        @Nullable
        abstract Class<? extends Serializer<K>> getKeySerializer();

        @Nullable
        abstract Class<? extends Serializer<V>> getValueSerializer();

        abstract Builder<K, V> toBuilder();

        public Write<K, V> withBootstrapServers(String bootstrapServers) {
            return this.updateProducerProperties(ImmutableMap.of("bootstrap.servers", bootstrapServers));
        }

        public Write<K, V> withTopic(String topic) {
            return this.toBuilder().setTopic(topic).build();
        }

        public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
            return this.toBuilder().setKeySerializer(keySerializer).build();
        }

        public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            return this.toBuilder().setValueSerializer(valueSerializer).build();
        }

        public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
            Map config = KafkaIO.updateKafkaProperties(this.getProducerConfig(), Write.IGNORED_PRODUCER_PROPERTIES, configUpdates);
            return this.toBuilder().setProducerConfig(config).build();
        }

        public Write<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
            return this.toBuilder().setProducerFactoryFn(producerFactoryFn).build();
        }

        public PTransform<PCollection<V>, PDone> values() {
            return new KafkaValueWrite(this.toBuilder().build());
        }

        public PDone expand(PCollection<KV<K, V>> input) {
            input.apply((PTransform)ParDo.of(new KafkaWriter(this)));
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void validate(PipelineOptions options) {
            Preconditions.checkNotNull(this.getProducerConfig().get("bootstrap.servers"), "Kafka bootstrap servers should be set");
            Preconditions.checkNotNull(this.getTopic(), "Kafka topic should be set");
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"topic", (String)this.getTopic()).withLabel("Topic"));
            Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> conf : this.getProducerConfig().entrySet()) {
                String key = conf.getKey();
                if (ignoredProducerPropertiesKeys.contains(key)) continue;
                Object value = DisplayData.inferType((Object)conf.getValue()) != null ? conf.getValue() : String.valueOf(conf.getValue());
                builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
            }
        }

        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setTopic(String var1);

            abstract Builder<K, V> setProducerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> var1);

            abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> var1);

            abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> var1);

            abstract Write<K, V> build();
        }
    }

    private static class UnboundedKafkaReader<K, V>
    extends UnboundedSource.UnboundedReader<KafkaRecord<K, V>> {
        private final UnboundedKafkaSource<K, V> source;
        private final String name;
        private Consumer<byte[], byte[]> consumer;
        private final List<PartitionState> partitionStates;
        private KafkaRecord<K, V> curRecord;
        private Instant curTimestamp;
        private Iterator<PartitionState> curBatch = Collections.emptyIterator();
        private Deserializer<K> keyDeserializerInstance = null;
        private Deserializer<V> valueDeserializerInstance = null;
        private final Counter elementsRead = SourceMetrics.elementsRead();
        private final Counter bytesRead = SourceMetrics.bytesRead();
        private final Counter elementsReadBySplit;
        private final Counter bytesReadBySplit;
        private final Gauge backlogBytesOfSplit;
        private final Gauge backlogElementsOfSplit;
        private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis((long)1000L);
        private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis((long)10L);
        private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
        private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue = new SynchronousQueue();
        private AtomicBoolean closed = new AtomicBoolean(false);
        private Consumer<byte[], byte[]> offsetConsumer;
        private final ScheduledExecutorService offsetFetcherThread = Executors.newSingleThreadScheduledExecutor();
        private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
        private static final long UNINITIALIZED_OFFSET = -1L;
        private transient ConsumerSpEL consumerSpEL = new ConsumerSpEL();
        private static Instant initialWatermark = new Instant(Long.MIN_VALUE);

        public String toString() {
            return this.name;
        }

        public UnboundedKafkaReader(UnboundedKafkaSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
            this.source = source;
            this.name = "Reader-" + ((UnboundedKafkaSource)source).id;
            List<TopicPartition> partitions = ((UnboundedKafkaSource)source).spec.getTopicPartitions();
            this.partitionStates = ImmutableList.copyOf(Lists.transform(partitions, new Function<TopicPartition, PartitionState>(){

                @Override
                public PartitionState apply(TopicPartition tp) {
                    return new PartitionState(tp, -1L);
                }
            }));
            if (checkpointMark != null) {
                Preconditions.checkState(checkpointMark.getPartitions().size() == partitions.size(), "checkPointMark and assignedPartitions should match");
                for (int i = 0; i < partitions.size(); ++i) {
                    KafkaCheckpointMark.PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
                    TopicPartition assigned = partitions.get(i);
                    TopicPartition partition = new TopicPartition(ckptMark.getTopic(), ckptMark.getPartition());
                    Preconditions.checkState(partition.equals((Object)assigned), "checkpointed partition %s and assigned partition %s don't match", (Object)partition, (Object)assigned);
                    this.partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
                }
            }
            String splitId = String.valueOf(((UnboundedKafkaSource)source).id);
            this.elementsReadBySplit = SourceMetrics.elementsReadBySplit((String)splitId);
            this.bytesReadBySplit = SourceMetrics.bytesReadBySplit((String)splitId);
            this.backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit((String)splitId);
            this.backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit((String)splitId);
        }

        private void consumerPollLoop() {
            while (!this.closed.get()) {
                try {
                    ConsumerRecords records = this.consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                    if (records.isEmpty() || this.closed.get()) continue;
                    this.availableRecordsQueue.put((ConsumerRecords<byte[], byte[]>)records);
                }
                catch (InterruptedException e) {
                    LOG.warn("{}: consumer thread is interrupted", (Object)this, (Object)e);
                    break;
                }
                catch (WakeupException e) {
                    // empty catch block
                    break;
                }
            }
            LOG.info("{}: Returning from consumer pool loop", (Object)this);
        }

        private void nextBatch() {
            ConsumerRecords<byte[], byte[]> records;
            this.curBatch = Collections.emptyIterator();
            try {
                records = this.availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("{}: Unexpected", (Object)this, (Object)e);
                return;
            }
            if (records == null) {
                return;
            }
            LinkedList<PartitionState> nonEmpty = new LinkedList<PartitionState>();
            for (PartitionState p : this.partitionStates) {
                p.recordIter = records.records(p.topicPartition).iterator();
                if (!p.recordIter.hasNext()) continue;
                nonEmpty.add(p);
            }
            this.curBatch = Iterators.cycle(nonEmpty);
        }

        private void setupInitialOffset(PartitionState pState) {
            Read spec = ((UnboundedKafkaSource)this.source).spec;
            if (pState.nextOffset != -1L) {
                this.consumer.seek(pState.topicPartition, pState.nextOffset);
            } else {
                Instant startReadTime = spec.getStartReadTime();
                if (startReadTime != null) {
                    pState.nextOffset = this.consumerSpEL.offsetForTime(this.consumer, pState.topicPartition, spec.getStartReadTime());
                    this.consumer.seek(pState.topicPartition, pState.nextOffset);
                } else {
                    pState.nextOffset = this.consumer.position(pState.topicPartition);
                }
            }
        }

        public boolean start() throws IOException {
            int defaultPartitionInitTimeout = 60000;
            int kafkaRequestTimeoutMultiple = 2;
            Read spec = ((UnboundedKafkaSource)this.source).spec;
            this.consumer = (Consumer)spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
            this.consumerSpEL.evaluateAssign(this.consumer, spec.getTopicPartitions());
            try {
                this.keyDeserializerInstance = ((UnboundedKafkaSource)this.source).spec.getKeyDeserializer().newInstance();
                this.valueDeserializerInstance = ((UnboundedKafkaSource)this.source).spec.getValueDeserializer().newInstance();
            }
            catch (IllegalAccessException | InstantiationException e) {
                throw new IOException("Could not instantiate deserializers", e);
            }
            this.keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
            this.valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
            for (final PartitionState pState : this.partitionStates) {
                Future<?> future = this.consumerPollThread.submit(new Runnable(){

                    @Override
                    public void run() {
                        UnboundedKafkaReader.this.setupInitialOffset(pState);
                    }
                });
                try {
                    Integer reqTimeout = (Integer)((UnboundedKafkaSource)this.source).spec.getConsumerConfig().get("request.timeout.ms");
                    future.get(reqTimeout != null ? (long)(2 * reqTimeout) : 60000L, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e) {
                    this.consumer.wakeup();
                    String msg = String.format("%s: Timeout while initializing partition '%s'. Kafka client may not be able to connect to servers.", new Object[]{this, pState.topicPartition});
                    LOG.error("{}", (Object)msg);
                    throw new IOException(msg);
                }
                catch (Exception e) {
                    throw new IOException(e);
                }
                LOG.info("{}: reading from {} starting at offset {}", new Object[]{this.name, pState.topicPartition, pState.nextOffset});
            }
            this.consumerPollThread.submit(new Runnable(){

                @Override
                public void run() {
                    UnboundedKafkaReader.this.consumerPollLoop();
                }
            });
            Object groupId = spec.getConsumerConfig().get("group.id");
            String offsetGroupId = String.format("%s_offset_consumer_%d_%s", this.name, new Random().nextInt(Integer.MAX_VALUE), groupId == null ? "none" : groupId);
            HashMap<String, Object> offsetConsumerConfig = new HashMap<String, Object>(spec.getConsumerConfig());
            offsetConsumerConfig.put("group.id", offsetGroupId);
            offsetConsumerConfig.put("enable.auto.commit", false);
            this.offsetConsumer = (Consumer)spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
            this.consumerSpEL.evaluateAssign(this.offsetConsumer, spec.getTopicPartitions());
            this.offsetFetcherThread.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    UnboundedKafkaReader.this.updateLatestOffsets();
                }
            }, 0L, 5L, TimeUnit.SECONDS);
            this.nextBatch();
            return this.advance();
        }

        public boolean advance() throws IOException {
            while (true) {
                if (this.curBatch.hasNext()) {
                    PartitionState pState = this.curBatch.next();
                    this.elementsRead.inc();
                    this.elementsReadBySplit.inc();
                    if (!pState.recordIter.hasNext()) {
                        pState.recordIter = Collections.emptyIterator();
                        this.curBatch.remove();
                        continue;
                    }
                    ConsumerRecord rawRecord = (ConsumerRecord)pState.recordIter.next();
                    long expected = pState.nextOffset;
                    long offset = rawRecord.offset();
                    if (offset < expected) {
                        LOG.warn("{}: ignoring already consumed offset {} for {}", new Object[]{this, offset, pState.topicPartition});
                        continue;
                    }
                    long offsetGap = offset - expected;
                    if (this.curRecord == null) {
                        LOG.info("{}: first record offset {}", (Object)this.name, (Object)offset);
                        offsetGap = 0L;
                    }
                    this.curRecord = null;
                    KafkaRecord<Object, Object> record = new KafkaRecord<Object, Object>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), this.consumerSpEL.getRecordTimestamp((ConsumerRecord<byte[], byte[]>)rawRecord), this.keyDeserializerInstance.deserialize(rawRecord.topic(), (byte[])rawRecord.key()), this.valueDeserializerInstance.deserialize(rawRecord.topic(), (byte[])rawRecord.value()));
                    this.curTimestamp = ((UnboundedKafkaSource)this.source).spec.getTimestampFn() == null ? Instant.now() : (Instant)((UnboundedKafkaSource)this.source).spec.getTimestampFn().apply(record);
                    this.curRecord = record;
                    int recordSize = (rawRecord.key() == null ? 0 : ((byte[])rawRecord.key()).length) + (rawRecord.value() == null ? 0 : ((byte[])rawRecord.value()).length);
                    pState.recordConsumed(offset, recordSize, offsetGap);
                    this.bytesRead.inc((long)recordSize);
                    this.bytesReadBySplit.inc((long)recordSize);
                    return true;
                }
                this.nextBatch();
                if (!this.curBatch.hasNext()) break;
            }
            return false;
        }

        private void updateLatestOffsets() {
            for (PartitionState p : this.partitionStates) {
                try {
                    this.consumerSpEL.evaluateSeek2End(this.offsetConsumer, p.topicPartition);
                    long offset = this.offsetConsumer.position(p.topicPartition);
                    p.setLatestOffset(offset);
                }
                catch (Exception e) {
                    if (this.closed.get()) break;
                    LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.", new Object[]{this, p.topicPartition, e});
                    p.setLatestOffset(-1L);
                }
                LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})", new Object[]{this, p.topicPartition, p.latestOffset, p.nextOffset, p.avgRecordSize});
            }
            LOG.debug("{}:  backlog {}", (Object)this, (Object)this.getSplitBacklogBytes());
        }

        private void reportBacklog() {
            long splitBacklogBytes = this.getSplitBacklogBytes();
            if (splitBacklogBytes < 0L) {
                splitBacklogBytes = -1L;
            }
            this.backlogBytesOfSplit.set(splitBacklogBytes);
            long splitBacklogMessages = this.getSplitBacklogMessageCount();
            if (splitBacklogMessages < 0L) {
                splitBacklogMessages = -1L;
            }
            this.backlogElementsOfSplit.set(splitBacklogMessages);
        }

        public Instant getWatermark() {
            if (this.curRecord == null) {
                LOG.debug("{}: getWatermark() : no records have been read yet.", (Object)this.name);
                return initialWatermark;
            }
            return ((UnboundedKafkaSource)this.source).spec.getWatermarkFn() != null ? (Instant)((UnboundedKafkaSource)this.source).spec.getWatermarkFn().apply(this.curRecord) : this.curTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            this.reportBacklog();
            return new KafkaCheckpointMark(ImmutableList.copyOf(Lists.transform(this.partitionStates, new Function<PartitionState, KafkaCheckpointMark.PartitionMark>(){

                @Override
                public KafkaCheckpointMark.PartitionMark apply(PartitionState p) {
                    return new KafkaCheckpointMark.PartitionMark(p.topicPartition.topic(), p.topicPartition.partition(), p.nextOffset);
                }
            })));
        }

        public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
            return this.source;
        }

        public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
            return this.curRecord;
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.curTimestamp;
        }

        public long getSplitBacklogBytes() {
            long backlogBytes = 0L;
            for (PartitionState p : this.partitionStates) {
                long pBacklog = p.approxBacklogInBytes();
                if (pBacklog == -1L) {
                    return -1L;
                }
                backlogBytes += pBacklog;
            }
            return backlogBytes;
        }

        private long getSplitBacklogMessageCount() {
            long backlogCount = 0L;
            for (PartitionState p : this.partitionStates) {
                long pBacklog = p.backlogMessageCount();
                if (pBacklog == -1L) {
                    return -1L;
                }
                backlogCount += pBacklog;
            }
            return backlogCount;
        }

        public void close() throws IOException {
            this.closed.set(true);
            this.consumerPollThread.shutdown();
            this.offsetFetcherThread.shutdown();
            boolean isShutdown = false;
            while (!isShutdown) {
                if (this.consumer != null) {
                    this.consumer.wakeup();
                }
                if (this.offsetConsumer != null) {
                    this.offsetConsumer.wakeup();
                }
                this.availableRecordsQueue.poll();
                try {
                    isShutdown = this.consumerPollThread.awaitTermination(10L, TimeUnit.SECONDS) && this.offsetFetcherThread.awaitTermination(10L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (isShutdown) continue;
                LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
            }
            Closeables.close(this.keyDeserializerInstance, true);
            Closeables.close(this.valueDeserializerInstance, true);
            Closeables.close(this.offsetConsumer, true);
            Closeables.close(this.consumer, true);
        }

        private static class PartitionState {
            private final TopicPartition topicPartition;
            private long nextOffset;
            private long latestOffset;
            private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
            private MovingAvg avgRecordSize = new MovingAvg();
            private MovingAvg avgOffsetGap = new MovingAvg();

            PartitionState(TopicPartition partition, long nextOffset) {
                this.topicPartition = partition;
                this.nextOffset = nextOffset;
                this.latestOffset = -1L;
            }

            void recordConsumed(long offset, int size, long offsetGap) {
                this.nextOffset = offset + 1L;
                this.avgRecordSize.update(size);
                this.avgOffsetGap.update(offsetGap);
            }

            synchronized void setLatestOffset(long latestOffset) {
                this.latestOffset = latestOffset;
            }

            synchronized long approxBacklogInBytes() {
                long backlogMessageCount = this.backlogMessageCount();
                if (backlogMessageCount == -1L) {
                    return -1L;
                }
                return (long)((double)backlogMessageCount * this.avgRecordSize.get());
            }

            synchronized long backlogMessageCount() {
                if (this.latestOffset < 0L || this.nextOffset < 0L) {
                    return -1L;
                }
                double remaining = (double)(this.latestOffset - this.nextOffset) / (1.0 + this.avgOffsetGap.get());
                return Math.max(0L, (long)Math.ceil(remaining));
            }
        }

        private static class MovingAvg {
            private static final int MOVING_AVG_WINDOW = 1000;
            private double avg = 0.0;
            private long numUpdates = 0L;

            private MovingAvg() {
            }

            void update(double quantity) {
                ++this.numUpdates;
                this.avg += (quantity - this.avg) / (double)Math.min(1000L, this.numUpdates);
            }

            double get() {
                return this.avg;
            }
        }
    }

    private static class UnboundedKafkaSource<K, V>
    extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
        private Read<K, V> spec;
        private final int id;

        public UnboundedKafkaSource(Read<K, V> spec, int id) {
            this.spec = spec;
            this.id = id;
        }

        public List<UnboundedKafkaSource<K, V>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
            int i;
            ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>(this.spec.getTopicPartitions());
            if (partitions.isEmpty()) {
                try (Consumer consumer = (Consumer)this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());){
                    for (String topic : this.spec.getTopics()) {
                        for (PartitionInfo p : consumer.partitionsFor(topic)) {
                            partitions.add(new TopicPartition(p.topic(), p.partition()));
                        }
                    }
                }
            }
            Collections.sort(partitions, new Comparator<TopicPartition>(){

                @Override
                public int compare(TopicPartition tp1, TopicPartition tp2) {
                    return ComparisonChain.start().compare((Comparable<?>)((Object)tp1.topic()), (Comparable<?>)((Object)tp2.topic())).compare(tp1.partition(), tp2.partition()).result();
                }
            });
            Preconditions.checkArgument(desiredNumSplits > 0);
            Preconditions.checkState(partitions.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names");
            int numSplits = Math.min(desiredNumSplits, partitions.size());
            ArrayList assignments = new ArrayList(numSplits);
            for (i = 0; i < numSplits; ++i) {
                assignments.add(new ArrayList());
            }
            for (i = 0; i < partitions.size(); ++i) {
                ((List)assignments.get(i % numSplits)).add(partitions.get(i));
            }
            ArrayList<UnboundedKafkaSource<K, V>> result = new ArrayList<UnboundedKafkaSource<K, V>>(numSplits);
            for (int i2 = 0; i2 < numSplits; ++i2) {
                List assignedToSplit = (List)assignments.get(i2);
                LOG.info("Partitions assigned to split {} (total {}): {}", new Object[]{i2, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit)});
                result.add(new UnboundedKafkaSource<K, V>(this.spec.toBuilder().setTopics(Collections.emptyList()).setTopicPartitions(assignedToSplit).build(), i2));
            }
            return result;
        }

        public UnboundedKafkaReader<K, V> createReader(PipelineOptions options, KafkaCheckpointMark checkpointMark) {
            if (this.spec.getTopicPartitions().isEmpty()) {
                LOG.warn("Looks like generateSplits() is not called. Generate single split.");
                try {
                    return new UnboundedKafkaReader<K, V>(this.split(1, options).get(0), checkpointMark);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return new UnboundedKafkaReader(this, checkpointMark);
        }

        public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
            return AvroCoder.of(KafkaCheckpointMark.class);
        }

        public boolean requiresDeduping() {
            return false;
        }

        public void validate() {
            this.spec.validate(null);
        }

        public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
            return KafkaRecordCoder.of(this.spec.getKeyCoder(), this.spec.getValueCoder());
        }
    }

    public static class TypedWithoutMetadata<K, V>
    extends PTransform<PBegin, PCollection<KV<K, V>>> {
        private final Read<K, V> read;

        TypedWithoutMetadata(Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        public PCollection<KV<K, V>> expand(PBegin begin) {
            return (PCollection)((PCollection)begin.apply(this.read)).apply("Remove Kafka Metadata", (PTransform)ParDo.of((DoFn)new DoFn<KafkaRecord<K, V>, KV<K, V>>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext ctx) {
                    ctx.output(((KafkaRecord)ctx.element()).getKV());
                }
            }));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }
    }

    public static abstract class Read<K, V>
    extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", "Set keyDeserializer instead", "value.deserializer", "Set valueDeserializer instead");
        private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", ByteArrayDeserializer.class.getName(), "value.deserializer", ByteArrayDeserializer.class.getName(), "receive.buffer.bytes", 524288, "auto.offset.reset", "latest", "enable.auto.commit", false);
        private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> KAFKA_CONSUMER_FACTORY_FN = new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>(){

            public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
                return new KafkaConsumer(config);
            }
        };

        abstract Map<String, Object> getConsumerConfig();

        abstract List<String> getTopics();

        abstract List<TopicPartition> getTopicPartitions();

        @Nullable
        abstract Coder<K> getKeyCoder();

        @Nullable
        abstract Coder<V> getValueCoder();

        @Nullable
        abstract Class<? extends Deserializer<K>> getKeyDeserializer();

        @Nullable
        abstract Class<? extends Deserializer<V>> getValueDeserializer();

        abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        @Nullable
        abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn();

        @Nullable
        abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();

        abstract long getMaxNumRecords();

        @Nullable
        abstract Duration getMaxReadTime();

        @Nullable
        abstract Instant getStartReadTime();

        abstract Builder<K, V> toBuilder();

        public Read<K, V> withBootstrapServers(String bootstrapServers) {
            return this.updateConsumerProperties(ImmutableMap.of("bootstrap.servers", bootstrapServers));
        }

        public Read<K, V> withTopic(String topic) {
            return this.withTopics(ImmutableList.of(topic));
        }

        public Read<K, V> withTopics(List<String> topics) {
            Preconditions.checkState(this.getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
            return this.toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
        }

        public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
            Preconditions.checkState(this.getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
            return this.toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
        }

        public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
            return this.toBuilder().setKeyDeserializer(keyDeserializer).build();
        }

        public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
            return this.toBuilder().setKeyDeserializer(keyDeserializer).setKeyCoder(keyCoder).build();
        }

        public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
            return this.toBuilder().setValueDeserializer(valueDeserializer).build();
        }

        public Read<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
            return this.toBuilder().setValueDeserializer(valueDeserializer).setValueCoder(valueCoder).build();
        }

        public Read<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
            return this.toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
            Map config = KafkaIO.updateKafkaProperties(this.getConsumerConfig(), Read.IGNORED_CONSUMER_PROPERTIES, configUpdates);
            return this.toBuilder().setConsumerConfig(config).build();
        }

        public Read<K, V> withMaxNumRecords(long maxNumRecords) {
            return this.toBuilder().setMaxNumRecords(maxNumRecords).setMaxReadTime(null).build();
        }

        public Read<K, V> withStartReadTime(Instant startReadTime) {
            return this.toBuilder().setStartReadTime(startReadTime).build();
        }

        public Read<K, V> withMaxReadTime(Duration maxReadTime) {
            return this.toBuilder().setMaxNumRecords(Long.MAX_VALUE).setMaxReadTime(maxReadTime).build();
        }

        public Read<K, V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
            Preconditions.checkNotNull(timestampFn);
            return this.toBuilder().setTimestampFn(timestampFn).build();
        }

        public Read<K, V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
            Preconditions.checkNotNull(watermarkFn);
            return this.toBuilder().setWatermarkFn(watermarkFn).build();
        }

        public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
            Preconditions.checkNotNull(timestampFn);
            return this.withTimestampFn2(Read.unwrapKafkaAndThen(timestampFn));
        }

        public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
            Preconditions.checkNotNull(watermarkFn);
            return this.withWatermarkFn2(Read.unwrapKafkaAndThen(watermarkFn));
        }

        public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
            return new TypedWithoutMetadata(this);
        }

        public void validate(PipelineOptions options) {
            Preconditions.checkNotNull(this.getConsumerConfig().get("bootstrap.servers"), "Kafka bootstrap servers should be set");
            Preconditions.checkArgument(this.getTopics().size() > 0 || this.getTopicPartitions().size() > 0, "Kafka topics or topic_partitions are required");
            Preconditions.checkNotNull(this.getKeyDeserializer(), "Key deserializer must be set");
            Preconditions.checkNotNull(this.getValueDeserializer(), "Value deserializer must be set");
            if (this.getStartReadTime() != null) {
                Preconditions.checkArgument(new ConsumerSpEL().hasOffsetsForTimes(), "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer.");
            }
        }

        public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
            Read.Unbounded unbounded;
            CoderRegistry registry = input.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = Preconditions.checkNotNull(this.getKeyCoder() != null ? this.getKeyCoder() : KafkaIO.inferCoder(registry, this.getKeyDeserializer()), "Key coder could not be inferred from key deserializer. Please providekey coder explicitly using withKeyDeserializerAndCoder()");
            Coder<V> valueCoder = Preconditions.checkNotNull(this.getValueCoder() != null ? this.getValueCoder() : KafkaIO.inferCoder(registry, this.getValueDeserializer()), "Value coder could not be inferred from value deserializer. Please providevalue coder explicitly using withValueDeserializerAndCoder()");
            Read.Unbounded transform = unbounded = org.apache.beam.sdk.io.Read.from(this.toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
            if (this.getMaxNumRecords() < Long.MAX_VALUE) {
                transform = unbounded.withMaxNumRecords(this.getMaxNumRecords());
            } else if (this.getMaxReadTime() != null) {
                transform = unbounded.withMaxReadTime(this.getMaxReadTime());
            }
            return (PCollection)input.getPipeline().apply((PTransform)transform);
        }

        @VisibleForTesting
        UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
            return new UnboundedKafkaSource(this, -1);
        }

        private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
            return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>(){

                public OutT apply(KafkaRecord<KeyT, ValueT> record) {
                    return fn.apply(record.getKV());
                }
            };
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            List<String> topics = this.getTopics();
            List<TopicPartition> topicPartitions = this.getTopicPartitions();
            if (topics.size() > 0) {
                builder.add(DisplayData.item((String)"topics", (String)Joiner.on(",").join(topics)).withLabel("Topic/s"));
            } else if (topicPartitions.size() > 0) {
                builder.add(DisplayData.item((String)"topicPartitions", (String)Joiner.on(",").join(topicPartitions)).withLabel("Topic Partition/s"));
            }
            Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> conf : this.getConsumerConfig().entrySet()) {
                String key = conf.getKey();
                if (ignoredConsumerPropertiesKeys.contains(key)) continue;
                Object value = DisplayData.inferType((Object)conf.getValue()) != null ? conf.getValue() : String.valueOf(conf.getValue());
                builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
            }
        }

        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setConsumerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setTopics(List<String> var1);

            abstract Builder<K, V> setTopicPartitions(List<TopicPartition> var1);

            abstract Builder<K, V> setKeyCoder(Coder<K> var1);

            abstract Builder<K, V> setValueCoder(Coder<V> var1);

            abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> var1);

            abstract Builder<K, V> setValueDeserializer(Class<? extends Deserializer<V>> var1);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> var1);

            abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> var1);

            abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> var1);

            abstract Builder<K, V> setMaxNumRecords(long var1);

            abstract Builder<K, V> setMaxReadTime(Duration var1);

            abstract Builder<K, V> setStartReadTime(Instant var1);

            abstract Read<K, V> build();
        }
    }
}

