/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.Closeable;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.MathContext;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.CheckStopReadingFn;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaMetrics;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.MemoizingPerInstantiationSerializableSupplier;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalCause;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class ReadFromKafkaDoFn<@UnknownKeyFor K, @UnknownKeyFor V>
extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    private static final @UnknownKeyFor @NonNull @Initialized Joiner COMMA_JOINER = Joiner.on((char)',');
    private final @Nullable @UnknownKeyFor @Initialized CheckStopReadingFn checkStopReadingFn;
    private final @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> extractOutputTimestampFn;
    private final @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> createWatermarkEstimatorFn;
    private final @Nullable @UnknownKeyFor @Initialized TimestampPolicyFactory<K, V> timestampPolicyFactory;
    private final @UnknownKeyFor @NonNull @Initialized BadRecordRouter badRecordRouter;
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag;
    private final @UnknownKeyFor @NonNull @Initialized SerializableSupplier<@UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaIOUtils.MovingAvg>> avgRecordSizeCacheSupplier;
    private final @UnknownKeyFor @NonNull @Initialized SerializableSupplier<@UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator>> latestOffsetEstimatorCacheSupplier;
    private final @UnknownKeyFor @NonNull @Initialized SerializableSupplier<@UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>> pollConsumerCacheSupplier;
    private transient @MonotonicNonNull @UnknownKeyFor @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaIOUtils.MovingAvg> avgRecordSizeCache;
    private transient @MonotonicNonNull @UnknownKeyFor @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator> latestOffsetEstimatorCache;
    private transient @MonotonicNonNull @UnknownKeyFor @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> pollConsumerCache;
    private transient @Nullable @UnknownKeyFor @Initialized Deserializer<K> keyDeserializerInstance = null;
    private transient @Nullable @UnknownKeyFor @Initialized Deserializer<V> valueDeserializerInstance = null;
    private static final @UnknownKeyFor @NonNull @Initialized long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized Duration consumerPollingTimeout;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized DeserializerProvider<K> keyDeserializerProvider;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized DeserializerProvider<V> valueDeserializerProvider;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> consumerConfig;
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized String METRIC_NAMESPACE = "KafkaIOReader";
    @VisibleForTesting
    static final @UnknownKeyFor @NonNull @Initialized String RAW_SIZE_METRIC_PREFIX = "rawSize/";

    static <K, V> @UnknownKeyFor @NonNull @Initialized ReadFromKafkaDoFn<K, V> create( @UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
        if (transform.isBounded()) {
            return new Bounded<K, V>(transform, recordTag);
        }
        return new Unbounded<K, V>(transform, recordTag);
    }

    private ReadFromKafkaDoFn( @UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
        final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn = transform.getConsumerFactoryFn();
        this.consumerConfig = transform.getConsumerConfig();
        this.keyDeserializerProvider = (DeserializerProvider)Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider());
        this.valueDeserializerProvider = (DeserializerProvider)Preconditions.checkArgumentNotNull(transform.getValueDeserializerProvider());
        this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = transform.getTimestampPolicyFactory();
        this.checkStopReadingFn = transform.getCheckStopReadingFn();
        this.badRecordRouter = transform.getBadRecordRouter();
        this.recordTag = recordTag;
        this.avgRecordSizeCacheSupplier = new MemoizingPerInstantiationSerializableSupplier((SerializableSupplier & Serializable)() -> CacheBuilder.newBuilder().concurrencyLevel(Runtime.getRuntime().availableProcessors()).weakValues().build((CacheLoader)new CacheLoader<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>(){

            public @UnknownKeyFor @NonNull @Initialized KafkaIOUtils.MovingAvg load(@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor) throws @UnknownKeyFor @NonNull @Initialized Exception {
                return new KafkaIOUtils.MovingAvg();
            }
        }));
        this.latestOffsetEstimatorCacheSupplier = new MemoizingPerInstantiationSerializableSupplier((SerializableSupplier & Serializable)() -> CacheBuilder.newBuilder().concurrencyLevel(Runtime.getRuntime().availableProcessors()).weakValues().removalListener(notification -> {
            KafkaLatestOffsetEstimator value;
            if (notification.getCause() == RemovalCause.COLLECTED && (value = (KafkaLatestOffsetEstimator)notification.getValue()) != null) {
                value.close();
            }
        }).build((CacheLoader)new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>(){

            public @UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator load(@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor sourceDescriptor) {
                LOG.info("Creating Kafka consumer for offset estimation for {}", (Object)sourceDescriptor);
                Map<String, Object> config = KafkaIOUtils.overrideBootstrapServersConfig(ReadFromKafkaDoFn.this.consumerConfig, sourceDescriptor);
                Consumer consumer = (Consumer)consumerFactoryFn.apply(config);
                return new KafkaLatestOffsetEstimator((Consumer<byte[], byte[]>)consumer, sourceDescriptor.getTopicPartition());
            }
        }));
        this.pollConsumerCacheSupplier = new MemoizingPerInstantiationSerializableSupplier((SerializableSupplier & Serializable)() -> CacheBuilder.newBuilder().concurrencyLevel(Runtime.getRuntime().availableProcessors()).weakValues().removalListener(notification -> {
            Consumer value;
            if (notification.getCause() == RemovalCause.COLLECTED && (value = (Consumer)notification.getValue()) != null) {
                value.close();
            }
        }).build((CacheLoader)new CacheLoader<KafkaSourceDescriptor, Consumer<byte[], byte[]>>(){

            public @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> load(@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor sourceDescriptor) {
                LOG.info("Creating Kafka consumer for restriction processing for {}", (Object)sourceDescriptor);
                Map<String, Object> config = KafkaIOUtils.overrideBootstrapServersConfig(ReadFromKafkaDoFn.this.consumerConfig, sourceDescriptor);
                Consumer consumer = (Consumer)consumerFactoryFn.apply(config);
                consumer.assign(Collections.singleton(sourceDescriptor.getTopicPartition()));
                return consumer;
            }
        }));
        this.consumerPollingTimeout = Duration.ofSeconds(transform.getConsumerPollingTimeout() > 0L ? transform.getConsumerPollingTimeout() : 2L);
    }

    @DoFn.GetInitialRestriction
    @RequiresNonNull(value={"pollConsumerCache"})
    public @UnknownKeyFor @NonNull @Initialized OffsetRange initialRestriction(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor) {
        LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache = this.pollConsumerCache;
        Consumer consumer = (Consumer)pollConsumerCache.getUnchecked((Object)kafkaSourceDescriptor);
        @Nullable Long startReadOffset = kafkaSourceDescriptor.getStartReadOffset();
        @Nullable Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
        long startOffset = startReadOffset != null ? startReadOffset : (startReadTime != null ? ((OffsetAndTimestamp)Preconditions.checkStateNotNull((Object)((OffsetAndTimestamp)consumer.offsetsForTimes(Collections.singletonMap(kafkaSourceDescriptor.getTopicPartition(), startReadTime.getMillis())).get(kafkaSourceDescriptor.getTopicPartition())))).offset() : consumer.position(kafkaSourceDescriptor.getTopicPartition()));
        @Nullable Long stopReadOffset = kafkaSourceDescriptor.getStopReadOffset();
        @Nullable Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
        long stopOffset = stopReadOffset != null ? stopReadOffset : (stopReadTime != null ? ((OffsetAndTimestamp)Preconditions.checkStateNotNull((Object)((OffsetAndTimestamp)consumer.offsetsForTimes(Collections.singletonMap(kafkaSourceDescriptor.getTopicPartition(), stopReadTime.getMillis())).get(kafkaSourceDescriptor.getTopicPartition())))).offset() : Long.MAX_VALUE);
        OffsetRange initialRestriction = new OffsetRange(startOffset, stopOffset);
        Lineage.getSources().add("kafka", (Iterable)ImmutableList.of((Object)Optional.ofNullable(KafkaIOUtils.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor).get("bootstrap.servers")).map(value -> (List)ConfigDef.parseType((String)"bootstrap.servers", (Object)value, (ConfigDef.Type)ConfigDef.Type.LIST)).map(ImmutableSet::copyOf).map(arg_0 -> ((Joiner)COMMA_JOINER).join(arg_0)).get(), (Object)((String)MoreObjects.firstNonNull((Object)kafkaSourceDescriptor.getTopic(), (Object)kafkaSourceDescriptor.getTopicPartition().topic()))));
        return initialRestriction;
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public @UnknownKeyFor @NonNull @Initialized Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant currentElementTimestamp) {
        return currentElementTimestamp;
    }

    @DoFn.NewWatermarkEstimator
    public @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState @UnknownKeyFor @NonNull @Initialized Instant watermarkEstimatorState) {
        SerializableFunction createWatermarkEstimatorFn = (SerializableFunction)Preconditions.checkStateNotNull(this.createWatermarkEstimatorFn);
        return (WatermarkEstimator)createWatermarkEstimatorFn.apply((Object)ReadFromKafkaDoFn.ensureTimestampWithinBounds(watermarkEstimatorState));
    }

    @DoFn.GetSize
    @RequiresNonNull(value={"avgRecordSizeCache", "latestOffsetEstimatorCache"})
    public @UnknownKeyFor @NonNull @Initialized double getSize(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange offsetRange) {
        LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg> avgRecordSizeCache = this.avgRecordSizeCache;
        @Nullable KafkaIOUtils.MovingAvg avgRecordSize = (KafkaIOUtils.MovingAvg)avgRecordSizeCache.getIfPresent((Object)kafkaSourceDescriptor);
        double estimatedOffsetRange = this.restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        return avgRecordSize == null ? estimatedOffsetRange : estimatedOffsetRange * avgRecordSize.get();
    }

    @DoFn.NewTracker
    @RequiresNonNull(value={"latestOffsetEstimatorCache"})
    public @UnknownKeyFor @NonNull @Initialized OffsetRangeTracker restrictionTracker(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction @UnknownKeyFor @NonNull @Initialized OffsetRange restriction) {
        LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
        if (restriction.getTo() < Long.MAX_VALUE) {
            return new OffsetRangeTracker(restriction);
        }
        return new GrowableOffsetRangeTracker(restriction.getFrom(), (GrowableOffsetRangeTracker.RangeEndEstimator)latestOffsetEstimatorCache.getUnchecked((Object)kafkaSourceDescriptor));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @DoFn.ProcessElement
    @RequiresNonNull(value={"avgRecordSizeCache", "latestOffsetEstimatorCache", "pollConsumerCache"})
    public // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.ProcessContinuation processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor kafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver receiver) throws @UnknownKeyFor @NonNull @Initialized Exception {
        LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg> avgRecordSizeCache = this.avgRecordSizeCache;
        LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
        LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache = this.pollConsumerCache;
        KafkaIOUtils.MovingAvg avgRecordSize = (KafkaIOUtils.MovingAvg)avgRecordSizeCache.get((Object)kafkaSourceDescriptor);
        KafkaLatestOffsetEstimator latestOffsetEstimator = (KafkaLatestOffsetEstimator)latestOffsetEstimatorCache.get((Object)kafkaSourceDescriptor);
        Consumer consumer = (Consumer)pollConsumerCache.get((Object)kafkaSourceDescriptor);
        Deserializer keyDeserializerInstance = (Deserializer)Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        Deserializer valueDeserializerInstance = (Deserializer)Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        Distribution rawSizes = Metrics.distribution((String)METRIC_NAMESPACE, (String)(RAW_SIZE_METRIC_PREFIX + topicPartition.toString()));
        Gauge backlogBytes = Metrics.gauge((String)METRIC_NAMESPACE, (String)("rawSize/backlogBytes_" + topicPartition.toString()));
        if (this.checkStopReadingFn != null && ((Boolean)this.checkStopReadingFn.apply(topicPartition)).booleanValue()) {
            tracker.tryClaim((Object)(((OffsetRange)tracker.currentRestriction()).getTo() - 1L));
            return DoFn.ProcessContinuation.stop();
        }
        TimestampPolicy timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }
        Duration remainingTimeout = this.consumerPollingTimeout;
        long expectedOffset = ((OffsetRange)tracker.currentRestriction()).getFrom();
        consumer.resume(Collections.singleton(topicPartition));
        consumer.seek(topicPartition, expectedOffset);
        Stopwatch pollTimer = Stopwatch.createUnstarted();
        KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
        try {
            while (Duration.ZERO.compareTo(remainingTimeout) < 0) {
                Iterator iterator;
                pollTimer.reset().start();
                ConsumerRecords rawRecords = consumer.poll(remainingTimeout);
                Duration elapsed = pollTimer.elapsed();
                try {
                    remainingTimeout = remainingTimeout.minus(elapsed);
                }
                catch (ArithmeticException e) {
                    remainingTimeout = Duration.ZERO;
                }
                kafkaMetrics.updateSuccessfulRpcMetrics(topicPartition.topic(), elapsed);
                if (rawRecords == ConsumerRecords.empty()) {
                    consumer.pause(Collections.singleton(topicPartition));
                    if (!this.topicPartitionExists(kafkaSourceDescriptor.getTopicPartition(), consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
                        DoFn.ProcessContinuation e = DoFn.ProcessContinuation.stop();
                        return e;
                    }
                    if (timestampPolicy != null) {
                        this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                    }
                    DoFn.ProcessContinuation e = DoFn.ProcessContinuation.resume();
                    return e;
                }
                long rawSizesSum = 0L;
                long rawSizesCount = 0L;
                long rawSizesMin = Long.MAX_VALUE;
                long rawSizesMax = Long.MIN_VALUE;
                try {
                    iterator = rawRecords.iterator();
                    while (true) {
                        if (iterator.hasNext()) {
                            ConsumerRecord rawRecord = (ConsumerRecord)iterator.next();
                            if (!tracker.tryClaim((Object)rawRecord.offset())) {
                                consumer.seek(topicPartition, rawRecord.offset());
                                consumer.pause(Collections.singleton(topicPartition));
                                DoFn.ProcessContinuation processContinuation = DoFn.ProcessContinuation.stop();
                                return processContinuation;
                            }
                            expectedOffset = rawRecord.offset() + 1L;
                            try {
                                Instant outputTimestamp;
                                KafkaRecord kafkaRecord = new KafkaRecord(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), ConsumerSpEL.getRecordTimestamp((ConsumerRecord<byte[], byte[]>)rawRecord), ConsumerSpEL.getRecordTimestampType((ConsumerRecord<byte[], byte[]>)rawRecord), ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null, ConsumerSpEL.deserializeKey(keyDeserializerInstance, (ConsumerRecord<byte[], byte[]>)rawRecord), ConsumerSpEL.deserializeValue(valueDeserializerInstance, (ConsumerRecord<byte[], byte[]>)rawRecord));
                                int recordSize = (rawRecord.key() == null ? 0 : ((byte[])rawRecord.key()).length) + (rawRecord.value() == null ? 0 : ((byte[])rawRecord.value()).length);
                                rawSizesSum += (long)recordSize;
                                ++rawSizesCount;
                                rawSizesMin = Math.min(rawSizesMin, (long)recordSize);
                                rawSizesMax = Math.max(rawSizesMax, (long)recordSize);
                                if (timestampPolicy != null) {
                                    KafkaUnboundedReader.TimestampPolicyContext context = this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                                    outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
                                } else {
                                    Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
                                    outputTimestamp = (Instant)this.extractOutputTimestampFn.apply(kafkaRecord);
                                }
                                receiver.get(this.recordTag).outputWithTimestamp((Object)KV.of((Object)kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
                            }
                            catch (SerializationException e) {
                                this.badRecordRouter.route(receiver, (Object)rawRecord, null, (Exception)((Object)e), "Failure deserializing Key or Value of Kakfa record reading from Kafka");
                                if (timestampPolicy == null) continue;
                                this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                            }
                            continue;
                        }
                        break;
                    }
                }
                finally {
                    if (rawSizesCount > 0L) {
                        avgRecordSize.update(rawSizesSum, rawSizesCount);
                        rawSizes.update(rawSizesSum, rawSizesCount, rawSizesMin, rawSizesMax);
                    }
                }
                long l = expectedOffset;
                expectedOffset = consumer.position(topicPartition);
                if (l < expectedOffset) {
                    if (!tracker.tryClaim((Object)(expectedOffset - 1L))) {
                        consumer.seek(topicPartition, expectedOffset - 1L);
                        consumer.pause(Collections.singleton(topicPartition));
                        iterator = DoFn.ProcessContinuation.stop();
                        return iterator;
                    }
                    if (timestampPolicy != null) {
                        this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
                    }
                }
                long estimatedBacklogBytes = (long)(BigDecimal.valueOf(latestOffsetEstimator.estimate()).subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128).doubleValue() * avgRecordSize.get());
                backlogBytes.set(estimatedBacklogBytes);
                kafkaMetrics.updateBacklogBytes(kafkaSourceDescriptor.getTopic(), kafkaSourceDescriptor.getPartition(), estimatedBacklogBytes);
            }
            if (timestampPolicy != null) {
                this.updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
            }
            DoFn.ProcessContinuation processContinuation = DoFn.ProcessContinuation.resume();
            return processContinuation;
        }
        finally {
            kafkaMetrics.flushBufferedMetrics();
        }
    }

    private @UnknownKeyFor @NonNull @Initialized boolean topicPartitionExists(@UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PartitionInfo> partitionInfos) {
        return partitionInfos.stream().anyMatch(partitionInfo -> partitionInfo.partition() == topicPartition.partition());
    }

    private @UnknownKeyFor @NonNull @Initialized KafkaUnboundedReader.TimestampPolicyContext updateWatermarkManually(@UnknownKeyFor @NonNull @Initialized TimestampPolicy<K, V> timestampPolicy, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((boolean)(watermarkEstimator instanceof ManualWatermarkEstimator));
        KafkaUnboundedReader.TimestampPolicyContext context = new KafkaUnboundedReader.TimestampPolicyContext((long)((RestrictionTracker.HasProgress)tracker).getProgress().getWorkRemaining(), Instant.now());
        ((ManualWatermarkEstimator)watermarkEstimator).setWatermark(ReadFromKafkaDoFn.ensureTimestampWithinBounds(timestampPolicy.getWatermark(context)));
        return context;
    }

    @DoFn.GetRestrictionCoder
    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    @DoFn.Setup
    @EnsuresNonNull(value={"avgRecordSizeCache", "latestOffsetEstimatorCache", "pollConsumerCache"})
    public void setup() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.avgRecordSizeCache = (LoadingCache)this.avgRecordSizeCacheSupplier.get();
        this.latestOffsetEstimatorCache = (LoadingCache)this.latestOffsetEstimatorCacheSupplier.get();
        this.pollConsumerCache = (LoadingCache)this.pollConsumerCacheSupplier.get();
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.setup();
        }
    }

    @DoFn.Teardown
    @RequiresNonNull(value={"avgRecordSizeCache", "latestOffsetEstimatorCache", "pollConsumerCache"})
    public void teardown() throws @UnknownKeyFor @NonNull @Initialized Exception {
        LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg> avgRecordSizeCache = this.avgRecordSizeCache;
        LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
        LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache = this.pollConsumerCache;
        try {
            if (this.valueDeserializerInstance != null) {
                Closeables.close(this.valueDeserializerInstance, (boolean)true);
                this.valueDeserializerInstance = null;
            }
            if (this.keyDeserializerInstance != null) {
                Closeables.close(this.keyDeserializerInstance, (boolean)true);
                this.keyDeserializerInstance = null;
            }
        }
        catch (Exception anyException) {
            LOG.warn("Fail to close resource during finishing bundle.", (Throwable)anyException);
        }
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.teardown();
        }
        avgRecordSizeCache.cleanUp();
        latestOffsetEstimatorCache.cleanUp();
        pollConsumerCache.cleanUp();
    }

    private static @UnknownKeyFor @NonNull @Initialized Instant ensureTimestampWithinBounds(@UnknownKeyFor @NonNull @Initialized Instant timestamp) {
        if (timestamp.isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
        } else if (timestamp.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return timestamp;
    }

    private static class KafkaLatestOffsetEstimator
    implements GrowableOffsetRangeTracker.RangeEndEstimator,
    Closeable {
        private static final @UnknownKeyFor @NonNull @Initialized AtomicReferenceFieldUpdater<@UnknownKeyFor @NonNull @Initialized KafkaLatestOffsetEstimator, @Nullable @UnknownKeyFor @Initialized Runnable> CURRENT_REFRESH_TASK = AtomicReferenceFieldUpdater.newUpdater(KafkaLatestOffsetEstimator.class, Runnable.class, "currentRefreshTask");
        private final @UnknownKeyFor @NonNull @Initialized Executor executor = Executors.newSingleThreadExecutor();
        private final @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> offsetConsumer;
        private final @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition;
        private @UnknownKeyFor @NonNull @Initialized long lastRefreshEndOffset;
        private @UnknownKeyFor @NonNull @Initialized long nextRefreshNanos;
        private volatile @Nullable @UnknownKeyFor @Initialized Runnable currentRefreshTask;

        KafkaLatestOffsetEstimator(@UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> offsetConsumer, @UnknownKeyFor @NonNull @Initialized TopicPartition topicPartition) {
            this.offsetConsumer = offsetConsumer;
            this.topicPartition = topicPartition;
            this.lastRefreshEndOffset = -1L;
            this.nextRefreshNanos = Long.MIN_VALUE;
            this.currentRefreshTask = null;
        }

        public @UnknownKeyFor @NonNull @Initialized long estimate() {
            long currentNanos;
            @Nullable Runnable task = this.currentRefreshTask;
            if (task == null && this.nextRefreshNanos < (currentNanos = System.nanoTime()) && CURRENT_REFRESH_TASK.compareAndSet(this, null, this::refresh)) {
                try {
                    this.executor.execute(this::refresh);
                }
                catch (RejectedExecutionException ex) {
                    LOG.error("Execution of end offset refresh rejected for {}", (Object)this.topicPartition, (Object)ex);
                    this.nextRefreshNanos = currentNanos + TimeUnit.SECONDS.toNanos(1L);
                    CURRENT_REFRESH_TASK.lazySet(this, null);
                }
            }
            return this.lastRefreshEndOffset;
        }

        @Override
        public void close() {
            this.offsetConsumer.close();
        }

        private void refresh() {
            try {
                @Nullable Long endOffset = (Long)this.offsetConsumer.endOffsets(Collections.singleton(this.topicPartition)).get(this.topicPartition);
                if (endOffset == null) {
                    LOG.warn("No end offset found for partition {}.", (Object)this.topicPartition);
                } else {
                    this.lastRefreshEndOffset = endOffset;
                }
                this.nextRefreshNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
            }
            finally {
                CURRENT_REFRESH_TASK.lazySet(this, null);
            }
        }
    }

    @DoFn.BoundedPerElement
    private static class Bounded<@UnknownKeyFor K, @UnknownKeyFor V>
    extends ReadFromKafkaDoFn<K, V> {
        Bounded( @UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
            super(transform, recordTag);
        }
    }

    @DoFn.UnboundedPerElement
    private static class Unbounded<@UnknownKeyFor K, @UnknownKeyFor V>
    extends ReadFromKafkaDoFn<K, V> {
        Unbounded( @UnknownKeyFor @NonNull @Initialized KafkaIO.ReadSourceDescriptors<K, V> transform, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> recordTag) {
            super(transform, recordTag);
        }
    }
}

