/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splittable {@link DoFn} that reads the Kafka topic partition named by each input
 * {@link KafkaSourceDescriptor} and emits every consumed record as a {@link KafkaRecord}.
 *
 * <p>The restriction is an {@link OffsetRange} over Kafka offsets. Its upper bound starts at
 * {@code Long.MAX_VALUE}; the tracker returned by {@link #restrictionTracker} estimates the real
 * end of the range by polling the partition's latest offset.
 *
 * <p>The transient members (deserializers, {@link ConsumerSpEL}, record-size cache) are created in
 * {@link #setup()} and are therefore {@code null} until that method has run.
 */
@DoFn.UnboundedPerElement
class ReadFromKafkaDoFn<K, V>
extends DoFn<KafkaSourceDescriptor, KafkaRecord<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    private final Map<String, Object> offsetConsumerConfig;
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
    private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
    private final SerializableFunction<Instant, WatermarkEstimator<Instant>> createWatermarkEstimatorFn;
    private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
    // Transient state below is (re)built in setup().
    private transient ConsumerSpEL consumerSpEL;
    private transient Deserializer<K> keyDeserializerInstance;
    private transient Deserializer<V> valueDeserializerInstance;
    // Per-partition moving averages of record size and offset gap, used by getSize().
    private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
    // How long a single poll() waits for records before the element self-checkpoints.
    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000L);
    @VisibleForTesting
    final DeserializerProvider keyDeserializerProvider;
    @VisibleForTesting
    final DeserializerProvider valueDeserializerProvider;
    @VisibleForTesting
    final Map<String, Object> consumerConfig;

    /**
     * Copies the consumer configuration, deserializer providers, consumer factory and
     * timestamp/watermark functions out of the given transform.
     *
     * @param transform the transform this DoFn reads its settings from
     */
    // The parameter is kept raw to leave the constructor signature unchanged; the assignments
    // below are unchecked conversions from the raw getters.
    @SuppressWarnings({"rawtypes", "unchecked"})
    ReadFromKafkaDoFn(KafkaIO.ReadSourceDescriptors transform) {
        this.consumerConfig = transform.getConsumerConfig();
        this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
        this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
        this.valueDeserializerProvider = transform.getValueDeserializerProvider();
        this.consumerFactoryFn = transform.getConsumerFactoryFn();
        this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = transform.getTimestampPolicyFactory();
    }

    /**
     * Computes the initial offset range for a source descriptor.
     *
     * <p>The start offset is chosen in this order: the descriptor's explicit start offset, the
     * offset resolved from the descriptor's start read time, or the position a freshly assigned
     * consumer reports for the partition. The end of the range is {@code Long.MAX_VALUE}.
     *
     * @param kafkaSourceDescriptor the topic partition (and optional start position) to read
     * @return the range {@code [startOffset, Long.MAX_VALUE)}
     */
    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor) {
        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        // The consumer is only needed to resolve the start offset, so it is closed on exit.
        try (Consumer<byte[], byte[]> offsetConsumer = this.consumerFactoryFn.apply(KafkaIOUtils.getOffsetConsumerConfig("initialOffset", this.offsetConsumerConfig, updatedConsumerConfig))) {
            this.consumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(topicPartition));
            Long startReadOffset = kafkaSourceDescriptor.getStartReadOffset();
            long startOffset;
            if (startReadOffset != null) {
                startOffset = startReadOffset;
            } else if (kafkaSourceDescriptor.getStartReadTime() != null) {
                startOffset = this.consumerSpEL.offsetForTime(offsetConsumer, topicPartition, kafkaSourceDescriptor.getStartReadTime());
            } else {
                startOffset = offsetConsumer.position(topicPartition);
            }
            return new OffsetRange(startOffset, Long.MAX_VALUE);
        }
    }

    /**
     * Uses the input element's timestamp as the initial watermark estimator state.
     *
     * @param currentElementTimestamp timestamp of the source descriptor element
     * @return {@code currentElementTimestamp}, unchanged
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant currentElementTimestamp) {
        return currentElementTimestamp;
    }

    /**
     * Builds a watermark estimator from the given state using the function supplied by the
     * transform.
     *
     * @param watermarkEstimatorState the state to initialise the estimator with
     * @return the estimator produced by the configured factory function
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant watermarkEstimatorState) {
        return this.createWatermarkEstimatorFn.apply(watermarkEstimatorState);
    }

    /**
     * Estimates the size of the work remaining in {@code offsetRange}.
     *
     * <p>Until at least one record of the partition has been processed by this instance there is
     * no size/gap average, and the remaining work reported by the tracker is returned as is.
     * Otherwise that figure is scaled by the partition's {@link AverageRecordSize}.
     *
     * @param kafkaSourceDescriptor the topic partition being read
     * @param offsetRange the restriction to size
     * @return the estimated remaining size
     * @throws Exception if the record-size cache fails to load its entry
     */
    @DoFn.GetSize
    public double getSize(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) throws Exception {
        // The tracker here is only a throwaway used to read the progress once, so its offset
        // consumer is closed immediately instead of lingering until finalization.
        KafkaLatestOffsetEstimator offsetEstimator = this.newOffsetEstimator(kafkaSourceDescriptor);
        double numRecords;
        try {
            numRecords = new GrowableOffsetRangeTracker(offsetRange.getFrom(), offsetEstimator).getProgress().getWorkRemaining();
        }
        finally {
            offsetEstimator.close();
        }
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        // asMap().containsKey() is used so that the lookup does not trigger the cache loader.
        if (!this.avgRecordSize.asMap().containsKey(topicPartition)) {
            return numRecords;
        }
        return this.avgRecordSize.get(topicPartition).getTotalSize(numRecords);
    }

    /**
     * Creates the tracker for a restriction. The range end is estimated by a
     * {@link KafkaLatestOffsetEstimator} that owns a dedicated offset consumer.
     *
     * @param kafkaSourceDescriptor the topic partition being read
     * @param restriction the restriction to track; only its start offset is used
     * @return a growable tracker starting at {@code restriction.getFrom()}
     */
    @DoFn.NewTracker
    public GrowableOffsetRangeTracker restrictionTracker(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange restriction) {
        return new GrowableOffsetRangeTracker(restriction.getFrom(), this.newOffsetEstimator(kafkaSourceDescriptor));
    }

    /**
     * Reads records of the descriptor's topic partition, starting at the beginning of the current
     * restriction, and outputs each of them with its timestamp.
     *
     * <p>When a {@link TimestampPolicyFactory} is configured, the record timestamp and the
     * watermark both come from the created policy, and the watermark estimator must be a
     * {@link ManualWatermarkEstimator}. Otherwise the timestamp is computed by the configured
     * extract-timestamp function and the watermark estimator is left untouched.
     *
     * @param kafkaSourceDescriptor the topic partition to read
     * @param tracker tracker for the offset range being processed
     * @param watermarkEstimator estimator updated when a timestamp policy is in use
     * @param receiver output for the decoded records
     * @return {@code resume()} when a poll returns no records, {@code stop()} when the tracker
     *     rejects an offset claim
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, RestrictionTracker<OffsetRange, Long> tracker, WatermarkEstimator watermarkEstimator, DoFn.OutputReceiver<KafkaRecord<K, V>> receiver) {
        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        TimestampPolicy<K, V> timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }
        try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(updatedConsumerConfig)) {
            this.consumerSpEL.evaluateAssign(consumer, ImmutableList.of(topicPartition));
            long startOffset = tracker.currentRestriction().getFrom();
            // Offset the next record would have if the partition had no gaps.
            long expectedOffset = startOffset;
            consumer.seek(topicPartition, startOffset);
            while (true) {
                ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                // Nothing available right now: checkpoint and ask to be resumed later.
                if (rawRecords.isEmpty()) {
                    return DoFn.ProcessContinuation.resume();
                }
                for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
                    if (!tracker.tryClaim(rawRecord.offset())) {
                        return DoFn.ProcessContinuation.stop();
                    }
                    KafkaRecord<K, V> kafkaRecord = new KafkaRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), this.consumerSpEL.getRecordTimestamp(rawRecord), this.consumerSpEL.getRecordTimestampType(rawRecord), ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null, this.consumerSpEL.deserializeKey(this.keyDeserializerInstance, rawRecord), this.consumerSpEL.deserializeValue(this.valueDeserializerInstance, rawRecord));
                    int keySize = rawRecord.key() == null ? 0 : rawRecord.key().length;
                    int valueSize = rawRecord.value() == null ? 0 : rawRecord.value().length;
                    // Feed the size estimate: payload bytes plus the number of offsets skipped
                    // since the previous record.
                    this.avgRecordSize.getUnchecked(topicPartition).update(keySize + valueSize, rawRecord.offset() - expectedOffset);
                    expectedOffset = rawRecord.offset() + 1L;
                    Instant outputTimestamp;
                    if (timestampPolicy != null) {
                        Preconditions.checkState(watermarkEstimator instanceof ManualWatermarkEstimator);
                        KafkaUnboundedReader.TimestampPolicyContext context = new KafkaUnboundedReader.TimestampPolicyContext((long)((RestrictionTracker.HasProgress)tracker).getProgress().getWorkRemaining(), Instant.now());
                        outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
                        ((ManualWatermarkEstimator<?>)watermarkEstimator).setWatermark(timestampPolicy.getWatermark(context));
                    } else {
                        outputTimestamp = this.extractOutputTimestampFn.apply(kafkaRecord);
                    }
                    receiver.outputWithTimestamp(kafkaRecord, outputTimestamp);
                }
            }
        }
    }

    /**
     * Returns the coder used to encode {@link OffsetRange} restrictions.
     *
     * @return a new {@code OffsetRange.Coder}
     */
    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    /**
     * Creates the transient state: the per-partition record-size cache (at most 1000 entries),
     * the {@link ConsumerSpEL} helper and the key/value deserializers.
     *
     * @throws Exception declared for the DoFn lifecycle contract
     */
    // The deserializer providers are declared raw, so the two assignments are unchecked.
    @SuppressWarnings("unchecked")
    @DoFn.Setup
    public void setup() throws Exception {
        this.avgRecordSize = CacheBuilder.newBuilder().maximumSize(1000L).build(new CacheLoader<TopicPartition, AverageRecordSize>(){

            @Override
            public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
                return new AverageRecordSize();
            }
        });
        this.consumerSpEL = new ConsumerSpEL();
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);
    }

    /**
     * Closes the key and value deserializers. Each one is closed independently so that a failure
     * on the first does not prevent the second from being closed; failures are logged, not thrown.
     *
     * @throws Exception declared for the DoFn lifecycle contract
     */
    @DoFn.Teardown
    public void teardown() throws Exception {
        ReadFromKafkaDoFn.closeDeserializerQuietly(this.keyDeserializerInstance);
        ReadFromKafkaDoFn.closeDeserializerQuietly(this.valueDeserializerInstance);
    }

    /** Closes one deserializer, logging (instead of propagating) any failure. */
    private static void closeDeserializerQuietly(Deserializer<?> deserializer) {
        try {
            Closeables.close(deserializer, true);
        }
        catch (Exception anyException) {
            LOG.warn("Fail to close resource during finishing bundle.", (Throwable)anyException);
        }
    }

    /** Creates an end-of-range estimator backed by a new offset consumer for the descriptor's partition. */
    private KafkaLatestOffsetEstimator newOffsetEstimator(KafkaSourceDescriptor kafkaSourceDescriptor) {
        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        Consumer<byte[], byte[]> offsetConsumer = this.consumerFactoryFn.apply(KafkaIOUtils.getOffsetConsumerConfig("tracker-" + topicPartition, this.offsetConsumerConfig, updatedConsumerConfig));
        return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
    }

    /**
     * Returns a copy of {@code currentConfig} in which {@code bootstrap.servers} is replaced by the
     * descriptor's servers (comma-joined) when the descriptor provides a non-empty list.
     *
     * @throws IllegalStateException if neither the config nor the descriptor names bootstrap servers
     */
    private Map<String, Object> overrideBootstrapServersConfig(Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
        Collection<String> bootstrapServers = description.getBootStrapServers();
        // NOTE(review): an empty (non-null) descriptor list satisfies this check but is not
        // applied below, so the returned config may still lack "bootstrap.servers".
        Preconditions.checkState(currentConfig.containsKey("bootstrap.servers") || bootstrapServers != null);
        Map<String, Object> config = new HashMap<String, Object>(currentConfig);
        if (bootstrapServers != null && bootstrapServers.size() > 0) {
            config.put("bootstrap.servers", String.join(",", bootstrapServers));
        }
        return config;
    }

    /** Moving averages of record payload size and of the offset gap between consecutive records. */
    private static class AverageRecordSize {
        private final KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg();
        private final KafkaIOUtils.MovingAvg avgRecordGap = new KafkaIOUtils.MovingAvg();

        /**
         * Adds one observation to both averages.
         *
         * @param recordSize key plus value size of the record, in bytes
         * @param gap number of offsets skipped between the previous record and this one
         */
        public void update(int recordSize, long gap) {
            this.avgRecordSize.update(recordSize);
            this.avgRecordGap.update(gap);
        }

        /**
         * Scales {@code numRecords} by the average record size and divides by
         * {@code 1 + average gap}.
         *
         * @param numRecords remaining work as reported by the restriction tracker
         * @return the estimated total size
         */
        public double getTotalSize(double numRecords) {
            return this.avgRecordSize.get() * numRecords / (1.0 + this.avgRecordGap.get());
        }
    }

    /**
     * Estimates the end of an offset range as the partition's latest offset. The value is
     * re-read from Kafka at most once every 5 seconds.
     */
    private static class KafkaLatestOffsetEstimator
    implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final Consumer<byte[], byte[]> offsetConsumer;
        private final TopicPartition topicPartition;
        private final ConsumerSpEL consumerSpEL;
        private final Supplier<Long> memoizedBacklog;
        // Guards against closing the consumer twice (explicit close() followed by finalize()).
        private boolean closed;

        /**
         * Assigns {@code offsetConsumer} to {@code topicPartition} and prepares the memoized
         * end-offset lookup. The estimator takes ownership of the consumer.
         */
        KafkaLatestOffsetEstimator(Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
            this.offsetConsumer = offsetConsumer;
            this.topicPartition = topicPartition;
            this.consumerSpEL = new ConsumerSpEL();
            this.consumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                this.consumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
                return offsetConsumer.position(topicPartition);
            }, 5L, TimeUnit.SECONDS);
        }

        /**
         * Closes the offset consumer on a best-effort basis. Failures are logged rather than
         * thrown, and repeated calls are no-ops. {@link #estimate()} must not be used afterwards.
         */
        synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                Closeables.close(this.offsetConsumer, true);
            }
            catch (Exception anyException) {
                LOG.warn("Failed to close offset consumer for {}", (Object)this.topicPartition, (Object)anyException);
            }
        }

        /**
         * Safety net for estimators that were handed to a tracker and never closed explicitly.
         */
        @Override
        protected void finalize() {
            this.close();
        }

        /** Returns the (possibly up to 5 seconds old) latest offset of the partition. */
        @Override
        public long estimate() {
            return this.memoizedBacklog.get();
        }
    }
}

