/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.CheckStopReadingFn;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class ReadFromKafkaDoFn<@UnknownKeyFor K, @UnknownKeyFor V>
extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);

    // ---- Serialized configuration, copied from the transform in the constructor. ----

    // Optional overrides handed to KafkaIOUtils.getOffsetConsumerConfig when building the offset-estimation consumer.
    private final @Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> offsetConsumerConfig;
    // Optional predicate consulted at the start of processElement; when it returns true the rest of the restriction is claimed and reading stops.
    private final @Nullable @UnknownKeyFor @Initialized CheckStopReadingFn checkStopReadingFn;
    // Builds a Kafka consumer from a consumer-config map.
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> consumerFactoryFn;
    // Computes the output timestamp of a record; only used by processElement when timestampPolicyFactory is null.
    private final @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> extractOutputTimestampFn;
    // Factory used by newWatermarkEstimator; must be non-null when that method is invoked.
    private final @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> createWatermarkEstimatorFn;
    // When non-null, processElement creates a TimestampPolicy from it and sets the watermark manually.
    private final @Nullable @UnknownKeyFor @Initialized TimestampPolicyFactory<K, V> timestampPolicyFactory;
    // Receives raw records whose key/value deserialization throws SerializationException.
    private final @UnknownKeyFor @NonNull @Initialized BadRecordRouter badRecordRouter;
    // Output tag under which successfully decoded records are emitted.
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag;

    // ---- Transient per-instance state, (re)created in setup(). ----

    private transient @Nullable @UnknownKeyFor @Initialized Deserializer<K> keyDeserializerInstance = null;
    private transient @Nullable @UnknownKeyFor @Initialized Deserializer<V> valueDeserializerInstance = null;
    // Latest-offset estimators keyed by partition; populated lazily by restrictionTracker.
    private transient @Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized TopicPartition, @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator> offsetEstimatorCache;
    // Per-partition running averages of record size and offset gap; read by getSize, updated by processElement.
    private transient @Nullable @UnknownKeyFor @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized TopicPartition, @UnknownKeyFor @NonNull @Initialized AverageRecordSize> avgRecordSize;
    // Overall time budget for a single call to the private poll(...) helper.
    private static final @UnknownKeyFor @NonNull @Initialized Duration KAFKA_POLL_TIMEOUT = Duration.ofSeconds(1L);
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized DeserializerProvider<K> keyDeserializerProvider;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized DeserializerProvider<V> valueDeserializerProvider;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> consumerConfig;
    // Namespace and name prefix of the per-partition raw record size distribution metric.
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized String METRIC_NAMESPACE = "KafkaIOReader";
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized String RAW_SIZE_METRIC_PREFIX = "rawSize/";

    /**
     * Builds the concrete DoFn for {@code transform}.
     *
     * @param transform the read configuration; its {@code isBounded()} flag selects the subclass
     * @param recordTag output tag under which decoded records are emitted
     * @return a {@code Bounded} instance when the transform is bounded, otherwise an {@code Unbounded} one
     */
    static <K, V> @UnknownKeyFor @NonNull @Initialized ReadFromKafkaDoFn<K, V> create(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
        return transform.isBounded()
            ? new Bounded<K, V>(transform, recordTag)
            : new Unbounded<K, V>(transform, recordTag);
    }

    /**
     * Copies every setting this DoFn needs out of {@code transform}.
     *
     * @param transform the read configuration; its key and value deserializer providers must be non-null
     * @param recordTag output tag under which decoded records are emitted
     */
    private ReadFromKafkaDoFn(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
        // Mandatory pieces first: a missing deserializer provider is rejected up front.
        this.keyDeserializerProvider = Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider());
        this.valueDeserializerProvider = Preconditions.checkArgumentNotNull(transform.getValueDeserializerProvider());

        // Consumer construction.
        this.consumerConfig = transform.getConsumerConfig();
        this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
        this.consumerFactoryFn = transform.getConsumerFactoryFn();

        // Timestamps and watermarks.
        this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = transform.getTimestampPolicyFactory();

        // Stop condition and output routing.
        this.checkStopReadingFn = transform.getCheckStopReadingFn();
        this.badRecordRouter = transform.getBadRecordRouter();
        this.recordTag = recordTag;
    }

    /**
     * Computes the offset range to read for one source descriptor.
     *
     * <p>Start offset, in order of precedence: the descriptor's explicit start offset, the offset
     * resolved from its start read time, or the consumer's current position for the partition.
     * End offset: the descriptor's explicit stop offset, the offset resolved from its stop read
     * time, or {@code Long.MAX_VALUE} when neither is set.
     *
     * <p>A short-lived consumer is created for the lookups and closed before returning.
     *
     * @param kafkaSourceDescriptor the partition to read plus optional start/stop bounds
     * @return the range {@code [startOffset, endOffset)} to process
     */
    @DoFn.GetInitialRestriction
    public @UnknownKeyFor @NonNull @Initialized OffsetRange initialRestriction(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor) {
        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        LOG.info("Creating Kafka consumer for initial restriction for {}", topicPartition);
        try (Consumer<byte[], byte[]> offsetConsumer = this.consumerFactoryFn.apply(updatedConsumerConfig)) {
            ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(topicPartition));

            @Nullable Long startReadOffset = kafkaSourceDescriptor.getStartReadOffset();
            @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
            long startOffset;
            if (startReadOffset != null) {
                startOffset = startReadOffset;
            } else if (startReadTime != null) {
                startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, topicPartition, startReadTime);
            } else {
                startOffset = offsetConsumer.position(topicPartition);
            }

            @Nullable Long stopReadOffset = kafkaSourceDescriptor.getStopReadOffset();
            @Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
            // Long.MAX_VALUE marks an open-ended range; restrictionTracker keys off this value.
            long endOffset = Long.MAX_VALUE;
            if (stopReadOffset != null) {
                endOffset = stopReadOffset;
            } else if (stopReadTime != null) {
                endOffset = ConsumerSpEL.offsetForTime(offsetConsumer, topicPartition, stopReadTime);
            }

            return new OffsetRange(startOffset, endOffset);
        }
    }

    /**
     * Seeds the watermark estimator state with the timestamp of the input element.
     *
     * @param currentElementTimestamp timestamp of the source descriptor element
     * @return {@code currentElementTimestamp}, unchanged
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public @UnknownKeyFor @NonNull @Initialized Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant currentElementTimestamp) {
        final Instant initialState = currentElementTimestamp;
        return initialState;
    }

    /**
     * Creates a watermark estimator from the stored state.
     *
     * @param watermarkEstimatorState previously saved estimator state
     * @return the estimator produced by {@code createWatermarkEstimatorFn} for the state clamped
     *     to the {@link BoundedWindow} timestamp limits
     * @throws IllegalStateException if no {@code createWatermarkEstimatorFn} was configured
     */
    @DoFn.NewWatermarkEstimator
    public @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState @UnknownKeyFor @NonNull @Initialized Instant watermarkEstimatorState) {
        SerializableFunction<Instant, WatermarkEstimator<Instant>> estimatorFactory = Preconditions.checkStateNotNull(this.createWatermarkEstimatorFn);
        Instant clampedState = ReadFromKafkaDoFn.ensureTimestampWithinBounds(watermarkEstimatorState);
        return estimatorFactory.apply(clampedState);
    }

    /**
     * Estimates the size of the remaining work in a restriction.
     *
     * <p>The remaining record count comes from a tracker built for the restriction. If size
     * statistics have been recorded for the partition, the count is converted through
     * {@code AverageRecordSize.getTotalSize}; otherwise the raw record count is returned.
     *
     * @param kafkaSourceDescriptor the partition being read
     * @param offsetRange the restriction to size
     * @return estimated size, or the remaining record count when no statistics exist yet
     * @throws IllegalStateException if {@code setup()} has not initialized the statistics cache
     */
    @DoFn.GetSize
    public @UnknownKeyFor @NonNull @Initialized double getSize(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange offsetRange) throws @UnknownKeyFor @NonNull @Initialized Exception {
        LoadingCache<TopicPartition, AverageRecordSize> recordSizes = Preconditions.checkStateNotNull(this.avgRecordSize);
        double numRecords = this.restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        // Single lookup instead of containsKey + get: with a size-bounded cache the entry could be
        // evicted between the two calls, and get() would then load an empty average and report 0.
        @Nullable AverageRecordSize observed = recordSizes.getIfPresent(kafkaSourceDescriptor.getTopicPartition());
        if (observed == null) {
            return numRecords;
        }
        return observed.getTotalSize(numRecords);
    }

    /**
     * Creates the tracker for a restriction.
     *
     * <p>A restriction whose end is below {@code Long.MAX_VALUE} gets a plain
     * {@link OffsetRangeTracker}. An open-ended restriction gets a
     * {@link GrowableOffsetRangeTracker} whose end is estimated by a per-partition
     * {@code KafkaLatestOffsetEstimator}; the estimator is cached and re-created when it is
     * missing or reports itself closed.
     *
     * @param kafkaSourceDescriptor the partition being read
     * @param restriction the offset range to track
     * @return a tracker for {@code restriction}
     * @throws IllegalStateException if {@code setup()} has not initialized the estimator cache
     */
    @DoFn.NewTracker
    public @UnknownKeyFor @NonNull @Initialized OffsetRangeTracker restrictionTracker(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange restriction) {
        boolean openEnded = restriction.getTo() >= Long.MAX_VALUE;
        if (!openEnded) {
            return new OffsetRangeTracker(restriction);
        }

        Map<TopicPartition, KafkaLatestOffsetEstimator> estimators = Preconditions.checkStateNotNull(this.offsetEstimatorCache);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        KafkaLatestOffsetEstimator estimator = estimators.get(topicPartition);
        boolean reusable = estimator != null && !estimator.isClosed();
        if (!reusable) {
            Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
            LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);
            Map<String, Object> estimatorConsumerConfig = KafkaIOUtils.getOffsetConsumerConfig("tracker-" + topicPartition, this.offsetConsumerConfig, updatedConsumerConfig);
            Consumer<byte[], byte[]> offsetConsumer = this.consumerFactoryFn.apply(estimatorConsumerConfig);
            estimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
            estimators.put(topicPartition, estimator);
        }
        return new GrowableOffsetRangeTracker(restriction.getFrom(), estimator);
    }

    /**
     * Reads records for one partition, starting at the beginning of the current restriction.
     *
     * <p>Flow:
     * <ol>
     *   <li>If {@code checkStopReadingFn} is set and returns true for the partition, the rest of
     *       the restriction is claimed and processing stops.
     *   <li>A consumer is created; if the partition is not among the topics the consumer lists,
     *       processing stops.
     *   <li>The consumer is assigned to the partition, positioned at the restriction start, and
     *       polled in a loop. An empty poll result ends the call with {@code resume()}; a
     *       rejected {@code tryClaim} ends it with {@code stop()}.
     *   <li>Each claimed record is deserialized, counted in the size statistics and metric, and
     *       emitted under {@code recordTag}. A {@link SerializationException} routes the raw
     *       record to {@code badRecordRouter} instead.
     * </ol>
     *
     * <p>When a timestamp policy factory is configured, the watermark is set manually on an
     * empty poll, for every emitted record, and for every bad record.
     *
     * @param kafkaSourceDescriptor the partition to read
     * @param tracker tracker for the current offset restriction
     * @param watermarkEstimator estimator to update when a timestamp policy is in use
     * @param receiver output receiver for good and bad records
     * @return {@code stop()} or {@code resume()} as described above
     * @throws IllegalStateException if {@code setup()} has not run, or if neither a timestamp
     *     policy factory nor {@code extractOutputTimestampFn} is configured
     */
    @DoFn.ProcessElement
    public DoFn.@UnknownKeyFor @NonNull @Initialized ProcessContinuation processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, DoFn.@UnknownKeyFor @NonNull @Initialized MultiOutputReceiver receiver) throws @UnknownKeyFor @NonNull @Initialized Exception {
        LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize = Preconditions.checkStateNotNull(this.avgRecordSize);
        Deserializer<K> keyDeserializerInstance = Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        Deserializer<V> valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        Distribution rawSizes = Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());

        if (this.checkStopReadingFn != null && this.checkStopReadingFn.apply(topicPartition)) {
            // Claim the last offset of the restriction so the remaining range counts as done.
            tracker.tryClaim(tracker.currentRestriction().getTo() - 1L);
            return DoFn.ProcessContinuation.stop();
        }

        Map<String, Object> updatedConsumerConfig = this.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        @Nullable TimestampPolicy<K, V> timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }

        LOG.info("Creating Kafka consumer for process continuation for {}", topicPartition);
        try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(updatedConsumerConfig)) {
            // Stop if the partition is not among the topics/partitions the consumer can list.
            HashSet<TopicPartition> existingTopicPartitions = new HashSet<>();
            consumer.listTopics().values().forEach(partitionInfos ->
                partitionInfos.forEach(partitionInfo ->
                    existingTopicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))));
            if (!existingTopicPartitions.contains(topicPartition)) {
                return DoFn.ProcessContinuation.stop();
            }

            ConsumerSpEL.evaluateAssign(consumer, ImmutableList.of(topicPartition));
            long startOffset = tracker.currentRestriction().getFrom();
            // Offset the next record would have if no offsets were skipped; the difference to
            // the actual offset is recorded as the "gap" in the size statistics.
            long expectedOffset = startOffset;
            consumer.seek(topicPartition, startOffset);

            while (true) {
                ConsumerRecords<byte[], byte[]> rawRecords = this.poll(consumer, topicPartition);
                if (rawRecords.isEmpty()) {
                    // Nothing available right now: refresh the watermark and ask to be resumed.
                    if (timestampPolicy != null) {
                        this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                    }
                    return DoFn.ProcessContinuation.resume();
                }

                for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
                    if (!tracker.tryClaim(rawRecord.offset())) {
                        return DoFn.ProcessContinuation.stop();
                    }
                    try {
                        KafkaRecord<K, V> kafkaRecord = new KafkaRecord<>(
                            rawRecord.topic(),
                            rawRecord.partition(),
                            rawRecord.offset(),
                            ConsumerSpEL.getRecordTimestamp(rawRecord),
                            ConsumerSpEL.getRecordTimestampType(rawRecord),
                            ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
                            ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
                            ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));

                        int keySize = rawRecord.key() == null ? 0 : rawRecord.key().length;
                        int valueSize = rawRecord.value() == null ? 0 : rawRecord.value().length;
                        int recordSize = keySize + valueSize;
                        avgRecordSize.getUnchecked(topicPartition).update(recordSize, rawRecord.offset() - expectedOffset);
                        rawSizes.update((long) recordSize);
                        expectedOffset = rawRecord.offset() + 1L;

                        Instant outputTimestamp;
                        if (timestampPolicy != null) {
                            KafkaUnboundedReader.TimestampPolicyContext context = this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                            outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
                        } else {
                            outputTimestamp = Preconditions.checkStateNotNull(this.extractOutputTimestampFn).apply(kafkaRecord);
                        }
                        receiver.get(this.recordTag).outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
                    } catch (SerializationException e) {
                        // Undecodable record: hand the raw record to the router and keep reading.
                        this.badRecordRouter.route(receiver, rawRecord, null, e, "Failure deserializing Key or Value of Kakfa record reading from Kafka");
                        if (timestampPolicy != null) {
                            this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                        }
                    }
                }
            }
        }
    }

    /**
     * Polls the consumer until records arrive, the position stops moving, or the
     * {@code KAFKA_POLL_TIMEOUT} budget is used up.
     *
     * <p>After each empty poll the consumer position is compared with the one seen after the
     * previous empty poll; an unchanged position ends the wait immediately.
     *
     * @param consumer consumer already assigned to {@code topicPartition}
     * @param topicPartition partition whose position is checked between polls
     * @return the first non-empty batch, or the last (empty) batch when giving up
     */
    private @UnknownKeyFor @NonNull @Initialized ConsumerRecords<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> poll(@UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> consumer, @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition) {
        final Stopwatch timer = Stopwatch.createStarted();
        long lastSeenPosition = -1L;
        Duration spent = Duration.ZERO;
        while (true) {
            // Each attempt only gets what is left of the overall budget.
            ConsumerRecords<byte[], byte[]> batch = consumer.poll(KAFKA_POLL_TIMEOUT.minus(spent));
            if (!batch.isEmpty()) {
                return batch;
            }
            long positionNow = consumer.position(topicPartition);
            if (positionNow == lastSeenPosition) {
                return batch;
            }
            lastSeenPosition = positionNow;
            spent = timer.elapsed();
            if (spent.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
                return batch;
            }
        }
    }

    /**
     * Asks the timestamp policy for a watermark and sets it on the estimator.
     *
     * <p>The policy context is built from the tracker's remaining work (truncated to a
     * {@code long}) and the current wall-clock instant. The watermark is clamped to the
     * {@link BoundedWindow} timestamp limits before being set.
     *
     * @param timestampPolicy policy that supplies the watermark
     * @param watermarkEstimator must be a {@link ManualWatermarkEstimator}
     * @param tracker must also implement {@code RestrictionTracker.HasProgress}
     * @return the context that was passed to the policy
     * @throws IllegalStateException if {@code watermarkEstimator} is not a {@link ManualWatermarkEstimator}
     */
    private @UnknownKeyFor @NonNull @Initialized KafkaUnboundedReader.TimestampPolicyContext updateWatermarkManually(@UnknownKeyFor @NonNull @Initialized TimestampPolicy<K, V> timestampPolicy, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(watermarkEstimator instanceof ManualWatermarkEstimator);
        long backlog = (long) ((RestrictionTracker.HasProgress) tracker).getProgress().getWorkRemaining();
        Instant checkedAt = Instant.now();
        KafkaUnboundedReader.TimestampPolicyContext policyContext = new KafkaUnboundedReader.TimestampPolicyContext(backlog, checkedAt);
        ManualWatermarkEstimator<Instant> manualEstimator = (ManualWatermarkEstimator<Instant>) watermarkEstimator;
        Instant watermark = ReadFromKafkaDoFn.ensureTimestampWithinBounds(timestampPolicy.getWatermark(policyContext));
        manualEstimator.setWatermark(watermark);
        return policyContext;
    }

    /**
     * Supplies the coder used to encode this DoFn's {@link OffsetRange} restrictions.
     *
     * @return a new {@code OffsetRange.Coder}
     */
    @DoFn.GetRestrictionCoder
    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized OffsetRange> restrictionCoder() {
        final Coder<OffsetRange> offsetRangeCoder = new OffsetRange.Coder();
        return offsetRangeCoder;
    }

    /**
     * Creates the transient per-instance state: the record-size statistics cache (at most 1000
     * partitions), the key and value deserializers, and the empty offset-estimator cache.
     * Finally forwards the call to {@code checkStopReadingFn}, if one is configured.
     */
    @DoFn.Setup
    public void setup() throws @UnknownKeyFor @NonNull @Initialized Exception {
        CacheLoader<TopicPartition, AverageRecordSize> freshStatistics = new CacheLoader<TopicPartition, AverageRecordSize>() {
            @Override
            public @UnknownKeyFor @NonNull @Initialized AverageRecordSize load(@UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition) throws @UnknownKeyFor @NonNull @Initialized Exception {
                return new AverageRecordSize();
            }
        };
        this.avgRecordSize = CacheBuilder.newBuilder().maximumSize(1000L).build(freshStatistics);

        // The boolean tells the provider whether the deserializer is for keys (true) or values (false).
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);

        this.offsetEstimatorCache = new HashMap<>();

        CheckStopReadingFn stopFn = this.checkStopReadingFn;
        if (stopFn != null) {
            stopFn.setup();
        }
    }

    /**
     * Releases the state created by {@code setup()}.
     *
     * <p>Each deserializer is closed independently, so a failure while closing the key
     * deserializer no longer prevents the value deserializer from being closed. The
     * offset-estimator cache is only cleared here; the estimators' consumers are closed by
     * {@code KafkaLatestOffsetEstimator.finalize()}. Finally the call is forwarded to
     * {@code checkStopReadingFn}, if one is configured.
     *
     * @throws IllegalStateException if the deserializers were never created by {@code setup()}
     */
    @DoFn.Teardown
    public void teardown() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Deserializer<K> keyDeserializerInstance = Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        Deserializer<V> valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        closeQuietly(keyDeserializerInstance);
        closeQuietly(valueDeserializerInstance);

        if (this.offsetEstimatorCache != null) {
            this.offsetEstimatorCache.clear();
        }
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.teardown();
        }
    }

    /** Closes {@code resource}, logging (not propagating) any exception it throws. */
    private static void closeQuietly(Closeable resource) {
        try {
            Closeables.close(resource, true);
        } catch (Exception anyException) {
            LOG.warn("Fail to close resource during finishing bundle.", anyException);
        }
    }

    /**
     * Returns a copy of {@code currentConfig} in which {@code "bootstrap.servers"} is replaced
     * by the descriptor's servers (comma-joined) when the descriptor lists at least one.
     *
     * @param currentConfig base consumer configuration; not modified
     * @param description source descriptor that may carry its own bootstrap servers
     * @return a new mutable map
     * @throws IllegalStateException if {@code currentConfig} has no {@code "bootstrap.servers"}
     *     entry and the descriptor's server list is null
     */
    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> overrideBootstrapServersConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> currentConfig, @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor description) {
        boolean hasServerSource = currentConfig.containsKey("bootstrap.servers") || description.getBootStrapServers() != null;
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(hasServerSource);

        Map<String, Object> merged = new HashMap<>(currentConfig);
        List<String> descriptorServers = description.getBootStrapServers();
        if (descriptorServers == null || descriptorServers.isEmpty()) {
            return merged;
        }
        merged.put("bootstrap.servers", String.join(",", descriptorServers));
        return merged;
    }

    /**
     * Clamps {@code timestamp} to the range
     * [{@code BoundedWindow.TIMESTAMP_MIN_VALUE}, {@code BoundedWindow.TIMESTAMP_MAX_VALUE}].
     *
     * @param timestamp the instant to clamp
     * @return the nearest bound when {@code timestamp} lies outside the range, else {@code timestamp}
     */
    private static @UnknownKeyFor @NonNull @Initialized Instant ensureTimestampWithinBounds(@UnknownKeyFor @NonNull @Initialized Instant timestamp) {
        if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return timestamp;
    }

    /**
     * Running statistics for one partition: a moving average of record size and a moving
     * average of the offset gap passed alongside each record.
     */
    private static class AverageRecordSize {
        private KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg();
        private KafkaIOUtils.MovingAvg avgRecordGap = new KafkaIOUtils.MovingAvg();

        /**
         * Feeds one observation into both moving averages.
         *
         * @param recordSize size of the record
         * @param gap offset gap associated with the record
         */
        public void update(@UnknownKeyFor @NonNull @Initialized int recordSize, @UnknownKeyFor @NonNull @Initialized long gap) {
            avgRecordSize.update(recordSize);
            avgRecordGap.update(gap);
        }

        /**
         * Converts a record count into a size estimate.
         *
         * @param numRecords number of records (or offsets) remaining
         * @return {@code avgRecordSize * numRecords / (1 + avgRecordGap)}
         */
        public @UnknownKeyFor @NonNull @Initialized double getTotalSize(@UnknownKeyFor @NonNull @Initialized double numRecords) {
            double sizeAtAverage = avgRecordSize.get() * numRecords;
            double gapDivisor = 1.0 + avgRecordGap.get();
            return sizeAtAverage / gapDivisor;
        }
    }

    /**
     * Estimates the end of an open-ended offset range by seeking a dedicated consumer to the end
     * of the partition and reading its position. The result is memoized for one second.
     *
     * <p>The consumer is closed in {@link #finalize()}; {@link #isClosed()} reports whether
     * that has happened.
     */
    private static class KafkaLatestOffsetEstimator
    implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final Consumer<byte[], byte[]> offsetConsumer;
        private final TopicPartition topicPartition;
        private final Supplier<Long> memoizedBacklog;
        // Written by finalize() and read through isClosed(); volatile so the write is visible
        // to whichever thread calls isClosed().
        private volatile boolean closed;

        /**
         * Assigns {@code offsetConsumer} to {@code topicPartition} and prepares the memoized
         * end-offset lookup.
         *
         * @param offsetConsumer consumer owned by this estimator from now on
         * @param topicPartition partition whose end offset is estimated
         */
        KafkaLatestOffsetEstimator(@UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> offsetConsumer, @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition) {
            this.offsetConsumer = offsetConsumer;
            this.topicPartition = topicPartition;
            ConsumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                // Seek and position must not interleave with another lookup on the same consumer.
                synchronized (offsetConsumer) {
                    ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
                    return offsetConsumer.position(topicPartition);
                }
            }, 1L, TimeUnit.SECONDS);
        }

        /** Closes the offset consumer and marks this estimator closed; failures are logged. */
        @Override
        protected void finalize() {
            try {
                Closeables.close(this.offsetConsumer, true);
                this.closed = true;
                LOG.info("Offset Estimator consumer was closed for {}", this.topicPartition);
            }
            catch (Exception anyException) {
                // Pass the exception as the trailing argument so its cause is not lost.
                LOG.warn("Failed to close offset consumer for {}", this.topicPartition, anyException);
            }
        }

        /**
         * @return the memoized end position of the partition
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized long estimate() {
            return this.memoizedBacklog.get();
        }

        /**
         * @return true once {@link #finalize()} has closed the consumer
         */
        public @UnknownKeyFor @NonNull @Initialized boolean isClosed() {
            return this.closed;
        }
    }

    /** Variant of {@link ReadFromKafkaDoFn} annotated as producing bounded output per element. */
    @DoFn.BoundedPerElement
    private static class Bounded<@UnknownKeyFor K, @UnknownKeyFor V>
    extends ReadFromKafkaDoFn<K, V> {
        /** Passes both arguments straight through to the base-class constructor. */
        Bounded(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
            super(transform, recordTag);
        }
    }

    /** Variant of {@link ReadFromKafkaDoFn} annotated as producing unbounded output per element. */
    @DoFn.UnboundedPerElement
    private static class Unbounded<@UnknownKeyFor K, @UnknownKeyFor V>
    extends ReadFromKafkaDoFn<K, V> {
        /** Passes both arguments straight through to the base-class constructor. */
        Unbounded(@UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
            super(transform, recordTag);
        }
    }
}

