/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_ReadSourceDescriptors;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_WriteRecords;
import org.apache.beam.sdk.io.kafka.CheckStopReadingFn;
import org.apache.beam.sdk.io.kafka.CheckStopReadingFnWrapper;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaCommitOffset;
import org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink;
import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaPublishTimestampFunction;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaWriter;
import org.apache.beam.sdk.io.kafka.LocalDeserializerProvider;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.io.kafka.WatchForKafkaTopicPartitions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
import org.apache.beam.sdk.util.construction.TransformUpgrader;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaIO {
    /** SLF4J logger obtained for {@code KafkaIO.class}. */
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    /**
     * Convenience factory for a {@link Read} whose keys and values are raw byte arrays.
     *
     * @return the transform from {@link #read()} with {@link ByteArrayDeserializer} installed as
     *     both the key and the value deserializer
     */
    public static @UnknownKeyFor @NonNull @Initialized Read<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> readBytes() {
        Class<ByteArrayDeserializer> bytesDeserializer = ByteArrayDeserializer.class;
        return KafkaIO.<byte[], byte[]>read()
            .withKeyDeserializer(bytesDeserializer)
            .withValueDeserializer(bytesDeserializer);
    }

    /**
     * Creates a {@link Read} transform populated with the library defaults: empty topic and
     * topic-partition lists, the default consumer factory and consumer properties, no record
     * limit ({@code Long.MAX_VALUE}), offset commit on finalize disabled, dynamic read disabled,
     * processing-time timestamp policy, a consumer polling timeout of {@code 2}, and
     * redistribution (including duplicates and key count) switched off.
     *
     * @return the default-configured read transform
     */
    public static <K, V> @UnknownKeyFor @NonNull @Initialized Read<K, V> read() {
        return new AutoValue_KafkaIO_Read.Builder()
            .setTopics(new ArrayList<String>())
            .setTopicPartitions(new ArrayList<TopicPartition>())
            .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
            .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
            .setMaxNumRecords(Long.MAX_VALUE)
            .setCommitOffsetsInFinalizeEnabled(false)
            .setDynamicRead(false)
            .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
            .setConsumerPollingTimeout(2L)
            .setRedistributed(false)
            .setAllowDuplicates(false)
            .setRedistributeNumKeys(0)
            .build();
    }

    /**
     * Entry point for the source-descriptor based read; simply forwards to
     * {@code ReadSourceDescriptors.read()}.
     *
     * @return the transform produced by {@code ReadSourceDescriptors.read()}
     */
    public static <K, V> @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> readSourceDescriptors() {
        ReadSourceDescriptors<K, V> descriptorRead = ReadSourceDescriptors.read();
        return descriptorRead;
    }

    /**
     * Creates a {@link Write} transform that wraps a freshly created default
     * {@link WriteRecords} transform (see {@link #writeRecords()}).
     *
     * @return the default-configured key/value write transform
     */
    public static <K, V> @UnknownKeyFor @NonNull @Initialized Write<K, V> write() {
        WriteRecords<K, V> defaultRecordsWriter = KafkaIO.writeRecords();
        return new AutoValue_KafkaIO_Write.Builder<K, V>()
            .setWriteRecordsTransform(defaultRecordsWriter)
            .build();
    }

    /**
     * Creates a {@link WriteRecords} transform with the library defaults: the default producer
     * properties, exactly-once mode off with zero shards, the default consumer factory, a
     * throwing bad-record router, and a {@code DefaultErrorHandler} for bad records.
     *
     * @return the default-configured record write transform
     */
    public static <K, V> @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> writeRecords() {
        return new AutoValue_KafkaIO_WriteRecords.Builder()
            .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
            .setEOS(false)
            .setNumShards(0)
            .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
            .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
            .setBadRecordErrorHandler((ErrorHandler<BadRecord, ?>)new ErrorHandler.DefaultErrorHandler())
            .build();
    }

    /** Private constructor: the class is used through its static factory methods. */
    private KafkaIO() {
    }

    /**
     * Loads the class with the given name through {@link Class#forName(String)}.
     *
     * @param className fully-qualified name of the class to load
     * @return the loaded class
     * @throws RuntimeException if no class with that name can be found; the original
     *     {@link ClassNotFoundException} is attached as the cause
     */
    private static /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> resolveClass(@UnknownKeyFor @NonNull @Initialized String className) {
        try {
            return Class.forName(className);
        }
        catch (ClassNotFoundException e) {
            // Chain the original exception so the class-loading failure stays diagnosable.
            throw new RuntimeException("Could not find class: " + className, e);
        }
    }

    /**
     * Coder for a slot that only ever holds {@code null}: encoding rejects any non-null value and
     * writes nothing, decoding reads nothing and always produces {@code null}.
     */
    private static class NullOnlyCoder<@Nullable T>
    extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        /**
         * Verifies that {@code value} is {@code null}; no bytes are written to {@code outStream}.
         *
         * @throws IllegalArgumentException (from {@code checkArgument}) if {@code value} is non-null
         */
        public void encode(T value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) {
            boolean valueIsNull = value == null;
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(valueIsNull, (Object)"Can only encode nulls");
        }

        /** Returns {@code null} without consuming anything from {@code inStream}. */
        public T decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) {
            return null;
        }
    }

    /**
     * Adapter that lets a collection of bare values be written through a key/value
     * {@link Write}: every value is paired with a {@code null} key before the wrapped transform
     * is applied.
     */
    private static class KafkaValueWrite<@UnknownKeyFor V>
    extends PTransform<PCollection<V>, PDone> {
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Write<@Nullable @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @Initialized ?, V> kvWriteTransform;

        private KafkaValueWrite(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Write<@Nullable @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @Initialized ?, V> kvWriteTransform) {
            this.kvWriteTransform = kvWriteTransform;
        }

        /**
         * Maps each value to {@code KV.of(null, value)}, sets a {@link KvCoder} made of a
         * {@link NullOnlyCoder} and the input's coder, and applies the wrapped write transform.
         */
        public @UnknownKeyFor @NonNull @Initialized PDone expand(@UnknownKeyFor @NonNull @Initialized PCollection<V> input) {
            // Step 1: attach a null key to every value.
            SimpleFunction<V, KV<Object, V>> attachNullKey = new SimpleFunction<V, KV<Object, V>>(){

                public @UnknownKeyFor @NonNull @Initialized KV<@Nullable @UnknownKeyFor @Initialized Object, V> apply(V element) {
                    return KV.of(null, element);
                }
            };
            PCollection keyedValues = (PCollection)input.apply("Kafka values with default key", (PTransform)MapElements.via((SimpleFunction)attachNullKey));

            // Step 2: the key side only ever carries null, the value side reuses the input coder.
            Coder nullKeyKvCoder = (Coder)KvCoder.of(new NullOnlyCoder(), (Coder)input.getCoder());

            // Step 3: hand the keyed collection to the key/value writer.
            return (PDone)keyedValues.setCoder(nullKeyKvCoder).apply(this.kvWriteTransform);
        }

        /** Adds this transform's display data followed by that of the wrapped write transform. */
        public void populateDisplayData(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Write<?, V> wrapped = this.kvWriteTransform;
            wrapped.populateDisplayData(builder);
        }
    }

    /**
     * Transform that writes a {@link PCollection} of {@code KV<K, V>}: each element is turned
     * into a {@link ProducerRecord} for the configured topic and passed to the wrapped
     * {@link WriteRecords} transform. Apart from the topic, all settings live on the wrapped
     * transform; every {@code with*} method returns a modified copy.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class Write<@UnknownKeyFor K, @UnknownKeyFor V>
    extends PTransform<PCollection<KV<K, V>>, PDone> {
        public static final @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized AutoValue_KafkaIO_Write> AUTOVALUE_CLASS = AutoValue_KafkaIO_Write.class;

        /** Topic set by {@link #withTopic}, or {@code null} if none was set. */
        abstract @Nullable @UnknownKeyFor @Initialized String getTopic();

        /** The wrapped record-level write transform that holds the producer settings. */
        public abstract @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> getWriteRecordsTransform();

        /** Builder used by the {@code with*} methods to derive a modified copy. */
        abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> toBuilder();

        /** Returns a copy of this transform that wraps {@code transform} instead of the current one. */
        private @UnknownKeyFor @NonNull @Initialized Write<K, V> withWriteRecordsTransform(@UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> transform) {
            Builder<K, V> updatedBuilder = this.toBuilder().setWriteRecordsTransform(transform);
            return updatedBuilder.build();
        }

        /** Forwards to {@link WriteRecords#withBootstrapServers} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withBootstrapServers(@UnknownKeyFor @NonNull @Initialized String bootstrapServers) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withBootstrapServers(bootstrapServers);
            return this.withWriteRecordsTransform(updated);
        }

        /** Records the topic on this transform and on the wrapped {@link WriteRecords}. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withTopic(@UnknownKeyFor @NonNull @Initialized String topic) {
            Builder<K, V> topicBuilder = this.toBuilder().setTopic(topic);
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withTopic(topic);
            return topicBuilder.setWriteRecordsTransform(updated).build();
        }

        /** Forwards to {@link WriteRecords#withKeySerializer} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withKeySerializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<K>> keySerializer) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withKeySerializer(keySerializer);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#withValueSerializer} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withValueSerializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<V>> valueSerializer) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withValueSerializer(valueSerializer);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#withProducerFactoryFn} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withProducerFactoryFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Producer<K, V>> producerFactoryFn) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#withInputTimestamp} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withInputTimestamp() {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withInputTimestamp();
            return this.withWriteRecordsTransform(updated);
        }

        /**
         * Installs a publish-timestamp function expressed over {@code KV<K, V>}; it is adapted to
         * the record-level function type before being handed to the wrapped transform.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withPublishTimestampFunction(@UnknownKeyFor @NonNull @Initialized KafkaPublishTimestampFunction<@UnknownKeyFor @NonNull @Initialized KV<K, V>> timestampFunction) {
            WriteRecords<K, V> current = this.getWriteRecordsTransform();
            PublishTimestampFunctionKV<K, V> recordLevelFn = new PublishTimestampFunctionKV<K, V>(timestampFunction);
            return this.withWriteRecordsTransform(current.withPublishTimestampFunction(recordLevelFn));
        }

        /** Forwards to {@link WriteRecords#withEOS} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withEOS(@UnknownKeyFor @NonNull @Initialized int numShards, @UnknownKeyFor @NonNull @Initialized String sinkGroupId) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withEOS(numShards, sinkGroupId);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#withConsumerFactoryFn} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withConsumerFactoryFn(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> consumerFactoryFn) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#updateProducerProperties} on the wrapped transform. */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> updateProducerProperties(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().updateProducerProperties(configUpdates);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#withProducerConfigUpdates} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withProducerConfigUpdates(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withProducerConfigUpdates(configUpdates);
            return this.withWriteRecordsTransform(updated);
        }

        /** Forwards to {@link WriteRecords#withBadRecordErrorHandler} on the wrapped transform. */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withBadRecordErrorHandler(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> badRecordErrorHandler) {
            WriteRecords<K, V> updated = this.getWriteRecordsTransform().withBadRecordErrorHandler(badRecordErrorHandler);
            return this.withWriteRecordsTransform(updated);
        }

        /**
         * Applies the producer settings for SASL_SSL with the OAUTHBEARER mechanism and the
         * {@code GcpLoginCallbackHandler} login callback handler.
         */
        public @UnknownKeyFor @NonNull @Initialized Write<K, V> withGCPApplicationDefaultCredentials() {
            Map<String, Object> gcpAuthSettings = ImmutableMap.<String, Object>of(
                "security.protocol", "SASL_SSL",
                "sasl.mechanism", "OAUTHBEARER",
                "sasl.login.callback.handler.class", "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
                "sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
            return this.withProducerConfigUpdates(gcpAuthSettings);
        }

        /**
         * Converts every {@code KV} into a {@link ProducerRecord} for the configured topic, sets a
         * {@link ProducerRecordCoder} built from the input {@link KvCoder}'s key and value coders,
         * and applies the wrapped {@link WriteRecords} transform.
         *
         * <p>Fails the {@code checkStateNotNull} check when no topic has been set. The input
         * coder is cast to {@link KvCoder} without a prior type check.
         */
        public @UnknownKeyFor @NonNull @Initialized PDone expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>> input) {
            final String topic = (String)Preconditions.checkStateNotNull((Object)this.getTopic(), (Object)"withTopic() is required");
            KvCoder inputCoder = (KvCoder)input.getCoder();

            SimpleFunction<KV<K, V>, ProducerRecord<K, V>> toProducerRecord = new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>(){

                public @UnknownKeyFor @NonNull @Initialized ProducerRecord<K, V> apply(@UnknownKeyFor @NonNull @Initialized KV<K, V> element) {
                    return new ProducerRecord<K, V>(topic, element.getKey(), element.getValue());
                }
            };
            PCollection records = (PCollection)input.apply("Kafka ProducerRecord", (PTransform)MapElements.via((SimpleFunction)toProducerRecord));
            PCollection codedRecords = records.setCoder(ProducerRecordCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder()));
            return (PDone)codedRecords.apply(this.getWriteRecordsTransform());
        }

        /** Validation is delegated entirely to the wrapped {@link WriteRecords} transform. */
        public void validate(@Nullable @UnknownKeyFor @Initialized PipelineOptions options) {
            WriteRecords<K, V> delegate = this.getWriteRecordsTransform();
            delegate.validate(options);
        }

        /** Adds this transform's display data followed by that of the wrapped transform. */
        public void populateDisplayData(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            WriteRecords<K, V> delegate = this.getWriteRecordsTransform();
            delegate.populateDisplayData(builder);
        }

        /**
         * Returns a transform that writes bare values: this transform is copied with
         * {@link StringSerializer} as key serializer and wrapped in a {@link KafkaValueWrite}.
         */
        public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<V>, @UnknownKeyFor @NonNull @Initialized PDone> values() {
            // Raw cast: the serializer class is not typed for K.
            Write<@Nullable K, V> nullKeyWrite = this.withKeySerializer((Class)StringSerializer.class);
            return new KafkaValueWrite(nullKeyWrite);
        }

        /**
         * Adapts a timestamp function defined over {@code KV<K, V>} to one defined over
         * {@code ProducerRecord<K, V>} by repackaging the record's key and value as a {@code KV}.
         */
        private static class PublishTimestampFunctionKV<@UnknownKeyFor K, @UnknownKeyFor V>
        implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
            private @UnknownKeyFor @NonNull @Initialized KafkaPublishTimestampFunction<@UnknownKeyFor @NonNull @Initialized KV<K, V>> fn;

            public PublishTimestampFunctionKV(@UnknownKeyFor @NonNull @Initialized KafkaPublishTimestampFunction<@UnknownKeyFor @NonNull @Initialized KV<K, V>> fn) {
                this.fn = fn;
            }

            /** Delegates to the wrapped function using {@code KV.of(e.key(), e.value())}. */
            @Override
            public @UnknownKeyFor @NonNull @Initialized Instant getTimestamp(@UnknownKeyFor @NonNull @Initialized ProducerRecord<K, V> e, @UnknownKeyFor @NonNull @Initialized Instant ts) {
                KV<K, V> keyValue = KV.of(e.key(), e.value());
                return this.fn.getTimestamp(keyValue, ts);
            }
        }

        /** Registers the external (cross-language) builder for this transform under {@link #URN}. */
        @AutoService(value={ExternalTransformRegistrar.class})
        public static class External
        implements ExternalTransformRegistrar {
            public static final @UnknownKeyFor @NonNull @Initialized String URN = "beam:transform:org.apache.beam:kafka_write:v1";

            /** Maps {@link #URN} to the generated {@code AutoValue_KafkaIO_Write.Builder} class. */
            public /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized ExternalTransformBuilder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>>> knownBuilders() {
                Class builderClass = AutoValue_KafkaIO_Write.Builder.class;
                return ImmutableMap.of(URN, builderClass);
            }

            /** Mutable parameter holder filled through its setters and read by {@code buildExternal}. */
            public static class Configuration {
                // Extra producer settings, copied into the producer config updates.
                private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> producerConfig;
                // Destination topic.
                private @UnknownKeyFor @NonNull @Initialized String topic;
                // Class names of the serializers, resolved via KafkaIO.resolveClass.
                private @UnknownKeyFor @NonNull @Initialized String keySerializer;
                private @UnknownKeyFor @NonNull @Initialized String valueSerializer;

                public void setProducerConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> producerConfig) {
                    this.producerConfig = producerConfig;
                }

                public void setTopic(@UnknownKeyFor @NonNull @Initialized String topic) {
                    this.topic = topic;
                }

                public void setKeySerializer(@UnknownKeyFor @NonNull @Initialized String keySerializer) {
                    this.keySerializer = keySerializer;
                }

                public void setValueSerializer(@UnknownKeyFor @NonNull @Initialized String valueSerializer) {
                    this.valueSerializer = valueSerializer;
                }
            }
        }

        /** AutoValue builder for {@link Write}; also builds the transform from an external configuration. */
        @AutoValue.Builder
        static abstract class Builder<@UnknownKeyFor K, @UnknownKeyFor V>
        implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTopic(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setWriteRecordsTransform(@UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Write<K, V> build();

            /**
             * Builds a {@link Write} from {@code configuration}: sets the topic, resolves both
             * serializer classes by name, and wraps a {@link WriteRecords} configured with the
             * supplied producer settings, serializers and topic.
             */
            public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>, @UnknownKeyFor @NonNull @Initialized PDone> buildExternal(@UnknownKeyFor @NonNull @Initialized External.Configuration configuration) {
                String topic = configuration.topic;
                this.setTopic(topic);

                Map<String, Object> producerOverrides = new HashMap<String, Object>(configuration.producerConfig);
                Class keySerializerClass = KafkaIO.resolveClass(configuration.keySerializer);
                Class valueSerializerClass = KafkaIO.resolveClass(configuration.valueSerializer);

                WriteRecords recordsWriter = KafkaIO.writeRecords()
                    .withProducerConfigUpdates(producerOverrides)
                    .withKeySerializer(keySerializerClass)
                    .withValueSerializer(valueSerializerClass)
                    .withTopic(topic);
                this.setWriteRecordsTransform(recordsWriter);
                return this.build();
            }
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class WriteRecords<@UnknownKeyFor K, @UnknownKeyFor V>
    extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
        /** Baseline producer configuration ({@code retries} = 3) installed by {@code KafkaIO.writeRecords()}. */
        private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of((Object)"retries", (Object)3);
        /**
         * Maps the serializer config keys to a hint naming the dedicated {@code with*Serializer}
         * method to use instead. NOTE(review): where this map is consulted is outside this section.
         */
        private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of((Object)"key.serializer", (Object)"Use withKeySerializer instead", (Object)"value.serializer", (Object)"Use withValueSerializer instead");

        /** Topic set by {@link #withTopic}, or {@code null} if none was set. */
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized String getTopic();

        /** Producer configuration map, as replaced by the producer-config update methods. */
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> getProducerConfig();

        /** Producer factory set by {@link #withProducerFactoryFn}, or {@code null} if none was set. */
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Producer<K, V>> getProducerFactoryFn();

        /** Key serializer class set by {@link #withKeySerializer}, or {@code null} if none was set. */
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<K>> getKeySerializer();

        /** Value serializer class set by {@link #withValueSerializer}, or {@code null} if none was set. */
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<V>> getValueSerializer();

        /** Publish-timestamp function set by {@link #withPublishTimestampFunction}, or {@code null}. */
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized KafkaPublishTimestampFunction<@UnknownKeyFor @NonNull @Initialized ProducerRecord<K, V>> getPublishTimestampFunction();

        /** Whether exactly-once mode is on; {@link #withEOS} sets it to {@code true}. */
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized boolean isEOS();

        /** Sink group id set by {@link #withEOS}, or {@code null} if it was never called. */
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized String getSinkGroupId();

        /** Shard count set by {@link #withEOS}. */
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized int getNumShards();

        /** Consumer factory set by {@link #withConsumerFactoryFn}; declared nullable. */
        @Pure
        public abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getConsumerFactoryFn();

        /** Bad-record router; {@link #withBadRecordErrorHandler} switches it to the recording router. */
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized BadRecordRouter getBadRecordRouter();

        /** Error handler for bad records, as set by {@link #withBadRecordErrorHandler}. */
        @Pure
        public abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> getBadRecordErrorHandler();

        /** Builder used by the {@code with*} methods to derive a modified copy of this transform. */
        abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> toBuilder();

        /**
         * Sets the {@code bootstrap.servers} producer property.
         *
         * @param bootstrapServers value stored under {@code bootstrap.servers}
         * @return a copy of this transform with the updated producer configuration
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withBootstrapServers(@UnknownKeyFor @NonNull @Initialized String bootstrapServers) {
            Map<String, Object> serversUpdate = ImmutableMap.<String, Object>of("bootstrap.servers", bootstrapServers);
            return this.withProducerConfigUpdates(serversUpdate);
        }

        /**
         * Sets the topic stored on this transform.
         *
         * @param topic the topic name
         * @return a copy of this transform with the topic set
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withTopic(@UnknownKeyFor @NonNull @Initialized String topic) {
            Builder<K, V> withTopicSet = this.toBuilder().setTopic(topic);
            return withTopicSet.build();
        }

        /**
         * Sets the serializer class used for record keys.
         *
         * @param keySerializer the key serializer class
         * @return a copy of this transform with the key serializer set
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withKeySerializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<K>> keySerializer) {
            Builder<K, V> withSerializerSet = this.toBuilder().setKeySerializer(keySerializer);
            return withSerializerSet.build();
        }

        /**
         * Sets the serializer class used for record values.
         *
         * @param valueSerializer the value serializer class
         * @return a copy of this transform with the value serializer set
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withValueSerializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<V>> valueSerializer) {
            Builder<K, V> withSerializerSet = this.toBuilder().setValueSerializer(valueSerializer);
            return withSerializerSet.build();
        }

        /**
         * Combines {@code configUpdates} with the current producer configuration through
         * {@code KafkaIOUtils.updateKafkaProperties} and stores the result.
         *
         * @param configUpdates producer properties to apply
         * @return a copy of this transform with the combined producer configuration
         * @deprecated performs the same steps as {@link #withProducerConfigUpdates(Map)}.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> updateProducerProperties(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            Map<String, Object> currentConfig = this.getProducerConfig();
            Map<String, Object> mergedConfig = KafkaIOUtils.updateKafkaProperties(currentConfig, configUpdates);
            Builder<K, V> builder = this.toBuilder();
            return builder.setProducerConfig(mergedConfig).build();
        }

        /**
         * Combines {@code configUpdates} with the current producer configuration through
         * {@code KafkaIOUtils.updateKafkaProperties} and stores the result.
         *
         * @param configUpdates producer properties to apply
         * @return a copy of this transform with the combined producer configuration
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withProducerConfigUpdates(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            Map<String, Object> currentConfig = this.getProducerConfig();
            Map<String, Object> mergedConfig = KafkaIOUtils.updateKafkaProperties(currentConfig, configUpdates);
            Builder<K, V> builder = this.toBuilder();
            return builder.setProducerConfig(mergedConfig).build();
        }

        /**
         * Sets the function that creates a {@link Producer} from a configuration map.
         *
         * @param producerFactoryFn the producer factory
         * @return a copy of this transform with the factory set
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withProducerFactoryFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Producer<K, V>> producerFactoryFn) {
            Builder<K, V> withFactorySet = this.toBuilder().setProducerFactoryFn(producerFactoryFn);
            return withFactorySet.build();
        }

        /**
         * Installs {@code KafkaPublishTimestampFunction.withElementTimestamp()} as the
         * publish-timestamp function.
         *
         * @return a copy of this transform using that function
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withInputTimestamp() {
            KafkaPublishTimestampFunction<ProducerRecord<K, V>> elementTimestampFn = KafkaPublishTimestampFunction.withElementTimestamp();
            return this.withPublishTimestampFunction(elementTimestampFn);
        }

        /**
         * Sets the function that supplies the publish timestamp for a record.
         *
         * @param timestampFunction the timestamp function
         * @return a copy of this transform with the function set
         * @deprecated marked deprecated in this class; {@link #withInputTimestamp()} installs the
         *     element-timestamp function through this method.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withPublishTimestampFunction(@UnknownKeyFor @NonNull @Initialized KafkaPublishTimestampFunction<@UnknownKeyFor @NonNull @Initialized ProducerRecord<K, V>> timestampFunction) {
            Builder<K, V> withFunctionSet = this.toBuilder().setPublishTimestampFunction(timestampFunction);
            return withFunctionSet.build();
        }

        /**
         * Enables exactly-once mode after calling {@code KafkaExactlyOnceSink.ensureEOSSupport()}.
         *
         * @param numShards shard count; must be at least 1
         * @param sinkGroupId sink group id; must not be {@code null}
         * @return a copy of this transform with exactly-once mode, shard count and group id set
         * @throws IllegalArgumentException (from {@code checkArgument}) if an argument is invalid
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withEOS(@UnknownKeyFor @NonNull @Initialized int numShards, @UnknownKeyFor @NonNull @Initialized String sinkGroupId) {
            KafkaExactlyOnceSink.ensureEOSSupport();

            boolean hasAtLeastOneShard = numShards >= 1;
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(hasAtLeastOneShard, (Object)"numShards should be >= 1");
            boolean hasSinkGroupId = sinkGroupId != null;
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(hasSinkGroupId, (Object)"sinkGroupId is required for exactly-once sink");

            Builder<K, V> eosBuilder = this.toBuilder()
                .setEOS(true)
                .setNumShards(numShards)
                .setSinkGroupId(sinkGroupId);
            return eosBuilder.build();
        }

        /**
         * Sets the function that creates a {@link Consumer} from a configuration map.
         *
         * @param consumerFactoryFn the consumer factory
         * @return a copy of this transform with the factory set
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withConsumerFactoryFn(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> consumerFactoryFn) {
            Builder<K, V> withFactorySet = this.toBuilder().setConsumerFactoryFn(consumerFactoryFn);
            return withFactorySet.build();
        }

        /**
         * Returns a transform rebuilt from this one that uses
         * {@link BadRecordRouter#RECORDING_ROUTER} together with the given error handler.
         *
         * @param badRecordErrorHandler handler stored as the bad-record error handler
         */
        public @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> withBadRecordErrorHandler(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> badRecordErrorHandler) {
            return toBuilder()
                .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
                .setBadRecordErrorHandler(badRecordErrorHandler)
                .build();
        }

        /**
         * Applies the write to {@code input}.
         *
         * <p>Requires bootstrap servers, a key serializer and a value serializer. When EOS is off,
         * records go through a {@code KafkaWriter} ParDo and its bad-record output is registered
         * with the configured error handler. When EOS is on, a topic is also required, the error
         * handler must be the default one, and the input is applied to a
         * {@link KafkaExactlyOnceSink}.
         *
         * @param input producer records to write
         * @return {@code PDone} in the input's pipeline
         * @throws IllegalArgumentException if a required setting is missing or unsupported
         */
        public @UnknownKeyFor @NonNull @Initialized PDone expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized ProducerRecord<K, V>> input) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                getProducerConfig().get("bootstrap.servers") != null, (Object)"withBootstrapServers() is required");
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                getKeySerializer() != null, (Object)"withKeySerializer() is required");
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                getValueSerializer() != null, (Object)"withValueSerializer() is required");

            if (!isEOS()) {
                // Regular path: write through a ParDo and surface failed records on the bad-record tag.
                PCollectionTuple writeOutputs = (PCollectionTuple)input.apply((PTransform)ParDo.of(new KafkaWriter(this)).withOutputTags(new TupleTag(), TupleTagList.of((TupleTag)BadRecordRouter.BAD_RECORD_TAG)));
                getBadRecordErrorHandler().addErrorCollection(writeOutputs.get(BadRecordRouter.BAD_RECORD_TAG).setCoder(BadRecord.getCoder((Pipeline)input.getPipeline())));
                return PDone.in((Pipeline)input.getPipeline());
            }

            // Exactly-once path.
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                getTopic() != null, (Object)"withTopic() is required when isEOS() is true");
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                getBadRecordErrorHandler() instanceof ErrorHandler.DefaultErrorHandler,
                (Object)"BadRecordErrorHandling isn't supported with Kafka Exactly Once writing");
            KafkaExactlyOnceSink.ensureEOSSupport();
            input.apply(new KafkaExactlyOnceSink(this));
            return PDone.in((Pipeline)input.getPipeline());
        }

        /**
         * Validates the pipeline options for this write.
         *
         * <p>When EOS is enabled, the runner class name must be the direct runner or start with
         * one of the Dataflow, Spark or Flink runner package prefixes.
         *
         * @param options pipeline options; must not be null
         * @throws UnsupportedOperationException if EOS is enabled and the runner is not recognised
         */
        public void validate(@Nullable @UnknownKeyFor @Initialized PipelineOptions options) {
            Preconditions.checkStateNotNull((Object)options);
            if (!isEOS()) {
                return;
            }
            String runnerName = options.getRunner().getName();
            boolean knownCompatible =
                "org.apache.beam.runners.direct.DirectRunner".equals(runnerName)
                    || runnerName.startsWith("org.apache.beam.runners.dataflow.")
                    || runnerName.startsWith("org.apache.beam.runners.spark.")
                    || runnerName.startsWith("org.apache.beam.runners.flink.");
            if (!knownCompatible) {
                throw new UnsupportedOperationException(runnerName + " is not a runner known to be compatible with Kafka exactly-once sink. This implementation of exactly-once sink relies on specific checkpoint guarantees. Only the runners with known to have compatible checkpoint semantics are allowed.");
            }
        }

        /**
         * Adds the topic and the producer configuration to the display data.
         *
         * <p>Keys present in {@code IGNORED_PRODUCER_PROPERTIES} are skipped. A value whose type
         * {@code DisplayData.inferType} does not recognise is shown via {@code String.valueOf}.
         *
         * @param builder display-data builder to populate
         */
        public void populateDisplayData(@UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"topic", (String)getTopic()).withLabel("Topic"));

            Set<String> hiddenKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> entry : getProducerConfig().entrySet()) {
                String propertyName = entry.getKey();
                if (!hiddenKeys.contains(propertyName)) {
                    Object rawValue = entry.getValue();
                    Object displayedValue;
                    if (DisplayData.inferType((Object)rawValue) != null) {
                        displayedValue = rawValue;
                    } else {
                        displayedValue = String.valueOf(rawValue);
                    }
                    builder.add(DisplayData.item((String)propertyName, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)displayedValue)));
                }
            }
        }

        /**
         * AutoValue builder for {@code WriteRecords}. Each abstract setter stores one property and
         * returns this builder type so calls can be chained; {@link #build()} produces the
         * {@code WriteRecords} instance. The implementations are generated, not written here.
         */
        @AutoValue.Builder
        static abstract class Builder<@UnknownKeyFor K, @UnknownKeyFor V> {
            Builder() {
            }

            /** Sets the topic property. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTopic(@UnknownKeyFor @NonNull @Initialized String var1);

            /** Sets the producer configuration map. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setProducerConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> var1);

            /** Sets the (nullable) factory that maps a config map to a {@code Producer}. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setProducerFactoryFn(@Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Producer<K, V>> var1);

            /** Sets the (nullable) key serializer class. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setKeySerializer(@Nullable @UnknownKeyFor @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<K>> var1);

            /** Sets the value serializer class. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setValueSerializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Serializer<V>> var1);

            /** Sets the publish-timestamp function. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setPublishTimestampFunction(@UnknownKeyFor @NonNull @Initialized KafkaPublishTimestampFunction<@UnknownKeyFor @NonNull @Initialized ProducerRecord<K, V>> var1);

            /** Sets the exactly-once (EOS) flag. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setEOS(@UnknownKeyFor @NonNull @Initialized boolean var1);

            /** Sets the sink group id (supplied by {@code withEOS}). */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setSinkGroupId(@UnknownKeyFor @NonNull @Initialized String var1);

            /** Sets the shard count (supplied by {@code withEOS}). */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setNumShards(@UnknownKeyFor @NonNull @Initialized int var1);

            /** Sets the factory that maps a config map to a {@code Consumer}. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerFactoryFn(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> var1);

            /** Sets the bad-record router. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setBadRecordRouter(@UnknownKeyFor @NonNull @Initialized BadRecordRouter var1);

            /** Sets the bad-record error handler. */
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setBadRecordErrorHandler(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> var1);

            /** Builds the {@code WriteRecords} instance from the properties set so far. */
            abstract @UnknownKeyFor @NonNull @Initialized WriteRecords<K, V> build();
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class ReadSourceDescriptors<@UnknownKeyFor K, @UnknownKeyFor V>
    extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
        // Tag of the main output (source descriptor paired with record) of the read ParDo built in expand().
        private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> records = new TupleTag();
        private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);

        // The abstract accessors below are the AutoValue properties of this transform; their
        // implementations are generated. Defaults mentioned here are the values read() sets.

        /** Consumer configuration map; read() starts from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> getConsumerConfig();

        /** Offset-consumer configuration map; may be null. */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> getOffsetConsumerConfig();

        /** Key deserializer provider; may be null until configured (expand() requires it). */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized DeserializerProvider<K> getKeyDeserializerProvider();

        /** Value deserializer provider; may be null until configured (expand() requires it). */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized DeserializerProvider<V> getValueDeserializerProvider();

        /** Explicit key coder; when null, the coder is obtained from the key deserializer provider. */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized Coder<K> getKeyCoder();

        /** Explicit value coder; when null, the coder is obtained from the value deserializer provider. */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized Coder<V> getValueCoder();

        /** Factory that maps a config map to a byte[]/byte[] Consumer; read() uses KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> getConsumerFactoryFn();

        /** Optional stop-reading check; may be null. */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized CheckStopReadingFn getCheckStopReadingFn();

        /** Function mapping a record to its output timestamp; may be null (read() selects processing time). */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> getExtractOutputTimestampFn();

        /** Function creating a WatermarkEstimator from an Instant state; may be null (read() selects MonotonicallyIncreasing). */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> getCreateWatermarkEstimatorFn();

        /** Whether commitOffsets() was requested; false by default. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized boolean isCommitOffsetEnabled();

        /** Whether withRedistribute() was requested; false by default. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized boolean isRedistribute();

        /** Whether withAllowDuplicates() was requested; false by default. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized boolean isAllowDuplicates();

        /** Number of redistribute keys; 0 by default (expand() warns about 0 when redistribute is on). */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized int getRedistributeNumKeys();

        /** Timestamp policy factory; may be null. */
        @Pure
        abstract @Nullable @UnknownKeyFor @Initialized TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        /** Bad-record router; BadRecordRouter.THROWING_ROUTER by default. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized BadRecordRouter getBadRecordRouter();

        /** Bad-record error handler; an ErrorHandler.DefaultErrorHandler by default. */
        @Pure
        abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> getBadRecordErrorHandler();

        /** Consumer polling timeout; 2 by default. NOTE(review): unit is not visible here, presumably seconds; confirm. */
        @Pure
        abstract @UnknownKeyFor @NonNull @Initialized long getConsumerPollingTimeout();

        /** Whether withBounded() was requested; false by default. */
        abstract @UnknownKeyFor @NonNull @Initialized boolean isBounded();

        /** Returns a builder pre-populated from this instance; used by every with*() method. */
        abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> toBuilder();

        /**
         * Creates a {@code ReadSourceDescriptors} with the default settings: default consumer
         * factory and properties, no offset commit, unbounded, throwing bad-record router with a
         * default error handler, polling timeout of 2, no redistribute, then processing-time
         * timestamps and a monotonically increasing watermark estimator.
         */
        public static <K, V> @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> read() {
            ReadSourceDescriptors<K, V> defaults =
                new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
                    .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
                    .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
                    .setCommitOffsetEnabled(false)
                    .setBounded(false)
                    .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
                    .setBadRecordErrorHandler((ErrorHandler<BadRecord, ?>)new ErrorHandler.DefaultErrorHandler())
                    .setConsumerPollingTimeout(2L)
                    .setRedistribute(false)
                    .setAllowDuplicates(false)
                    .setRedistributeNumKeys(0)
                    .build();
            return defaults
                .withProcessingTime()
                .withMonotonicallyIncreasingWatermarkEstimator();
        }

        /**
         * Applies a consumer-config update that sets {@code bootstrap.servers}.
         *
         * @param bootstrapServers value stored under {@code bootstrap.servers}
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withBootstrapServers(@UnknownKeyFor @NonNull @Initialized String bootstrapServers) {
            return withConsumerConfigUpdates(
                ImmutableMap.<String, Object>of("bootstrap.servers", bootstrapServers));
        }

        /**
         * Sets the key deserializer by wrapping the class in a {@link LocalDeserializerProvider}.
         *
         * @param keyDeserializer key deserializer class
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withKeyDeserializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<K>> keyDeserializer) {
            return withKeyDeserializerProvider(
                LocalDeserializerProvider.of(keyDeserializer));
        }

        /**
         * Sets the key deserializer class and an explicit key coder.
         *
         * @param keyDeserializer key deserializer class
         * @param keyCoder coder stored as the key coder
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<K>> keyDeserializer, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder) {
            ReadSourceDescriptors<K, V> withDeserializer = withKeyDeserializer(keyDeserializer);
            return withDeserializer.toBuilder()
                .setKeyCoder(keyCoder)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given key deserializer provider.
         *
         * @param deserializerProvider provider stored as the key deserializer provider; nullable
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withKeyDeserializerProvider(@Nullable @UnknownKeyFor @Initialized DeserializerProvider<K> deserializerProvider) {
            return toBuilder()
                .setKeyDeserializerProvider(deserializerProvider)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given key deserializer provider and
         * key coder.
         *
         * @param deserializerProvider provider stored as the key deserializer provider; nullable
         * @param keyCoder coder stored as the key coder
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(@Nullable @UnknownKeyFor @Initialized DeserializerProvider<K> deserializerProvider, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder) {
            return toBuilder()
                .setKeyDeserializerProvider(deserializerProvider)
                .setKeyCoder(keyCoder)
                .build();
        }

        /**
         * Sets the value deserializer by wrapping the class in a {@link LocalDeserializerProvider}.
         *
         * @param valueDeserializer value deserializer class
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withValueDeserializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<V>> valueDeserializer) {
            return withValueDeserializerProvider(
                LocalDeserializerProvider.of(valueDeserializer));
        }

        /**
         * Sets the value deserializer class and an explicit value coder.
         *
         * @param valueDeserializer value deserializer class
         * @param valueCoder coder stored as the value coder
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<V>> valueDeserializer, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder) {
            ReadSourceDescriptors<K, V> withDeserializer = withValueDeserializer(valueDeserializer);
            return withDeserializer.toBuilder()
                .setValueCoder(valueCoder)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given value deserializer provider.
         *
         * @param deserializerProvider provider stored as the value deserializer provider; nullable
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withValueDeserializerProvider(@Nullable @UnknownKeyFor @Initialized DeserializerProvider<V> deserializerProvider) {
            return toBuilder()
                .setValueDeserializerProvider(deserializerProvider)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given value deserializer provider
         * and value coder.
         *
         * @param deserializerProvider provider stored as the value deserializer provider; nullable
         * @param valueCoder coder stored as the value coder
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(@Nullable @UnknownKeyFor @Initialized DeserializerProvider<V> deserializerProvider, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder) {
            return toBuilder()
                .setValueDeserializerProvider(deserializerProvider)
                .setValueCoder(valueCoder)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given consumer factory function.
         *
         * @param consumerFactoryFn factory that maps a config map to a byte[]/byte[] consumer
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withConsumerFactoryFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> consumerFactoryFn) {
            return toBuilder()
                .setConsumerFactoryFn(consumerFactoryFn)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given stop-reading check.
         *
         * @param checkStopReadingFn check stored as the stop-reading function; nullable
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withCheckStopReadingFn(@Nullable @UnknownKeyFor @Initialized CheckStopReadingFn checkStopReadingFn) {
            return toBuilder()
                .setCheckStopReadingFn(checkStopReadingFn)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one whose stop-reading check is the given
         * function wrapped by {@link CheckStopReadingFnWrapper#of}.
         *
         * @param checkStopReadingFn function from topic partition to a boolean; nullable
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withCheckStopReadingFn(@Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized TopicPartition, @UnknownKeyFor @NonNull @Initialized Boolean> checkStopReadingFn) {
            return toBuilder()
                .setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn))
                .build();
        }

        /**
         * Combines the current consumer config with {@code configUpdates} through
         * {@code KafkaIOUtils.updateKafkaProperties} and stores the result.
         *
         * @param configUpdates entries to apply on top of the current consumer config
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withConsumerConfigUpdates(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            Map<String, Object> mergedConfig =
                KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
            return toBuilder()
                .setConsumerConfig(mergedConfig)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given output-timestamp function.
         *
         * @param fn function mapping a record to its output timestamp
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> fn) {
            return toBuilder()
                .setExtractOutputTimestampFn(fn)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given watermark-estimator factory.
         *
         * @param fn function creating a watermark estimator from an {@code Instant} state
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> fn) {
            return toBuilder()
                .setCreateWatermarkEstimatorFn(fn)
                .build();
        }

        /** Uses {@code ExtractOutputTimestampFns.useLogAppendTime()} as the output-timestamp function. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withLogAppendTime() {
            return withExtractOutputTimestampFn(
                ExtractOutputTimestampFns.useLogAppendTime());
        }

        /** Uses {@code ExtractOutputTimestampFns.useProcessingTime()} as the output-timestamp function. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withProcessingTime() {
            return withExtractOutputTimestampFn(
                ExtractOutputTimestampFns.useProcessingTime());
        }

        /** Returns a transform rebuilt from this one with the redistribute flag set to true. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withRedistribute() {
            return toBuilder()
                .setRedistribute(true)
                .build();
        }

        /** Returns a transform rebuilt from this one with the allow-duplicates flag set to true. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withAllowDuplicates() {
            return toBuilder()
                .setAllowDuplicates(true)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given redistribute key count.
         *
         * @param redistributeNumKeys value stored as the redistribute key count
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withRedistributeNumKeys(@UnknownKeyFor @NonNull @Initialized int redistributeNumKeys) {
            return toBuilder()
                .setRedistributeNumKeys(redistributeNumKeys)
                .build();
        }

        /** Uses {@code ExtractOutputTimestampFns.useCreateTime()} as the output-timestamp function. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withCreateTime() {
            return withExtractOutputTimestampFn(
                ExtractOutputTimestampFns.useCreateTime());
        }

        /** Configures the watermark-estimator factory to create {@code WatermarkEstimators.WallTime}. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
            SerializableFunction<Instant, WatermarkEstimator<Instant>> wallTimeFactory =
                state -> new WatermarkEstimators.WallTime(state);
            return withCreatWatermarkEstimatorFn(wallTimeFactory);
        }

        /**
         * Configures the watermark-estimator factory to create
         * {@code WatermarkEstimators.MonotonicallyIncreasing}.
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
            SerializableFunction<Instant, WatermarkEstimator<Instant>> monotonicFactory =
                state -> new WatermarkEstimators.MonotonicallyIncreasing(state);
            return withCreatWatermarkEstimatorFn(monotonicFactory);
        }

        /** Configures the watermark-estimator factory to create {@code WatermarkEstimators.Manual}. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
            SerializableFunction<Instant, WatermarkEstimator<Instant>> manualFactory =
                state -> new WatermarkEstimators.Manual(state);
            return withCreatWatermarkEstimatorFn(manualFactory);
        }

        /** Applies a consumer-config update that sets {@code isolation.level} to {@code read_committed}. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withReadCommitted() {
            return withConsumerConfigUpdates(
                ImmutableMap.<String, Object>of("isolation.level", "read_committed"));
        }

        /** Returns a transform rebuilt from this one with the commit-offset flag set to true. */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> commitOffsets() {
            return toBuilder()
                .setCommitOffsetEnabled(true)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given offset-consumer config.
         *
         * @param offsetConsumerConfig map stored as the offset-consumer config; nullable
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(@Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> offsetConsumerConfig) {
            return toBuilder()
                .setOffsetConsumerConfig(offsetConsumerConfig)
                .build();
        }

        /**
         * Replaces the consumer config with the given map. Unlike
         * {@code withConsumerConfigUpdates}, the current config is not merged in.
         *
         * @param consumerConfig map stored as the consumer config
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withConsumerConfigOverrides(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> consumerConfig) {
            return toBuilder()
                .setConsumerConfig(consumerConfig)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one that uses
         * {@link BadRecordRouter#RECORDING_ROUTER} together with the given error handler.
         *
         * @param errorHandler handler stored as the bad-record error handler
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withBadRecordErrorHandler(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> errorHandler) {
            return toBuilder()
                .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
                .setBadRecordErrorHandler(errorHandler)
                .build();
        }

        /**
         * Returns a transform rebuilt from this one with the given consumer polling timeout.
         *
         * @param duration value stored as the polling timeout (unit not visible in this part of the file)
         */
        public @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@UnknownKeyFor @NonNull @Initialized long duration) {
            return toBuilder()
                .setConsumerPollingTimeout(duration)
                .build();
        }

        /** Wraps this transform in a {@code ReadAllFromRow}, which takes {@code Row} input. */
        @UnknownKeyFor @NonNull @Initialized ReadAllFromRow<K, V> forExternalBuild() {
            ReadAllFromRow<K, V> rowAdapter = new ReadAllFromRow<>(this);
            return rowAdapter;
        }

        /**
         * Stores the given timestamp policy factory and then switches to the manual watermark
         * estimator.
         *
         * @param timestampPolicyFactory factory stored as the timestamp policy factory
         */
        @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withTimestampPolicyFactory(@UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            ReadSourceDescriptors<K, V> withPolicy = toBuilder()
                .setTimestampPolicyFactory(timestampPolicyFactory)
                .build();
            return withPolicy.withManualWatermarkEstimator();
        }

        /** Returns a transform rebuilt from this one with the bounded flag set to true. */
        @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> withBounded() {
            return toBuilder()
                .setBounded(true)
                .build();
        }

        /**
         * Expands the read: validates the configuration, logs warnings about questionable
         * settings, runs the {@code ReadFromKafkaDoFn} ParDo and, when requested, wires in offset
         * committing.
         *
         * <p>Offsets are committed by this transform only when {@code commitOffsets()} was
         * requested and {@code enable.auto.commit} is not {@code Boolean.TRUE} in the consumer
         * config. If the pipeline's update-compatibility version is below 2.60.0, the commit
         * path is delegated to {@code expand259Commits}, which does not support redistribute.
         *
         * @param input source descriptors to read from
         * @return the records read, with the descriptor key stripped
         * @throws IllegalArgumentException if a deserializer is missing, or redistribute is
         *     combined with the pre-2.60.0 commit path
         * @throws RuntimeException wrapping a {@code NoSuchSchemaException} raised while looking
         *     up the {@code KafkaSourceDescriptor} schema coder
         */
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor> input) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(this.getKeyDeserializerProvider() != null, (Object)"withKeyDeserializer() is required");
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(this.getValueDeserializerProvider() != null, (Object)"withValueDeserializer() is required");
            if (!ConsumerSpEL.hasOffsetsForTimes()) {
                LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", (Object)AppInfoParser.getVersion());
            }
            if (this.isCommitOffsetEnabled() && this.configuredKafkaCommit()) {
                LOG.info("auto_commit is set together with commitOffsetEnabled but you only need one of them. The commitOffsetEnabled is going to be ignored");
            }
            if (this.isRedistribute()) {
                if (this.getRedistributeNumKeys() == 0) {
                    LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
                }
                if ((this.isCommitOffsetEnabled() || this.configuredKafkaCommit()) && this.isAllowDuplicates()) {
                    LOG.warn("Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since withRestribute() is enabled with allow duplicates, the runner may have additional work processed that is ahead of the current checkpoint");
                }
            }
            if (this.getConsumerConfig().get("bootstrap.servers") == null) {
                LOG.warn("The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail.");
            }
            CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = this.getKeyCoder(coderRegistry);
            Coder<V> valueCoder = this.getValueCoder(coderRegistry);
            KafkaRecordCoder<K, V> recordCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
            try {
                // Main output carries (descriptor, record) pairs; failed records go to the bad-record tag.
                PCollectionTuple pCollectionTuple = (PCollectionTuple)input.apply((PTransform)ParDo.of(ReadFromKafkaDoFn.create(this, this.records)).withOutputTags(this.records, TupleTagList.of((TupleTag)BadRecordRouter.BAD_RECORD_TAG)));
                this.getBadRecordErrorHandler().addErrorCollection(pCollectionTuple.get(BadRecordRouter.BAD_RECORD_TAG).setCoder(BadRecord.getCoder((Pipeline)input.getPipeline())));
                PCollection outputWithDescriptor = pCollectionTuple.get(this.records).setCoder((Coder)KvCoder.of((Coder)input.getPipeline().getSchemaRegistry().getSchemaCoder(KafkaSourceDescriptor.class), recordCoder));
                // Kafka's own auto-commit takes precedence over Beam-managed commits.
                boolean applyCommitOffsets = this.isCommitOffsetEnabled() && !this.configuredKafkaCommit();
                if (!applyCommitOffsets) {
                    return ((PCollection)outputWithDescriptor.apply((PTransform)MapElements.into((TypeDescriptor)new TypeDescriptor<KafkaRecord<K, V>>(){}).via(KV::getValue))).setCoder(recordCoder);
                }
                String requestedVersionString = ((StreamingOptions)input.getPipeline().getOptions().as(StreamingOptions.class)).getUpdateCompatibilityVersion();
                if (requestedVersionString != null && TransformUpgrader.compareVersions((String)requestedVersionString, (String)"2.60.0") < 0) {
                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(!this.isRedistribute(), (Object)"Can not enable isRedistribute() while committing offsets prior to 2.60.0");
                    return this.expand259Commits((PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>>)outputWithDescriptor, (Coder<KafkaRecord<K, V>>)recordCoder, input.getPipeline().getSchemaRegistry());
                }
                ((PCollection)outputWithDescriptor.apply(new KafkaCommitOffset(this, false))).setCoder((Coder)VoidCoder.of());
                return ((PCollection)outputWithDescriptor.apply((PTransform)MapElements.into((TypeDescriptor)new TypeDescriptor<KafkaRecord<K, V>>(){}).via(KV::getValue))).setCoder(recordCoder);
            }
            catch (NoSuchSchemaException e) {
                // Keep the original exception as the cause so its stack trace is not lost.
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        /**
         * Commit path used when the requested update-compatibility version is below 2.60.0:
         * reshuffles the keyed records via a random key, applies {@code KafkaCommitOffset} with
         * its second argument set to {@code true}, and returns the record values.
         *
         * @param outputWithDescriptor records keyed by their source descriptor
         * @param recordCoder coder for the output records
         * @param schemaRegistry registry used to look up the descriptor's schema coder
         * @throws NoSuchSchemaException if the descriptor schema coder cannot be found
         */
        private @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> expand259Commits(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor, @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> outputWithDescriptor, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> recordCoder, @UnknownKeyFor @NonNull @Initialized SchemaRegistry schemaRegistry) throws @UnknownKeyFor @NonNull @Initialized NoSuchSchemaException {
            PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> reshuffled =
                ((PCollection)outputWithDescriptor.apply((PTransform)Reshuffle.viaRandomKey()))
                    .setCoder((Coder)KvCoder.of((Coder)schemaRegistry.getSchemaCoder(KafkaSourceDescriptor.class), recordCoder));

            // Side branch: commit offsets; its output is not consumed further here.
            ((PCollection)reshuffled.apply(new KafkaCommitOffset(this, true)))
                .setCoder((Coder)VoidCoder.of());

            return ((PCollection)reshuffled.apply((PTransform)MapElements.into((TypeDescriptor)new TypeDescriptor<KafkaRecord<K, V>>(){}).via(KV::getValue)))
                .setCoder(recordCoder);
        }

        /**
         * Resolves the key coder: the explicitly configured one if present, otherwise the coder
         * supplied by the key deserializer provider.
         *
         * @param coderRegistry registry passed to the provider when it has to supply the coder
         */
        private @UnknownKeyFor @NonNull @Initialized Coder<K> getKeyCoder(@UnknownKeyFor @NonNull @Initialized CoderRegistry coderRegistry) {
            Coder<K> explicitCoder = getKeyCoder();
            if (explicitCoder != null) {
                return explicitCoder;
            }
            return Preconditions.checkStateNotNull(getKeyDeserializerProvider()).getCoder(coderRegistry);
        }

        /**
         * Resolves the value coder: the explicitly configured one if present, otherwise the coder
         * supplied by the value deserializer provider.
         *
         * @param coderRegistry registry passed to the provider when it has to supply the coder
         */
        private @UnknownKeyFor @NonNull @Initialized Coder<V> getValueCoder(@UnknownKeyFor @NonNull @Initialized CoderRegistry coderRegistry) {
            Coder<V> explicitCoder = getValueCoder();
            if (explicitCoder != null) {
                return explicitCoder;
            }
            return Preconditions.checkStateNotNull(getValueDeserializerProvider()).getCoder(coderRegistry);
        }

        /**
         * Reports whether the consumer config maps {@code enable.auto.commit} to
         * {@code Boolean.TRUE}. Any other value, including the string {@code "true"} or a missing
         * key, yields {@code false}.
         */
        private @UnknownKeyFor @NonNull @Initialized boolean configuredKafkaCommit() {
            Object autoCommitSetting = getConsumerConfig().get("enable.auto.commit");
            return Boolean.TRUE.equals(autoCommitSetting);
        }

        /** Factory methods for the functions that derive a record's output timestamp. */
        static class ExtractOutputTimestampFns<@UnknownKeyFor K, @UnknownKeyFor V> {
            ExtractOutputTimestampFns() {
            }

            /** Returns a function that ignores the record and yields {@code Instant.now()}. */
            public static <K, V> @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> useProcessingTime() {
                return ignoredRecord -> Instant.now();
            }

            /**
             * Returns a function that yields the record's timestamp, after checking that the
             * record's timestamp type is {@code CREATE_TIME}.
             */
            public static <K, V> @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> useCreateTime() {
                return kafkaRecord -> {
                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                        kafkaRecord.getTimestampType() == KafkaTimestampType.CREATE_TIME,
                        "Kafka record's timestamp is not 'CREATE_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')",
                        kafkaRecord.getTopic(),
                        kafkaRecord.getPartition(),
                        kafkaRecord.getOffset(),
                        kafkaRecord.getTimestampType());
                    return new Instant(kafkaRecord.getTimestamp());
                };
            }

            /**
             * Returns a function that yields the record's timestamp, after checking that the
             * record's timestamp type is {@code LOG_APPEND_TIME}.
             */
            public static <K, V> @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> useLogAppendTime() {
                return kafkaRecord -> {
                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(
                        kafkaRecord.getTimestampType() == KafkaTimestampType.LOG_APPEND_TIME,
                        "Kafka record's timestamp is not 'LOG_APPEND_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')",
                        kafkaRecord.getTopic(),
                        kafkaRecord.getPartition(),
                        kafkaRecord.getOffset(),
                        kafkaRecord.getTimestampType());
                    return new Instant(kafkaRecord.getTimestamp());
                };
            }
        }

        /**
         * Adapter that takes {@code Row} input: converts each row to a
         * {@code KafkaSourceDescriptor}, applies the wrapped {@code ReadSourceDescriptors} and
         * emits each record's key/value pair.
         */
        private static class ReadAllFromRow<@UnknownKeyFor K, @UnknownKeyFor V>
        extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
            private final @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> readViaSDF;

            ReadAllFromRow(@UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> read) {
                this.readViaSDF = read;
            }

            /**
             * Runs the three stages (rows to descriptors, descriptors to records, records to
             * KVs) and sets a {@code KvCoder} built from the wrapped transform's explicit key
             * and value coders, both of which must be non-null.
             */
            public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>> expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> input) {
                PCollection<KafkaSourceDescriptor> descriptors =
                    input.apply(Convert.fromRows(KafkaSourceDescriptor.class));
                PCollection<KafkaRecord<K, V>> kafkaRecords = descriptors.apply(readViaSDF);
                PCollection<KV<K, V>> keyValues = kafkaRecords.apply(ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>(){

                    @DoFn.ProcessElement
                    public void processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V> element, @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized KV<K, V>> outputReceiver) {
                        outputReceiver.output(element.getKV());
                    }
                }));
                // The coders are looked up only after all three stages have been applied.
                return keyValues.setCoder(
                    KvCoder.of(
                        Preconditions.checkStateNotNull(readViaSDF.getKeyCoder()),
                        Preconditions.checkStateNotNull(readViaSDF.getValueCoder())));
            }
        }

        /**
         * AutoValue builder for {@link ReadSourceDescriptors}.
         *
         * <p>Every abstract setter returns the builder so calls can be chained; {@link #build()}
         * produces the configured transform. Setters whose parameter is annotated
         * {@code @Nullable} accept {@code null}.
         */
        @AutoValue.Builder
        static abstract class Builder<@UnknownKeyFor K, @UnknownKeyFor V> {
            Builder() {
            }

            // --- Consumer construction and configuration ---

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setOffsetConsumerConfig(@Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerFactoryFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> var1);

            // --- Stop-reading predicate ---

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCheckStopReadingFn(@Nullable @UnknownKeyFor @Initialized CheckStopReadingFn var1);

            /**
             * Convenience overload: adapts a plain {@code SerializableFunction} through
             * {@code CheckStopReadingFnWrapper.of} and delegates to the abstract setter above.
             */
            @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCheckStopReadingFn(@Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized TopicPartition, @UnknownKeyFor @NonNull @Initialized Boolean> checkStopReadingFn) {
                return this.setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
            }

            // --- Deserialization and coders ---

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setKeyDeserializerProvider(@Nullable @UnknownKeyFor @Initialized DeserializerProvider<K> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setValueDeserializerProvider(@Nullable @UnknownKeyFor @Initialized DeserializerProvider<V> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setKeyCoder(@UnknownKeyFor @NonNull @Initialized Coder<K> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setValueCoder(@UnknownKeyFor @NonNull @Initialized Coder<V> var1);

            // --- Timestamps and watermarks ---

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setExtractOutputTimestampFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCreateWatermarkEstimatorFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Instant, @UnknownKeyFor @NonNull @Initialized WatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant>> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCommitOffsetEnabled(@UnknownKeyFor @NonNull @Initialized boolean var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTimestampPolicyFactory(@UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> var1);

            // --- Bad-record handling ---

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setBadRecordRouter(@UnknownKeyFor @NonNull @Initialized BadRecordRouter var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setBadRecordErrorHandler(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> var1);

            // --- Polling, boundedness and redistribution flags ---

            // NOTE(review): the unit of this long is not visible in this class - confirm at the call sites.
            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerPollingTimeout(@UnknownKeyFor @NonNull @Initialized long var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setBounded(@UnknownKeyFor @NonNull @Initialized boolean var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setRedistribute(@UnknownKeyFor @NonNull @Initialized boolean var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setAllowDuplicates(@UnknownKeyFor @NonNull @Initialized boolean var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setRedistributeNumKeys(@UnknownKeyFor @NonNull @Initialized int var1);

            /** Creates the {@link ReadSourceDescriptors} instance from the values set so far. */
            abstract @UnknownKeyFor @NonNull @Initialized ReadSourceDescriptors<K, V> build();
        }
    }

    /**
     * Reads from Kafka through a wrapped {@link Read} and emits each record as a {@link Row}.
     *
     * <p>Every {@code KafkaRecord} is first flattened into a {@link ByteArrayKafkaRecord} (topic,
     * partition, offset, timestamp, raw key/value bytes, headers and timestamp type) and then
     * converted with {@code Convert.toRows()}.
     */
    static class RowsWithMetadata<@UnknownKeyFor K, @UnknownKeyFor V>
    extends PTransform<PBegin, PCollection<Row>> {
        /** The underlying Kafka read whose output is converted to rows. */
        private final @UnknownKeyFor @NonNull @Initialized Read<K, V> read;

        RowsWithMetadata(@UnknownKeyFor @NonNull @Initialized Read<K, V> read) {
            super("KafkaIO.RowsWithMetadata");
            this.read = read;
        }

        /**
         * Flattens a {@code KafkaRecord} into a {@link ByteArrayKafkaRecord}.
         *
         * <p>The key and value are cast to {@code byte[]}, so the record must carry byte-array
         * (or null) keys and values. Headers are copied into {@link KafkaHeader}s; a record
         * without headers yields a {@code null} header list.
         *
         * @param kafkaRecord the record to convert
         * @return the flattened representation of {@code kafkaRecord}
         */
        public static <K, V> @UnknownKeyFor @NonNull @Initialized ByteArrayKafkaRecord toExternalKafkaRecord(@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V> kafkaRecord) {
            List<KafkaHeader> headers = null;
            if (kafkaRecord.getHeaders() != null) {
                headers = Arrays.stream(kafkaRecord.getHeaders().toArray()).map(h -> new KafkaHeader(h.key(), h.value())).collect(Collectors.toList());
            }
            return new ByteArrayKafkaRecord(kafkaRecord.getTopic(), kafkaRecord.getPartition(), kafkaRecord.getOffset(), kafkaRecord.getTimestamp(), (byte[])kafkaRecord.getKV().getKey(), (byte[])kafkaRecord.getKV().getValue(), headers, kafkaRecord.getTimestampType().id, kafkaRecord.getTimestampType().name);
        }

        /** Applies the wrapped read, converts each record, then converts the results to rows. */
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> expand(@UnknownKeyFor @NonNull @Initialized PBegin begin) {
            PCollection records = (PCollection)begin.apply(this.read);
            DoFn toExternalRecord = (DoFn)new DoFn<KafkaRecord<K, V>, ByteArrayKafkaRecord>(){

                @DoFn.ProcessElement
                public void processElement(/*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                // Could not load outer class - annotation placement on inner may be incorrect
                @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext ctx) {
                    KafkaRecord kafkaRecord = (KafkaRecord)ctx.element();
                    ctx.output((Object)RowsWithMetadata.toExternalKafkaRecord(kafkaRecord));
                }
            };
            PCollection externalRecords = (PCollection)records.apply("Convert to ExternalKafkaRecord", (PTransform)ParDo.of(toExternalRecord));
            return (PCollection)externalRecords.apply(Convert.toRows());
        }

        /** Adds this transform's own display data followed by that of the wrapped read. */
        public void populateDisplayData(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }

        /** Builds a {@link RowsWithMetadata}-style transform from an external configuration. */
        static class Builder<@UnknownKeyFor K, @UnknownKeyFor V>
        implements ExternalTransformBuilder<Read.External.Configuration, PBegin, PCollection<Row>> {
            Builder() {
            }

            /**
             * Configures a {@link Read} from {@code config} and returns its
             * {@code externalWithMetadata()} form.
             *
             * @param config external configuration naming the key and value deserializers
             * @return the configured transform
             * @throws IllegalArgumentException if the coder resolved for the key or the value
             *     deserializer is not a nullable byte-array coder
             */
            public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row>> buildExternal(@UnknownKeyFor @NonNull @Initialized Read.External.Configuration config) {
                AutoValue_KafkaIO_Read.Builder readBuilder = new AutoValue_KafkaIO_Read.Builder();
                Read.Builder.setupExternalBuilder(readBuilder, config);
                // Keys are validated before the value deserializer is even resolved, so a bad
                // key configuration is always reported first.
                Class keyDeserializer = KafkaIO.resolveClass(config.keyDeserializer);
                Builder.requireNullableByteArrayCoder(Read.Builder.resolveCoder(keyDeserializer), "keys");
                Class valueDeserializer = KafkaIO.resolveClass(config.valueDeserializer);
                Builder.requireNullableByteArrayCoder(Read.Builder.resolveCoder(valueDeserializer), "values");
                return ((Read.Builder)readBuilder).build().externalWithMetadata();
            }

            /**
             * Rejects any coder that is not a {@code NullableCoder} wrapping a
             * {@code ByteArrayCoder}.
             *
             * @param coder the resolved coder to check
             * @param role either {@code "keys"} or {@code "values"}; used in the error message
             */
            private static void requireNullableByteArrayCoder(Coder coder, String role) {
                boolean nullableBytes = coder instanceof NullableCoder && coder.getCoderArguments().get(0) instanceof ByteArrayCoder;
                if (!nullableBytes) {
                    throw new IllegalArgumentException("ExternalWithMetadata transform only supports " + role + " of type nullable(byte[])");
                }
            }
        }
    }

    /**
     * Flat representation of a Kafka record whose key and value are raw bytes.
     *
     * <p>Instances are created by {@code RowsWithMetadata.toExternalKafkaRecord}. The class is
     * annotated with {@code JavaFieldSchema} as its default schema, so the field declarations
     * below are presumably what defines the schema - do not rename or reorder them without
     * checking schema consumers.
     */
    @DefaultSchema(value=JavaFieldSchema.class)
    @SuppressFBWarnings(value={"URF_UNREAD_FIELD"})
    static class ByteArrayKafkaRecord {
        // Coordinates of the record.
        @UnknownKeyFor @NonNull @Initialized String topic;
        @UnknownKeyFor @NonNull @Initialized int partition;
        @UnknownKeyFor @NonNull @Initialized long offset;
        // Raw value of KafkaRecord.getTimestamp(); interpret together with the timestamp type below.
        @UnknownKeyFor @NonNull @Initialized long timestamp;
        // Key and value bytes; either array may be null.
        @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] key;
        @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] value;
        // Null when the source record had no headers.
        @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized KafkaHeader> headers;
        // The id and name fields of the record's KafkaTimestampType.
        @UnknownKeyFor @NonNull @Initialized int timestampTypeId;
        @UnknownKeyFor @NonNull @Initialized String timestampTypeName;

        /** Schema creator: stores each argument in the field of the same name. */
        @SchemaCreate
        public ByteArrayKafkaRecord(@UnknownKeyFor @NonNull @Initialized String topic, @UnknownKeyFor @NonNull @Initialized int partition, @UnknownKeyFor @NonNull @Initialized long offset, @UnknownKeyFor @NonNull @Initialized long timestamp, @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] key, @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] value, @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized KafkaHeader> headers, @UnknownKeyFor @NonNull @Initialized int timestampTypeId, @UnknownKeyFor @NonNull @Initialized String timestampTypeName) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
            this.headers = headers;
            this.timestampTypeId = timestampTypeId;
            this.timestampTypeName = timestampTypeName;
        }
    }

    /**
     * A single Kafka record header as a key string plus raw value bytes.
     *
     * <p>Built from each header's {@code key()} and {@code value()} in
     * {@code RowsWithMetadata.toExternalKafkaRecord}. Annotated with {@code JavaFieldSchema} as
     * its default schema, so the fields below presumably define the schema.
     */
    @DefaultSchema(value=JavaFieldSchema.class)
    @SuppressFBWarnings(value={"URF_UNREAD_FIELD"})
    static class KafkaHeader {
        @UnknownKeyFor @NonNull @Initialized String key;
        // The value array itself may be null.
        @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] value;

        /** Schema creator: stores the header key and its (possibly null) value bytes. */
        @SchemaCreate
        public KafkaHeader(@UnknownKeyFor @NonNull @Initialized String key, @UnknownKeyFor @NonNull @Initialized byte @Nullable @UnknownKeyFor @Initialized [] value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Reads from Kafka through a wrapped {@link Read} and emits only each record's key/value
     * pair, discarding the remaining record metadata.
     */
    public static class TypedWithoutMetadata<@UnknownKeyFor K, @UnknownKeyFor V>
    extends PTransform<PBegin, PCollection<KV<K, V>>> {
        /** The underlying Kafka read whose records are reduced to key/value pairs. */
        private final @UnknownKeyFor @NonNull @Initialized Read<K, V> read;

        TypedWithoutMetadata(@UnknownKeyFor @NonNull @Initialized Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        /** Applies the wrapped read, then maps every record to {@code record.getKV()}. */
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>> expand(@UnknownKeyFor @NonNull @Initialized PBegin begin) {
            PCollection records = (PCollection)begin.apply(this.read);
            DoFn dropMetadata = (DoFn)new DoFn<KafkaRecord<K, V>, KV<K, V>>(){

                @DoFn.ProcessElement
                public void processElement(/*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                // Could not load outer class - annotation placement on inner may be incorrect
                @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext ctx) {
                    KafkaRecord kafkaRecord = (KafkaRecord)ctx.element();
                    ctx.output(kafkaRecord.getKV());
                }
            };
            return (PCollection)records.apply("Remove Kafka Metadata", (PTransform)ParDo.of(dropMetadata));
        }

        /** Adds this transform's own display data followed by that of the wrapped read. */
        public void populateDisplayData(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }

        /** Builds the metadata-free read from an external configuration. */
        static class Builder<@UnknownKeyFor K, @UnknownKeyFor V>
        implements ExternalTransformBuilder<Read.External.Configuration, PBegin, PCollection<KV<K, V>>> {
            Builder() {
            }

            /**
             * Configures a {@link Read} from {@code config} and returns its
             * {@code withoutMetadata()} form.
             */
            public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>> buildExternal(@UnknownKeyFor @NonNull @Initialized Read.External.Configuration config) {
                AutoValue_KafkaIO_Read.Builder externalReadBuilder = new AutoValue_KafkaIO_Read.Builder();
                Read.Builder.setupExternalBuilder(externalReadBuilder, config);
                Read.Builder configured = (Read.Builder)externalReadBuilder;
                return configured.build().withoutMetadata();
            }
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class Read<@UnknownKeyFor K, @UnknownKeyFor V>
    extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        public static final @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized AutoValue_KafkaIO_Read> AUTOVALUE_CLASS = AutoValue_KafkaIO_Read.class;
        @Internal
        public static final @UnknownKeyFor @NonNull @Initialized PTransformOverride KAFKA_READ_OVERRIDE = PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class), new KafkaReadOverrideFactory());

        // ---------------------------------------------------------------------------------
        // Abstract property accessors of this @AutoValue class. Accessors annotated
        // @Nullable may return null; the rest are declared non-null. Where a fluent
        // "with..." method of this class visibly sets the matching builder property, it is
        // named next to the accessor.
        // ---------------------------------------------------------------------------------

        // Consumer properties; merged with updates by withConsumerConfigUpdates.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> getConsumerConfig();

        // The three accessors below are checked against each other by withTopics,
        // withTopicPartitions and withTopicPattern ("Only one of ... can be set").
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized String> getTopics();

        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized TopicPartition> getTopicPartitions();

        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Pattern getTopicPattern();

        // Set by the with...AndCoder methods.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Coder<K> getKeyCoder();

        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Coder<V> getValueCoder();

        // Set by withConsumerFactoryFn.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> getConsumerFactoryFn();

        // Set by the deprecated withWatermarkFn2.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> getWatermarkFn();

        // Set by withMaxNumRecords.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized long getMaxNumRecords();

        // Set by withMaxReadTime.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Duration getMaxReadTime();

        // Set by withStartReadTime.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Instant getStartReadTime();

        // Set by withStopReadTime.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Instant getStopReadTime();

        // Switched on by commitOffsetsInFinalize.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized boolean isCommitOffsetsInFinalizeEnabled();

        // Switched on by withDynamicRead, together with getWatchTopicPartitionDuration below.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized boolean isDynamicRead();

        // Switched on by withRedistribute.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized boolean isRedistributed();

        // Set by withAllowDuplicates.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized boolean isAllowDuplicates();

        // Set by withRedistributeNumKeys.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized int getRedistributeNumKeys();

        // Set by withOffsetDeduplication.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Boolean getOffsetDeduplication();

        // Set by withDynamicRead.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Duration getWatchTopicPartitionDuration();

        // Set by withTimestampPolicyFactory and the helpers that delegate to it.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        // Set by withOffsetConsumerConfigOverrides.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> getOffsetConsumerConfig();

        // Set by the withKeyDeserializer / withValueDeserializer families.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized DeserializerProvider<K> getKeyDeserializerProvider();

        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized DeserializerProvider<V> getValueDeserializerProvider();

        // Set by both withCheckStopReadingFn overloads.
        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized CheckStopReadingFn getCheckStopReadingFn();

        // Set by withBadRecordErrorHandler.
        @Pure
        public abstract /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @Nullable @UnknownKeyFor @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> getBadRecordErrorHandler();

        // Set by withConsumerPollingTimeout, which requires a value greater than 0.
        @Pure
        public abstract @UnknownKeyFor @NonNull @Initialized long getConsumerPollingTimeout();

        @Pure
        public abstract @Nullable @UnknownKeyFor @Initialized Boolean getLogTopicVerification();

        // Starting point of every fluent "with..." method in this class: each one calls
        // toBuilder(), changes properties on the builder, and calls build().
        abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> toBuilder();

        /**
         * Returns a copy whose consumer configuration has {@code bootstrap.servers} set to the
         * given value.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withBootstrapServers(@UnknownKeyFor @NonNull @Initialized String bootstrapServers) {
            Map<String, Object> bootstrapConfig = ImmutableMap.of("bootstrap.servers", bootstrapServers);
            return this.withConsumerConfigUpdates(bootstrapConfig);
        }

        /** Returns a copy that reads the single given topic; shorthand for {@link #withTopics}. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTopic(@UnknownKeyFor @NonNull @Initialized String topic) {
            List<String> singleTopic = ImmutableList.of(topic);
            return this.withTopics(singleTopic);
        }

        /**
         * Returns a copy that reads the given topics (stored as an immutable copy of the list).
         *
         * @throws IllegalStateException if topic partitions or a topic pattern are already set
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTopics(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics) {
            boolean noOtherSourceSet = (this.getTopicPartitions() == null || this.getTopicPartitions().isEmpty()) && this.getTopicPattern() == null;
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(noOtherSourceSet, "Only one of topics, topicPartitions or topicPattern can be set");
            Builder<K, V> builder = this.toBuilder();
            return builder.setTopics(ImmutableList.copyOf(topics)).build();
        }

        /**
         * Returns a copy that reads the given partitions (stored as an immutable copy of the
         * list).
         *
         * @throws IllegalStateException if topics or a topic pattern are already set
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTopicPartitions(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TopicPartition> topicPartitions) {
            boolean noOtherSourceSet = (this.getTopics() == null || this.getTopics().isEmpty()) && this.getTopicPattern() == null;
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(noOtherSourceSet, "Only one of topics, topicPartitions or topicPattern can be set");
            Builder<K, V> builder = this.toBuilder();
            return builder.setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
        }

        /** Returns a copy with the {@code redistributed} flag switched on. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withRedistribute() {
            Builder<K, V> builder = this.toBuilder();
            return builder.setRedistributed(true).build();
        }

        /** Returns a copy with the {@code allowDuplicates} property set to the given value. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withAllowDuplicates(@UnknownKeyFor @NonNull @Initialized Boolean allowDuplicates) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setAllowDuplicates(allowDuplicates).build();
        }

        /** Returns a copy with the {@code redistributeNumKeys} property set to the given value. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withRedistributeNumKeys(@UnknownKeyFor @NonNull @Initialized int redistributeNumKeys) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setRedistributeNumKeys(redistributeNumKeys).build();
        }

        /** Returns a copy with the {@code offsetDeduplication} property set to the given value. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withOffsetDeduplication(@UnknownKeyFor @NonNull @Initialized Boolean offsetDeduplication) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setOffsetDeduplication(offsetDeduplication).build();
        }

        /**
         * Returns a copy that selects topics by the given regular expression, which is compiled
         * with {@link Pattern#compile(String)}.
         *
         * @throws IllegalStateException if topics or topic partitions are already set
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTopicPattern(@UnknownKeyFor @NonNull @Initialized String topicPattern) {
            boolean noOtherSourceSet = (this.getTopics() == null || this.getTopics().isEmpty()) && (this.getTopicPartitions() == null || this.getTopicPartitions().isEmpty());
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(noOtherSourceSet, "Only one of topics, topicPartitions or topicPattern can be set");
            Builder<K, V> builder = this.toBuilder();
            return builder.setTopicPattern(Pattern.compile(topicPattern)).build();
        }

        /**
         * Returns a copy that deserializes keys with the given class, wrapped in a
         * {@code LocalDeserializerProvider}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withKeyDeserializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<K>> keyDeserializer) {
            DeserializerProvider<K> keyProvider = LocalDeserializerProvider.of(keyDeserializer);
            return this.withKeyDeserializer(keyProvider);
        }

        /**
         * Returns a copy that deserializes keys with the given class and uses the given coder
         * for them.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withKeyDeserializerAndCoder(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<K>> keyDeserializer, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder) {
            Read<K, V> withDeserializer = this.withKeyDeserializer(keyDeserializer);
            return withDeserializer.toBuilder().setKeyCoder(keyCoder).build();
        }

        /** Returns a copy that obtains its key deserializer from the given provider. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withKeyDeserializer(@UnknownKeyFor @NonNull @Initialized DeserializerProvider<K> deserializerProvider) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setKeyDeserializerProvider(deserializerProvider).build();
        }

        /** Returns a copy with both the key deserializer provider and the key coder set. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withKeyDeserializerProviderAndCoder(@UnknownKeyFor @NonNull @Initialized DeserializerProvider<K> deserializerProvider, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setKeyDeserializerProvider(deserializerProvider).setKeyCoder(keyCoder).build();
        }

        /**
         * Returns a copy that deserializes values with the given class, wrapped in a
         * {@code LocalDeserializerProvider}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withValueDeserializer(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<V>> valueDeserializer) {
            DeserializerProvider<V> valueProvider = LocalDeserializerProvider.of(valueDeserializer);
            return this.withValueDeserializer(valueProvider);
        }

        /**
         * Returns a copy that deserializes values with the given class and uses the given coder
         * for them.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withValueDeserializerAndCoder(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Deserializer<V>> valueDeserializer, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder) {
            Read<K, V> withDeserializer = this.withValueDeserializer(valueDeserializer);
            return withDeserializer.toBuilder().setValueCoder(valueCoder).build();
        }

        /** Returns a copy that obtains its value deserializer from the given provider. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withValueDeserializer(@UnknownKeyFor @NonNull @Initialized DeserializerProvider<V> deserializerProvider) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setValueDeserializerProvider(deserializerProvider).build();
        }

        /** Returns a copy with both the value deserializer provider and the value coder set. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withValueDeserializerProviderAndCoder(@UnknownKeyFor @NonNull @Initialized DeserializerProvider<V> deserializerProvider, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setValueDeserializerProvider(deserializerProvider).setValueCoder(valueCoder).build();
        }

        /**
         * Returns a copy that uses the given function to create a byte-array {@code Consumer}
         * from a configuration map.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withConsumerFactoryFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> consumerFactoryFn) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setConsumerFactoryFn(consumerFactoryFn).build();
        }

        /**
         * Returns a copy whose consumer configuration is the current one combined with
         * {@code configUpdates} by {@code KafkaIOUtils.updateKafkaProperties}.
         *
         * @deprecated performs the same steps as {@link #withConsumerConfigUpdates(Map)}.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> updateConsumerProperties(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            Map<String, Object> currentConfig = this.getConsumerConfig();
            Map<String, Object> mergedConfig = KafkaIOUtils.updateKafkaProperties(currentConfig, configUpdates);
            Builder<K, V> builder = this.toBuilder();
            return builder.setConsumerConfig(mergedConfig).build();
        }

        /** Returns a copy with the {@code maxNumRecords} property set to the given value. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withMaxNumRecords(@UnknownKeyFor @NonNull @Initialized long maxNumRecords) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setMaxNumRecords(maxNumRecords).build();
        }

        /** Returns a copy with the {@code startReadTime} property set to the given instant. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withStartReadTime(@UnknownKeyFor @NonNull @Initialized Instant startReadTime) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setStartReadTime(startReadTime).build();
        }

        /** Returns a copy with the {@code stopReadTime} property set to the given instant. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withStopReadTime(@UnknownKeyFor @NonNull @Initialized Instant stopReadTime) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setStopReadTime(stopReadTime).build();
        }

        /** Returns a copy with the {@code maxReadTime} property set to the given duration. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withMaxReadTime(@UnknownKeyFor @NonNull @Initialized Duration maxReadTime) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setMaxReadTime(maxReadTime).build();
        }

        /**
         * Returns a copy that uses the policy from
         * {@code TimestampPolicyFactory.withLogAppendTime()}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withLogAppendTime() {
            TimestampPolicyFactory<K, V> logAppendTimePolicy = TimestampPolicyFactory.withLogAppendTime();
            return this.withTimestampPolicyFactory(logAppendTimePolicy);
        }

        /**
         * Returns a copy that uses the policy from
         * {@code TimestampPolicyFactory.withProcessingTime()}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withProcessingTime() {
            TimestampPolicyFactory<K, V> processingTimePolicy = TimestampPolicyFactory.withProcessingTime();
            return this.withTimestampPolicyFactory(processingTimePolicy);
        }

        /**
         * Returns a copy that uses the policy from
         * {@code TimestampPolicyFactory.withCreateTime(maxDelay)}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withCreateTime(@UnknownKeyFor @NonNull @Initialized Duration maxDelay) {
            TimestampPolicyFactory<K, V> createTimePolicy = TimestampPolicyFactory.withCreateTime(maxDelay);
            return this.withTimestampPolicyFactory(createTimePolicy);
        }

        /** Returns a copy that uses the given timestamp policy factory. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTimestampPolicyFactory(@UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setTimestampPolicyFactory(timestampPolicyFactory).build();
        }

        /**
         * Returns a copy whose timestamp policy is
         * {@code TimestampPolicyFactory.withTimestampFn(timestampFn)}.
         *
         * @throws IllegalArgumentException if {@code timestampFn} is null
         * @deprecated marked deprecated in this class; {@link #withTimestampPolicyFactory} sets
         *     the same property directly.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTimestampFn2(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFn) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(timestampFn != null, "timestampFn can not be null");
            Builder<K, V> builder = this.toBuilder();
            return builder.setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn)).build();
        }

        /**
         * Returns a copy with the given record-level watermark function.
         *
         * @throws IllegalArgumentException if {@code watermarkFn} is null
         * @deprecated marked deprecated in this class.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withWatermarkFn2(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> watermarkFn) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(watermarkFn != null, "watermarkFn can not be null");
            Builder<K, V> builder = this.toBuilder();
            return builder.setWatermarkFn(watermarkFn).build();
        }

        /**
         * Key/value flavour of {@link #withTimestampFn2}: the function is adapted with
         * {@code unwrapKafkaAndThen} and passed on.
         *
         * @throws IllegalArgumentException if {@code timestampFn} is null
         * @deprecated marked deprecated in this class.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTimestampFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KV<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> timestampFn) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(timestampFn != null, "timestampFn can not be null");
            SerializableFunction<KafkaRecord<K, V>, Instant> recordTimestampFn = Read.unwrapKafkaAndThen(timestampFn);
            return this.withTimestampFn2(recordTimestampFn);
        }

        /**
         * Key/value flavour of {@link #withWatermarkFn2}: the function is adapted with
         * {@code unwrapKafkaAndThen} and passed on.
         *
         * @throws IllegalArgumentException if {@code watermarkFn} is null
         * @deprecated marked deprecated in this class.
         */
        @Deprecated
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withWatermarkFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KV<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> watermarkFn) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(watermarkFn != null, "watermarkFn can not be null");
            SerializableFunction<KafkaRecord<K, V>, Instant> recordWatermarkFn = Read.unwrapKafkaAndThen(watermarkFn);
            return this.withWatermarkFn2(recordWatermarkFn);
        }

        /**
         * Returns a copy whose consumer configuration has {@code isolation.level} set to
         * {@code read_committed}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withReadCommitted() {
            Map<String, Object> isolationConfig = ImmutableMap.of("isolation.level", "read_committed");
            return this.withConsumerConfigUpdates(isolationConfig);
        }

        /** Returns a copy with the {@code commitOffsetsInFinalizeEnabled} flag switched on. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> commitOffsetsInFinalize() {
            Builder<K, V> builder = this.toBuilder();
            return builder.setCommitOffsetsInFinalizeEnabled(true).build();
        }

        /**
         * Returns a copy with the {@code dynamicRead} flag switched on and the
         * {@code watchTopicPartitionDuration} property set to the given duration.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withDynamicRead(@UnknownKeyFor @NonNull @Initialized Duration duration) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setDynamicRead(true).setWatchTopicPartitionDuration(duration).build();
        }

        /**
         * Returns a copy with the {@code offsetConsumerConfig} property set to the given map.
         * The map is stored as passed; no copy is made here.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withOffsetConsumerConfigOverrides(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> offsetConsumerConfig) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setOffsetConsumerConfig(offsetConsumerConfig).build();
        }

        /**
         * Returns a copy whose consumer configuration is the current one combined with
         * {@code configUpdates} by {@code KafkaIOUtils.updateKafkaProperties}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withConsumerConfigUpdates(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
            Map<String, Object> currentConfig = this.getConsumerConfig();
            Map<String, Object> mergedConfig = KafkaIOUtils.updateKafkaProperties(currentConfig, configUpdates);
            Builder<K, V> builder = this.toBuilder();
            return builder.setConsumerConfig(mergedConfig).build();
        }

        /** Returns a copy that uses the given {@code CheckStopReadingFn}. */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withCheckStopReadingFn(@UnknownKeyFor @NonNull @Initialized CheckStopReadingFn checkStopReadingFn) {
            Builder<K, V> builder = this.toBuilder();
            return builder.setCheckStopReadingFn(checkStopReadingFn).build();
        }

        /**
         * Returns a copy of this transform that uses the given function as its stop-reading check.
         *
         * @param checkStopReadingFn per-{@code TopicPartition} predicate; it is wrapped with
         *     {@code CheckStopReadingFnWrapper.of} before being stored
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withCheckStopReadingFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized TopicPartition, @UnknownKeyFor @NonNull @Initialized Boolean> checkStopReadingFn) {
            return toBuilder()
                .setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn))
                .build();
        }

        /**
         * Returns a copy of this transform with the given bad-record error handler attached.
         *
         * <p>Only the SDF expansion forwards the handler; the legacy expansion logs a warning
         * that it does not support one.
         *
         * @param badRecordErrorHandler handler stored on the builder unchanged
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withBadRecordErrorHandler(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> badRecordErrorHandler) {
            return toBuilder()
                .setBadRecordErrorHandler(badRecordErrorHandler)
                .build();
        }

        /**
         * Returns a copy of this transform with the given consumer polling timeout.
         *
         * @param duration the timeout value; must be strictly positive (the unit is not visible
         *     in this method)
         * @throws IllegalStateException if {@code duration} is zero or negative
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withConsumerPollingTimeout(@UnknownKeyFor @NonNull @Initialized long duration) {
            boolean positive = duration > 0L;
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(positive, "Consumer polling timeout must be greater than 0.");
            return toBuilder()
                .setConsumerPollingTimeout(duration)
                .build();
        }

        /**
         * Applies a fixed set of SASL/OAUTHBEARER consumer properties through
         * {@link #withConsumerConfigUpdates(Map)}.
         *
         * <p>The properties set are {@code security.protocol}, {@code sasl.mechanism},
         * {@code sasl.login.callback.handler.class} and {@code sasl.jaas.config}.
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withGCPApplicationDefaultCredentials() {
            Map<String, Object> gcpAuthConfig = ImmutableMap.of(
                "security.protocol", "SASL_SSL",
                "sasl.mechanism", "OAUTHBEARER",
                "sasl.login.callback.handler.class", "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
                "sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
            return withConsumerConfigUpdates(gcpAuthConfig);
        }

        /**
         * Returns a copy of this transform with the topic-verification logging flag set.
         *
         * @param logTopicVerification value stored on the builder; {@code GenerateKafkaSourceDescriptor}
         *     reads it when looking up partitions for the configured topics
         */
        public @UnknownKeyFor @NonNull @Initialized Read<K, V> withTopicVerificationLogging(@UnknownKeyFor @NonNull @Initialized boolean logTopicVerification) {
            return toBuilder()
                .setLogTopicVerification(logTopicVerification)
                .build();
        }

        /**
         * Wraps this read in a {@code TypedWithoutMetadata} transform, whose output elements are
         * plain {@code KV<K, V>} pairs rather than {@code KafkaRecord}s.
         */
        public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>> withoutMetadata() {
            PTransform<PBegin, PCollection<KV<K, V>>> kvOnly = new TypedWithoutMetadata(this);
            return kvOnly;
        }

        /**
         * Wraps this read in a {@code RowsWithMetadata} transform, whose output elements are
         * {@code Row}s.
         */
        public @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row>> externalWithMetadata() {
            PTransform<PBegin, PCollection<Row>> rows = new RowsWithMetadata(this);
            return rows;
        }

        /**
         * Validates the configuration and expands into either the legacy (unbounded-source) or
         * the SDF-based read.
         *
         * <p>Validation, in order: bootstrap servers are set; a topic source is set (or, for
         * dynamic read, the {@code beam_fn_api} experiment is on); key and value deserializers are
         * set; start/stop read times are only used with a client that supports
         * {@code offsetsForTimes}; {@code group.id} is present when committing offsets in
         * finalize; and the redistribute settings are consistent.
         *
         * <p>The legacy read is chosen when one of the deprecated-read experiments is set, when
         * only the legacy implementation supports the configuration, or when it is supported and
         * {@code runnerPrefersLegacyRead} says so. Otherwise the SDF read is used.
         *
         * @param input the pipeline begin marker
         * @return the collection of Kafka records produced by the chosen implementation
         * @throws IllegalArgumentException if a required setting is missing
         */
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(getConsumerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            if (isDynamicRead()) {
                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"), "Kafka Dynamic Read requires enabling experiment beam_fn_api.");
            } else {
                boolean hasTopics = getTopics() != null && getTopics().size() > 0;
                boolean hasTopicPartitions = getTopicPartitions() != null && getTopicPartitions().size() > 0;
                boolean hasTopicPattern = getTopicPattern() != null;
                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(hasTopics || hasTopicPartitions || hasTopicPattern, "Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required");
            }
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");

            boolean offsetsForTimesSupported = ConsumerSpEL.hasOffsetsForTimes();
            if (!offsetsForTimesSupported) {
                LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", AppInfoParser.getVersion());
            }
            // Start and stop read times share the same client-version requirement and message.
            if (getStartReadTime() != null || getStopReadTime() != null) {
                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(offsetsForTimesSupported, "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer.");
            }

            if (isCommitOffsetsInFinalizeEnabled()) {
                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(getConsumerConfig().get("group.id") != null, "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config is not set. Offset management requires group.id.");
                boolean autoCommit = Boolean.TRUE.equals(getConsumerConfig().get("enable.auto.commit"));
                if (autoCommit) {
                    LOG.warn("'{}' in consumer config is enabled even though commitOffsetsInFinalize() is set. You need only one of them.", "enable.auto.commit");
                }
            }
            checkRedistributeConfiguration();
            warnAboutUnsafeConfigurations(input);

            CoderRegistry registry = input.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = getKeyCoder(registry);
            Coder<V> valueCoder = getValueCoder(registry);
            KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult compatibility = KafkaIOReadImplementationCompatibility.getCompatibility(this);

            PipelineOptions options = input.getPipeline().getOptions();
            boolean useLegacy = ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")
                || ExperimentalOptions.hasExperiment(options, "use_deprecated_read")
                || ExperimentalOptions.hasExperiment(options, "use_unbounded_sdf_wrapper")
                || compatibility.supportsOnly(KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY)
                || (compatibility.supports(KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY) && runnerPrefersLegacyRead(options));
            if (useLegacy) {
                return (PCollection)input.apply(new ReadFromKafkaViaUnbounded<K, V>(this, keyCoder, valueCoder));
            }
            return (PCollection)input.apply(new ReadFromKafkaViaSDF<K, V>(this, keyCoder, valueCoder));
        }

        /**
         * Checks that the redistribute-related settings are consistent with each other.
         *
         * <ul>
         *   <li>Warns when redistribute is enabled without a key count.
         *   <li>Warns when allow-duplicates is set while redistribute is not enabled. This
         *       warning used to fire whenever allow-duplicates was set, including correctly
         *       configured pipelines where redistribute was enabled.
         *   <li>Fails when a key count is set without redistribute.
         *   <li>Fails when offset deduplication is on but redistribute is off or duplicates are
         *       allowed.
         * </ul>
         *
         * @throws IllegalStateException if a key count or offset deduplication is configured
         *     inconsistently
         */
        private void checkRedistributeConfiguration() {
            boolean redistributed = isRedistributed();
            int redistributeNumKeys = getRedistributeNumKeys();
            if (redistributeNumKeys == 0 && redistributed) {
                LOG.warn("withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
            }
            // Only relevant when redistribute is actually off; otherwise the message is misleading.
            if (isAllowDuplicates() && !redistributed) {
                LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
            }
            if (redistributeNumKeys > 0) {
                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(redistributed, "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
            }
            Boolean offsetDeduplication = getOffsetDeduplication();
            if (offsetDeduplication != null && offsetDeduplication.booleanValue()) {
                org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(redistributed && !isAllowDuplicates(), "withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
            }
        }

        /**
         * Logs a warning for one specific combination of settings: a checkpointing interval other
         * than {@code -1}, {@code enable.auto.commit} set to {@code true}, commit-in-finalize
         * disabled, and {@code auto.offset.reset} either unset or {@code latest}.
         *
         * @param input used only to reach the pipeline options
         */
        private void warnAboutUnsafeConfigurations(@UnknownKeyFor @NonNull @Initialized PBegin input) {
            FakeFlinkPipelineOptions flinkOptions = (FakeFlinkPipelineOptions)input.getPipeline().getOptions().as(FakeFlinkPipelineOptions.class);
            Long checkpointingInterval = flinkOptions.getCheckpointingInterval();
            String autoOffsetReset = (String)getConsumerConfig().get("auto.offset.reset");

            // Each guard below rules out one of the conditions required for the warning.
            if (checkpointingInterval == null || checkpointingInterval == -1L) {
                return;
            }
            if (!Boolean.TRUE.equals(getConsumerConfig().get("enable.auto.commit"))) {
                return;
            }
            if (isCommitOffsetsInFinalizeEnabled()) {
                return;
            }
            if (autoOffsetReset != null && !"latest".equals(autoOffsetReset)) {
                return;
            }
            LOG.warn("When using the Flink runner with checkpointingInterval enabled, Kafka enable.auto.commit enabled, and Kafka auto.offset.reset set to latest or unset, there is a chance for every checkpoint to time out, which will cause data loss. We recommend setting commitOffsetInFinalize to true in ReadFromKafka, enable.auto.commit to false, and auto.offset.reset to none");
        }

        /**
         * Decides whether the legacy read should be preferred for the configured runner.
         *
         * @param options pipeline options providing the experiments and the runner class
         * @return {@code false} if the {@code use_sdf_read} experiment is set, if the runner class
         *     name starts with {@code org.apache.beam.runners.dataflow.}, or if the
         *     {@code beam_fn_api} experiment is set; {@code true} otherwise
         */
        private @UnknownKeyFor @NonNull @Initialized boolean runnerPrefersLegacyRead(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            // Short-circuit order matches the original sequence of checks.
            return !ExperimentalOptions.hasExperiment(options, "use_sdf_read")
                && !options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")
                && !ExperimentalOptions.hasExperiment(options, "beam_fn_api");
        }

        /**
         * Resolves the coder for record keys.
         *
         * @param coderRegistry registry passed to the key deserializer provider when no coder was
         *     set explicitly
         * @return the explicitly configured key coder if present, otherwise the coder supplied by
         *     the key deserializer provider
         * @throws IllegalStateException if neither a key coder nor a key deserializer provider is set
         */
        private @UnknownKeyFor @NonNull @Initialized Coder<K> getKeyCoder(@UnknownKeyFor @NonNull @Initialized CoderRegistry coderRegistry) {
            Coder<K> explicitCoder = getKeyCoder();
            if (explicitCoder != null) {
                return explicitCoder;
            }
            return ((DeserializerProvider)Preconditions.checkStateNotNull(getKeyDeserializerProvider())).getCoder(coderRegistry);
        }

        /**
         * Resolves the coder for record values.
         *
         * @param coderRegistry registry passed to the value deserializer provider when no coder
         *     was set explicitly
         * @return the explicitly configured value coder if present, otherwise the coder supplied
         *     by the value deserializer provider
         * @throws IllegalStateException if neither a value coder nor a value deserializer provider
         *     is set
         */
        private @UnknownKeyFor @NonNull @Initialized Coder<V> getValueCoder(@UnknownKeyFor @NonNull @Initialized CoderRegistry coderRegistry) {
            Coder<V> explicitCoder = getValueCoder();
            if (explicitCoder != null) {
                return explicitCoder;
            }
            return ((DeserializerProvider)Preconditions.checkStateNotNull(getValueDeserializerProvider())).getCoder(coderRegistry);
        }

        /**
         * Builds a {@code KafkaUnboundedSource} for this read, passing {@code -1} as its second
         * constructor argument (presumably a split id meaning "not yet split" — confirm against
         * {@code KafkaUnboundedSource}).
         */
        @VisibleForTesting
        @UnknownKeyFor @NonNull @Initialized UnboundedSource<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized KafkaCheckpointMark> makeSource() {
            UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> source = new KafkaUnboundedSource(this, -1);
            return source;
        }

        /**
         * Adapts a function over {@code KV} pairs into a function over {@code KafkaRecord}s by
         * first extracting the record's {@code KV}.
         *
         * @param fn function to apply to the extracted key/value pair
         * @return a serializable function that calls {@code fn} with {@code record.getKV()}
         */
        private static <KeyT, ValueT, OutT> @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KV<KeyT, ValueT>, OutT> fn) {
            return (SerializableFunction & Serializable)kafkaRecord -> fn.apply(kafkaRecord.getKV());
        }

        /**
         * Adds this read's topic selection and consumer properties to the display data.
         *
         * <p>Exactly one of topics, topic partitions or topic pattern is reported, in that order
         * of precedence. Consumer properties whose keys appear in
         * {@code KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES} are omitted; values whose type
         * {@code DisplayData.inferType} does not recognise are rendered via
         * {@code String.valueOf}.
         *
         * @param builder display-data builder to add items to
         * @throws IllegalStateException if the topics or topic-partitions list is null
         */
        public void populateDisplayData(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            List<String> topicNames = Preconditions.checkStateNotNull(getTopics());
            List<TopicPartition> partitions = Preconditions.checkStateNotNull(getTopicPartitions());
            Pattern pattern = getTopicPattern();
            if (!topicNames.isEmpty()) {
                String joinedTopics = Joiner.on(",").join((Iterable)topicNames);
                builder.add(DisplayData.item("topics", joinedTopics).withLabel("Topic/s"));
            } else if (!partitions.isEmpty()) {
                String joinedPartitions = Joiner.on(",").join((Iterable)partitions);
                builder.add(DisplayData.item("topicPartitions", joinedPartitions).withLabel("Topic Partition/s"));
            } else if (pattern != null) {
                builder.add(DisplayData.item("topicPattern", pattern.pattern()).withLabel("Topic Pattern"));
            }

            Set<String> hiddenKeys = KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> property : getConsumerConfig().entrySet()) {
                String key = property.getKey();
                if (!hiddenKeys.contains(key)) {
                    Object rawValue = property.getValue();
                    Object value = DisplayData.inferType(rawValue) != null ? rawValue : String.valueOf(rawValue);
                    builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
                }
            }
        }

        /**
         * Turns a single input element into one {@code KafkaSourceDescriptor} per topic partition
         * that the enclosing read is configured for.
         *
         * <p>Partitions are taken from the explicit topic-partition list when it is non-empty.
         * Otherwise a consumer is created and partitions are discovered either for each
         * configured topic or, when no topics are configured, for every topic whose name matches
         * the topic pattern.
         */
        @VisibleForTesting
        static class GenerateKafkaSourceDescriptor
        extends DoFn<byte[], KafkaSourceDescriptor> {
            private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> consumerFactoryFn;
            private final @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized TopicPartition> topicPartitions;
            private final @Nullable @UnknownKeyFor @Initialized Instant startReadTime;
            private final @Nullable @UnknownKeyFor @Initialized Instant stopReadTime;
            @VisibleForTesting
            final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> consumerConfig;
            @VisibleForTesting
            final @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics;
            private final @Nullable @UnknownKeyFor @Initialized Pattern topicPattern;
            private final @Nullable @UnknownKeyFor @Initialized Boolean logTopicVerification;

            /**
             * Copies the settings needed for partition discovery out of the given read.
             *
             * @param read the read transform whose configuration is captured
             */
            GenerateKafkaSourceDescriptor(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Read<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> read) {
                this.consumerConfig = read.getConsumerConfig();
                this.consumerFactoryFn = read.getConsumerFactoryFn();
                this.topics = read.getTopics();
                this.topicPartitions = read.getTopicPartitions();
                this.topicPattern = read.getTopicPattern();
                this.startReadTime = read.getStartReadTime();
                this.stopReadTime = read.getStopReadTime();
                this.logTopicVerification = read.getLogTopicVerification();
            }

            /**
             * Emits one source descriptor per resolved topic partition, carrying the configured
             * start and stop read times.
             *
             * <p>When a configured topic has no partition info: if topic-verification logging is
             * off (or unset) an {@link IllegalStateException} is thrown; if it is on, a warning
             * is logged and the topic is skipped. Previously the warning was logged for every
             * topic regardless of the lookup result, and a {@code null} lookup result caused a
             * {@link NullPointerException}.
             *
             * @param receiver output receiver for the generated descriptors
             * @throws IllegalStateException if a required list or pattern is null, or if a topic
             *     has no partitions while topic-verification logging is disabled
             */
            @DoFn.ProcessElement
            public void processElement(// Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized KafkaSourceDescriptor> receiver) {
                List<TopicPartition> partitions = new ArrayList<>(Preconditions.checkStateNotNull(this.topicPartitions));
                if (partitions.isEmpty()) {
                    try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(this.consumerConfig)) {
                        List<String> topics = Preconditions.checkStateNotNull(this.topics);
                        if (topics.isEmpty()) {
                            Pattern pattern = Preconditions.checkStateNotNull(this.topicPattern);
                            for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
                                if (!pattern.matcher(entry.getKey()).matches()) {
                                    continue;
                                }
                                for (PartitionInfo p : entry.getValue()) {
                                    partitions.add(new TopicPartition(p.topic(), p.partition()));
                                }
                            }
                        } else {
                            boolean failOnMissingTopic = this.logTopicVerification == null || !this.logTopicVerification.booleanValue();
                            for (String topic : topics) {
                                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
                                boolean found = partitionInfoList != null && !partitionInfoList.isEmpty();
                                if (failOnMissingTopic) {
                                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(found, "Could not find any partitions info for topic " + topic + ". Please check Kafka configuration and make sure that provided topics exist.");
                                } else if (!found) {
                                    // Log-only mode: report the missing topic and move on instead of
                                    // dereferencing a null/empty partition list.
                                    LOG.warn("Could not find any partitions info for topic {}. Please check Kafka configuration and make sure that the provided topics exist.", topic);
                                    continue;
                                }
                                for (PartitionInfo p : partitionInfoList) {
                                    partitions.add(new TopicPartition(p.topic(), p.partition()));
                                }
                            }
                        }
                    }
                }
                for (TopicPartition topicPartition : partitions) {
                    receiver.output(KafkaSourceDescriptor.of(topicPartition, null, this.startReadTime, null, this.stopReadTime, null));
                }
            }
        }

        /**
         * SDF-based expansion of the Kafka read: produces source descriptors (statically or by
         * watching for topic partitions) and reads them with {@code ReadSourceDescriptors}.
         */
        static class ReadFromKafkaViaSDF<@UnknownKeyFor K, @UnknownKeyFor V>
        extends AbstractReadFromKafka<K, V> {
            /**
             * @param kafkaRead the configured read
             * @param keyCoder coder for record keys
             * @param valueCoder coder for record values
             */
            ReadFromKafkaViaSDF(@UnknownKeyFor @NonNull @Initialized Read<K, V> kafkaRead, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder) {
                super(kafkaRead, keyCoder, valueCoder, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.SDF);
            }

            /**
             * Builds the descriptor-reading transform from the read's settings, creates the
             * collection of source descriptors, applies the reader, and optionally appends a
             * redistribute step.
             *
             * @param input the pipeline begin marker
             * @return the collection of Kafka records
             */
            public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
                ReadSourceDescriptors readTransform = ReadSourceDescriptors.read()
                    .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
                    .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
                    .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
                    .withKeyDeserializerProviderAndCoder(kafkaRead.getKeyDeserializerProvider(), keyCoder)
                    .withValueDeserializerProviderAndCoder(kafkaRead.getValueDeserializerProvider(), valueCoder)
                    .withManualWatermarkEstimator()
                    .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
                    .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
                    .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
                if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
                    readTransform = readTransform.commitOffsets();
                }
                if (kafkaRead.getStopReadTime() != null) {
                    readTransform = readTransform.withBounded();
                }
                if (kafkaRead.getBadRecordErrorHandler() != null) {
                    readTransform = readTransform.withBadRecordErrorHandler(kafkaRead.getBadRecordErrorHandler());
                }
                if (kafkaRead.isRedistributed()) {
                    readTransform = readTransform.withRedistribute();
                }
                if (kafkaRead.isAllowDuplicates()) {
                    readTransform = readTransform.withAllowDuplicates();
                }
                if (kafkaRead.getRedistributeNumKeys() > 0) {
                    readTransform = readTransform.withRedistributeNumKeys(kafkaRead.getRedistributeNumKeys());
                }

                PCollection output;
                if (kafkaRead.isDynamicRead()) {
                    // Topics to watch: the configured topics plus the topics of any configured partitions.
                    HashSet<String> topics = new HashSet<String>();
                    if (kafkaRead.getTopics() != null && !kafkaRead.getTopics().isEmpty()) {
                        topics.addAll(kafkaRead.getTopics());
                    }
                    if (kafkaRead.getTopicPartitions() != null && !kafkaRead.getTopicPartitions().isEmpty()) {
                        for (TopicPartition topicPartition : kafkaRead.getTopicPartitions()) {
                            topics.add(topicPartition.topic());
                        }
                    }
                    output = (PCollection)input.apply((PTransform)new WatchForKafkaTopicPartitions(kafkaRead.getWatchTopicPartitionDuration(), kafkaRead.getConsumerFactoryFn(), kafkaRead.getConsumerConfig(), kafkaRead.getCheckStopReadingFn(), topics, kafkaRead.getTopicPattern(), kafkaRead.getStartReadTime(), kafkaRead.getStopReadTime()));
                } else {
                    String requestedVersionString = ((StreamingOptions)input.getPipeline().getOptions().as(StreamingOptions.class)).getUpdateCompatibilityVersion();
                    PCollection seed;
                    // Pipelines requesting compatibility with a version before 2.66.0 keep the Impulse seed.
                    if (requestedVersionString != null && TransformUpgrader.compareVersions((String)requestedVersionString, (String)"2.66.0") < 0) {
                        seed = (PCollection)input.getPipeline().apply((PTransform)Impulse.create());
                    } else {
                        seed = (PCollection)input.getPipeline().apply((PTransform)Create.of((Object)new byte[0], (Object[])new byte[0][]).withCoder((Coder)ByteArrayCoder.of()));
                    }
                    output = (PCollection)seed.apply((PTransform)ParDo.of((DoFn)new GenerateKafkaSourceDescriptor(kafkaRead)));
                }

                PCollection records = ((PCollection)output.apply(readTransform)).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
                if (!kafkaRead.isRedistributed()) {
                    return records;
                }
                if (kafkaRead.getRedistributeNumKeys() == 0) {
                    return (PCollection)records.apply("Insert Redistribute", (PTransform)Redistribute.arbitrarily().withAllowDuplicates(kafkaRead.isAllowDuplicates()));
                }
                return (PCollection)records.apply("Insert Redistribute with Shards", (PTransform)Redistribute.arbitrarily().withAllowDuplicates(kafkaRead.isAllowDuplicates()).withNumBuckets(Integer.valueOf(kafkaRead.getRedistributeNumKeys())));
            }
        }

        /**
         * Legacy expansion of the Kafka read, built on the unbounded source returned by
         * {@code makeSource()}.
         */
        static class ReadFromKafkaViaUnbounded<@UnknownKeyFor K, @UnknownKeyFor V>
        extends AbstractReadFromKafka<K, V> {
            /**
             * @param kafkaRead the configured read
             * @param keyCoder coder for record keys
             * @param valueCoder coder for record values
             */
            ReadFromKafkaViaUnbounded(@UnknownKeyFor @NonNull @Initialized Read<K, V> kafkaRead, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder) {
                super(kafkaRead, keyCoder, valueCoder, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY);
            }

            /**
             * Reads from the unbounded source, bounding it when a record limit or read-time limit
             * is configured, and optionally appends a redistribute step.
             *
             * @param input the pipeline begin marker
             * @return the collection of Kafka records
             */
            public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
                if (kafkaRead.getBadRecordErrorHandler() != null) {
                    LOG.warn("The Legacy implementation of Kafka Read does not support writing malformed messages to an error handler. Use the SDF implementation instead.");
                }
                Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from(kafkaRead.toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
                Read.Unbounded transform = unbounded;
                boolean limited = kafkaRead.getMaxNumRecords() < Long.MAX_VALUE || kafkaRead.getMaxReadTime() != null;
                if (limited) {
                    transform = unbounded.withMaxReadTime(kafkaRead.getMaxReadTime()).withMaxNumRecords(kafkaRead.getMaxNumRecords());
                }

                boolean redistributed = kafkaRead.isRedistributed();
                if (redistributed && kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) {
                    LOG.warn("Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
                }
                PCollection output = (PCollection)input.getPipeline().apply((PTransform)transform);
                if (!redistributed) {
                    return output;
                }
                if (kafkaRead.getRedistributeNumKeys() == 0) {
                    return (PCollection)output.apply("Insert Redistribute", (PTransform)Redistribute.arbitrarily().withAllowDuplicates(kafkaRead.isAllowDuplicates()));
                }
                return (PCollection)output.apply("Insert Redistribute with Shards", (PTransform)Redistribute.arbitrarily().withAllowDuplicates(kafkaRead.isAllowDuplicates()).withNumBuckets(Integer.valueOf(kafkaRead.getRedistributeNumKeys())));
            }
        }

        /**
         * Common base for the two read expansions; holds the configured read and the resolved
         * key and value coders.
         */
        private static abstract class AbstractReadFromKafka<@UnknownKeyFor K, @UnknownKeyFor V>
        extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
            @UnknownKeyFor @NonNull @Initialized Read<K, V> kafkaRead;
            @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder;
            @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder;

            /**
             * Verifies that {@code implementation} supports the given read before storing the
             * arguments; {@code checkSupport} is expected to reject unsupported configurations.
             *
             * @param kafkaRead the configured read
             * @param keyCoder coder for record keys
             * @param valueCoder coder for record values
             * @param implementation the implementation this subclass represents
             */
            AbstractReadFromKafka(@UnknownKeyFor @NonNull @Initialized Read<K, V> kafkaRead, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder, @UnknownKeyFor @NonNull @Initialized KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation implementation) {
                KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult compatibility = KafkaIOReadImplementationCompatibility.getCompatibility(kafkaRead);
                compatibility.checkSupport(implementation);
                this.kafkaRead = kafkaRead;
                this.keyCoder = keyCoder;
                this.valueCoder = valueCoder;
            }
        }

        /**
         * Override factory that replaces an SDF-based Kafka read with the legacy
         * unbounded-source read built from the same settings.
         */
        private static class KafkaReadOverrideFactory<@UnknownKeyFor K, @UnknownKeyFor V>
        implements PTransformOverrideFactory<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> {
            private KafkaReadOverrideFactory() {
            }

            /**
             * Builds the legacy replacement for the given SDF read.
             *
             * @param transform the applied SDF read to replace
             * @return a replacement rooted at the pipeline's begin marker
             * @throws IllegalStateException if the legacy implementation does not support the
             *     read's configuration; the compatibility exception is attached as the cause
             */
            public // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized PTransformOverrideFactory.PTransformReplacement<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>> getReplacementTransform(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>>, @UnknownKeyFor @NonNull @Initialized ReadFromKafkaViaSDF<K, V>> transform) {
                try {
                    ReadFromKafkaViaSDF<K, V> sdfRead = transform.getTransform();
                    return PTransformOverrideFactory.PTransformReplacement.of(transform.getPipeline().begin(), new ReadFromKafkaViaUnbounded<K, V>(sdfRead.kafkaRead, sdfRead.keyCoder, sdfRead.valueCoder));
                }
                catch (KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityException e) {
                    // Keep the original exception as the cause so its stack trace is not lost.
                    throw new IllegalStateException("The current runner does not support SDF-based Kafka read properly and the replacement runner lacks the support for the following properties: " + e.getConflictingProperties() + ". For example if you are using Dataflow then consider using Dataflow Runner v2.", e);
                }
            }

            /**
             * Maps the original outputs onto the single replacement output via
             * {@code ReplacementOutputs.singleton}.
             *
             * @param outputs outputs of the original transform
             * @param newOutput output of the replacement transform
             */
            public /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized PTransformOverrideFactory.ReplacementOutput> mapOutputs(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputs, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>> newOutput) {
                return ReplacementOutputs.singleton(outputs, newOutput);
            }
        }

        /**
         * Pipeline options view exposing a {@code checkpointingInterval} property, read by
         * {@code warnAboutUnsafeConfigurations}. Presumably mirrors the Flink runner's option of
         * the same name so it can be read without depending on that runner — confirm.
         */
        public interface FakeFlinkPipelineOptions
        extends PipelineOptions {
            /** Returns the checkpointing interval; defaults to {@code -1}. */
            @Default.Long(-1L)
            @UnknownKeyFor @NonNull @Initialized Long getCheckpointingInterval();

            /** Sets the checkpointing interval. */
            void setCheckpointingInterval(@UnknownKeyFor @NonNull @Initialized Long checkpointingInterval);
        }

        @AutoService(value={ExternalTransformRegistrar.class})
        public static class External
        implements ExternalTransformRegistrar {
            public static final @UnknownKeyFor @NonNull @Initialized String URN_WITH_METADATA = "beam:transform:org.apache.beam:kafka_read_with_metadata:v1";
            public static final @UnknownKeyFor @NonNull @Initialized String URN_WITHOUT_METADATA = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1";

            /**
             * Returns the external transform builders registered by this class: the
             * with-metadata URN maps to {@code RowsWithMetadata.Builder} and the
             * without-metadata URN maps to {@code TypedWithoutMetadata.Builder}.
             */
            public /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized ExternalTransformBuilder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>>> knownBuilders() {
                return ImmutableMap.of((Object)URN_WITH_METADATA, RowsWithMetadata.Builder.class, (Object)URN_WITHOUT_METADATA, TypedWithoutMetadata.Builder.class);
            }

            public static class Configuration {
                private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> consumerConfig;
                private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics;
                private @UnknownKeyFor @NonNull @Initialized String keyDeserializer;
                private @UnknownKeyFor @NonNull @Initialized String valueDeserializer;
                private @UnknownKeyFor @NonNull @Initialized Long startReadTime;
                private @UnknownKeyFor @NonNull @Initialized Long stopReadTime;
                private @UnknownKeyFor @NonNull @Initialized Long maxNumRecords;
                private @UnknownKeyFor @NonNull @Initialized Long maxReadTime;
                private @UnknownKeyFor @NonNull @Initialized Boolean commitOffsetInFinalize;
                private @UnknownKeyFor @NonNull @Initialized Long consumerPollingTimeout;
                private @UnknownKeyFor @NonNull @Initialized String timestampPolicy;
                private @UnknownKeyFor @NonNull @Initialized Integer redistributeNumKeys;
                private @UnknownKeyFor @NonNull @Initialized Boolean redistribute;
                private @UnknownKeyFor @NonNull @Initialized Boolean allowDuplicates;
                private @UnknownKeyFor @NonNull @Initialized Boolean offsetDeduplication;
                private @UnknownKeyFor @NonNull @Initialized Long dynamicReadPollIntervalSeconds;

                public void setConsumerConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> consumerConfig) {
                    this.consumerConfig = consumerConfig;
                }

                public void setTopics(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics) {
                    this.topics = topics;
                }

                public void setKeyDeserializer(@UnknownKeyFor @NonNull @Initialized String keyDeserializer) {
                    this.keyDeserializer = keyDeserializer;
                }

                public void setValueDeserializer(@UnknownKeyFor @NonNull @Initialized String valueDeserializer) {
                    this.valueDeserializer = valueDeserializer;
                }

                public void setStartReadTime(@UnknownKeyFor @NonNull @Initialized Long startReadTime) {
                    this.startReadTime = startReadTime;
                }

                public void setStopReadTime(@UnknownKeyFor @NonNull @Initialized Long stopReadTime) {
                    this.stopReadTime = stopReadTime;
                }

                public void setMaxNumRecords(@UnknownKeyFor @NonNull @Initialized Long maxNumRecords) {
                    this.maxNumRecords = maxNumRecords;
                }

                public void setMaxReadTime(@UnknownKeyFor @NonNull @Initialized Long maxReadTime) {
                    this.maxReadTime = maxReadTime;
                }

                public void setCommitOffsetInFinalize(@UnknownKeyFor @NonNull @Initialized Boolean commitOffsetInFinalize) {
                    this.commitOffsetInFinalize = commitOffsetInFinalize;
                }

                public void setTimestampPolicy(@UnknownKeyFor @NonNull @Initialized String timestampPolicy) {
                    this.timestampPolicy = timestampPolicy;
                }

                public void setConsumerPollingTimeout(@UnknownKeyFor @NonNull @Initialized Long consumerPollingTimeout) {
                    this.consumerPollingTimeout = consumerPollingTimeout;
                }

                public void setRedistributeNumKeys(@UnknownKeyFor @NonNull @Initialized Integer redistributeNumKeys) {
                    this.redistributeNumKeys = redistributeNumKeys;
                }

                public void setRedistribute(@UnknownKeyFor @NonNull @Initialized Boolean redistribute) {
                    this.redistribute = redistribute;
                }

                public void setAllowDuplicates(@UnknownKeyFor @NonNull @Initialized Boolean allowDuplicates) {
                    this.allowDuplicates = allowDuplicates;
                }

                public void setOffsetDeduplication(@UnknownKeyFor @NonNull @Initialized Boolean offsetDeduplication) {
                    this.offsetDeduplication = offsetDeduplication;
                }

                public void setDynamicReadPollIntervalSeconds(@UnknownKeyFor @NonNull @Initialized Long dynamicReadPollIntervalSeconds) {
                    this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
                }
            }
        }

        /**
         * AutoValue builder for {@code Read<K, V>}.
         *
         * <p>Besides the generated property setters it offers
         * {@link #setupExternalBuilder(Builder, External.Configuration)}, which populates a builder
         * from an {@link External.Configuration}, and a coder-inference helper used by it.
         */
        @AutoValue.Builder
        abstract static class Builder<@UnknownKeyFor K, @UnknownKeyFor V> {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> consumerConfig);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTopics(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTopicPartitions(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TopicPartition> topicPartitions);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTopicPattern(@UnknownKeyFor @NonNull @Initialized Pattern topicPattern);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setKeyCoder(@UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setValueCoder(@UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerFactoryFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object>, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> consumerFactoryFn);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setWatermarkFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized KafkaRecord<K, V>, @UnknownKeyFor @NonNull @Initialized Instant> watermarkFn);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setMaxNumRecords(@UnknownKeyFor @NonNull @Initialized long maxNumRecords);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setMaxReadTime(@UnknownKeyFor @NonNull @Initialized Duration maxReadTime);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setStartReadTime(@UnknownKeyFor @NonNull @Initialized Instant startReadTime);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setStopReadTime(@UnknownKeyFor @NonNull @Initialized Instant stopReadTime);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCommitOffsetsInFinalizeEnabled(@UnknownKeyFor @NonNull @Initialized boolean commitOffsetsInFinalizeEnabled);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setDynamicRead(@UnknownKeyFor @NonNull @Initialized boolean dynamicRead);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setWatchTopicPartitionDuration(@UnknownKeyFor @NonNull @Initialized Duration watchTopicPartitionDuration);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setRedistributed(@UnknownKeyFor @NonNull @Initialized boolean redistributed);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setAllowDuplicates(@UnknownKeyFor @NonNull @Initialized boolean allowDuplicates);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setRedistributeNumKeys(@UnknownKeyFor @NonNull @Initialized int redistributeNumKeys);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setOffsetDeduplication(@UnknownKeyFor @NonNull @Initialized Boolean offsetDeduplication);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setTimestampPolicyFactory(@UnknownKeyFor @NonNull @Initialized TimestampPolicyFactory<K, V> timestampPolicyFactory);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setOffsetConsumerConfig(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> offsetConsumerConfig);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setKeyDeserializerProvider(@UnknownKeyFor @NonNull @Initialized DeserializerProvider<K> keyDeserializerProvider);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setValueDeserializerProvider(@UnknownKeyFor @NonNull @Initialized DeserializerProvider<V> valueDeserializerProvider);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCheckStopReadingFn(@Nullable @UnknownKeyFor @Initialized CheckStopReadingFn checkStopReadingFn);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setBadRecordErrorHandler(@Nullable @UnknownKeyFor @Initialized ErrorHandler<@UnknownKeyFor @NonNull @Initialized BadRecord, @UnknownKeyFor @NonNull @Initialized ?> badRecordErrorHandler);

            /**
             * Convenience overload that wraps a plain function with
             * {@code CheckStopReadingFnWrapper.of} and delegates to the abstract setter.
             *
             * @param checkStopReadingFn function from topic partition to a Boolean; may be null,
             *     in which case the result of {@code CheckStopReadingFnWrapper.of(null)} is stored
             * @return the value returned by the abstract setter
             */
            @UnknownKeyFor @NonNull @Initialized Builder<K, V> setCheckStopReadingFn(@Nullable @UnknownKeyFor @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized TopicPartition, @UnknownKeyFor @NonNull @Initialized Boolean> checkStopReadingFn) {
                return this.setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setConsumerPollingTimeout(@UnknownKeyFor @NonNull @Initialized long consumerPollingTimeout);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<K, V> setLogTopicVerification(@Nullable @UnknownKeyFor @Initialized Boolean logTopicVerification);

            abstract @UnknownKeyFor @NonNull @Initialized Read<K, V> build();

            /**
             * Populates {@code builder} from an external configuration object.
             *
             * <p>Summary of what is set:
             * <ul>
             *   <li>topics (copied into an immutable list); topic partitions are set to an empty list;
             *   <li>key/value deserializer providers and coders, resolved from the configured class
             *       names; the resolved class names are also written into the consumer config
             *       under {@code key.deserializer} / {@code value.deserializer};
             *   <li>max read time (seconds), max record count ({@code Long.MAX_VALUE} when absent);
             *   <li>commit-offsets-in-finalize (treated as {@code false} when absent);
             *   <li>timestamp policy factory chosen by name;
             *   <li>start/stop read time (epoch milliseconds) when present;
             *   <li>dynamic read, enabled iff a poll interval is configured;
             *   <li>consumer polling timeout ({@code 2} when absent);
             *   <li>redistribute options.
             * </ul>
             *
             * @param builder the builder to populate
             * @param config the external configuration to read from
             * @throws IllegalArgumentException if the timestamp policy is missing or not one of
             *     {@code ProcessingTime}, {@code CreateTime}, {@code LogAppendTime}, or if the
             *     consumer polling timeout is present and {@code <= 0}
             */
            // The deserializer classes come back from KafkaIO.resolveClass without usable type
            // arguments in this view, so they are handled as raw Class values.
            // NOTE(review): if resolveClass is generic, these locals can be typed and this
            // suppression narrowed - confirm against its declaration.
            @SuppressWarnings({"rawtypes", "unchecked"})
            static <K, V> void setupExternalBuilder(@UnknownKeyFor @NonNull @Initialized Builder<K, V> builder, @UnknownKeyFor @NonNull @Initialized External.Configuration config) {
                builder.setTopics(ImmutableList.copyOf(config.topics));

                Class keyDeserializer = KafkaIO.resolveClass(config.keyDeserializer);
                builder.setKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
                builder.setKeyCoder(Builder.resolveCoder(keyDeserializer));

                Class valueDeserializer = KafkaIO.resolveClass(config.valueDeserializer);
                builder.setValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
                builder.setValueCoder(Builder.resolveCoder(valueDeserializer));

                // Copy so the caller's map is not mutated when the deserializer entries are added.
                Map<String, Object> consumerConfig = new HashMap<String, Object>(config.consumerConfig);
                consumerConfig.put("key.deserializer", keyDeserializer.getName());
                consumerConfig.put("value.deserializer", valueDeserializer.getName());
                builder.setConsumerConfig(consumerConfig);

                builder.setTopicPartitions(Collections.emptyList());
                builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN);

                if (config.maxReadTime != null) {
                    builder.setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
                }
                builder.setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);

                // Null-safe: a missing flag means "do not commit" instead of an unboxing NPE.
                builder.setCommitOffsetsInFinalizeEnabled(Boolean.TRUE.equals(config.commitOffsetInFinalize));

                // Constant-first equals so that a missing policy reaches the
                // IllegalArgumentException below rather than failing with an NPE.
                String timestampPolicy = config.timestampPolicy;
                if ("ProcessingTime".equals(timestampPolicy)) {
                    builder.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
                } else if ("CreateTime".equals(timestampPolicy)) {
                    builder.setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
                } else if ("LogAppendTime".equals(timestampPolicy)) {
                    builder.setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
                } else {
                    throw new IllegalArgumentException("timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
                }

                if (config.startReadTime != null) {
                    builder.setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
                }
                if (config.stopReadTime != null) {
                    builder.setStopReadTime(Instant.ofEpochMilli(config.stopReadTime));
                }

                if (config.dynamicReadPollIntervalSeconds != null) {
                    builder.setDynamicRead(true);
                    builder.setWatchTopicPartitionDuration(Duration.standardSeconds(config.dynamicReadPollIntervalSeconds));
                } else {
                    builder.setDynamicRead(false);
                }

                if (config.consumerPollingTimeout != null) {
                    if (config.consumerPollingTimeout <= 0L) {
                        throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
                    }
                    builder.setConsumerPollingTimeout(config.consumerPollingTimeout);
                } else {
                    builder.setConsumerPollingTimeout(2L);
                }

                if (config.redistribute != null) {
                    builder.setRedistributed(config.redistribute);
                    // NOTE(review): when redistributeNumKeys / allowDuplicates are omitted they are
                    // left unset here, whereas the else-branch assigns explicit defaults; confirm
                    // the builder is pre-populated or that callers always send both.
                    if (config.redistributeNumKeys != null) {
                        builder.setRedistributeNumKeys(config.redistributeNumKeys);
                    }
                    if (config.allowDuplicates != null) {
                        builder.setAllowDuplicates(config.allowDuplicates);
                    }
                    // Offset deduplication is only forwarded when redistributing without duplicates.
                    boolean duplicatesAllowed = config.allowDuplicates != null && config.allowDuplicates;
                    if (config.redistribute && !duplicatesAllowed && config.offsetDeduplication != null) {
                        builder.setOffsetDeduplication(config.offsetDeduplication);
                    }
                } else {
                    builder.setRedistributed(false);
                    builder.setRedistributeNumKeys(0);
                    builder.setAllowDuplicates(false);
                    builder.setOffsetDeduplication(false);
                }
            }

            /**
             * Infers a coder from the return type of a deserializer class's declared
             * {@code deserialize} method.
             *
             * <p>Declared methods named {@code deserialize} whose return type is {@code Object}
             * are skipped (presumably erased bridge methods). The first remaining one decides:
             * {@code byte[]}, {@code Integer} and {@code Long} map to a nullable byte-array,
             * var-int and var-long coder respectively; any other return type is an error.
             *
             * @param deserializer the deserializer class to inspect
             * @return a {@code NullableCoder} wrapping the coder matching the return type
             * @throws RuntimeException if the return type is unsupported, or if no suitable
             *     {@code deserialize} method is declared on the class
             */
            @SuppressWarnings("unchecked") // the coder type is chosen to match T via reflection
            private static <T> @UnknownKeyFor @NonNull @Initialized Coder<T> resolveCoder(@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized Deserializer<T>> deserializer) {
                for (Method method : deserializer.getDeclaredMethods()) {
                    if (!method.getName().equals("deserialize")) {
                        continue;
                    }
                    Class<?> returnType = method.getReturnType();
                    if (returnType.equals(Object.class)) {
                        continue;
                    }
                    if (returnType.equals(byte[].class)) {
                        return (Coder<T>)NullableCoder.of(ByteArrayCoder.of());
                    }
                    if (returnType.equals(Integer.class)) {
                        return (Coder<T>)NullableCoder.of(VarIntCoder.of());
                    }
                    if (returnType.equals(Long.class)) {
                        return (Coder<T>)NullableCoder.of(VarLongCoder.of());
                    }
                    throw new RuntimeException("Couldn't infer Coder from " + deserializer);
                }
                throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
            }
        }
    }
}

