/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.auto.value.AutoValue;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.ConsumerSpEL;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaWriter;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class KafkaIO {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    public static Read<byte[], byte[]> readBytes() {
        return new AutoValue_KafkaIO_Read.Builder().setTopics(new ArrayList<String>()).setTopicPartitions(new ArrayList<TopicPartition>()).setKeyDeserializer(ByteArrayDeserializer.class).setValueDeserializer(ByteArrayDeserializer.class).setConsumerFactoryFn((SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)Read.KAFKA_CONSUMER_FACTORY_FN).setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES).setMaxNumRecords(Long.MAX_VALUE).build();
    }

    public static <K, V> Read<K, V> read() {
        return new AutoValue_KafkaIO_Read.Builder().setTopics(new ArrayList<String>()).setTopicPartitions(new ArrayList<TopicPartition>()).setConsumerFactoryFn((SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)Read.KAFKA_CONSUMER_FACTORY_FN).setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES).setMaxNumRecords(Long.MAX_VALUE).setCommitOffsetsInFinalizeEnabled(false).setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()).build();
    }

    public static <K, V> Write<K, V> write() {
        return new AutoValue_KafkaIO_Write.Builder().setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES).setEOS(false).setNumShards(0).setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN).build();
    }

    private static Map<String, Object> updateKafkaProperties(Map<String, Object> currentConfig, Map<String, String> ignoredProperties, Map<String, Object> updates) {
        for (String key : updates.keySet()) {
            Preconditions.checkArgument(!ignoredProperties.containsKey(key), "No need to configure '%s'. %s", (Object)key, (Object)ignoredProperties.get(key));
        }
        HashMap<String, Object> config = new HashMap<String, Object>(currentConfig);
        config.putAll(updates);
        return config;
    }

    private KafkaIO() {
    }

    @VisibleForTesting
    static <T> NullableCoder<T> inferCoder(CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
        Preconditions.checkNotNull(deserializer);
        for (Type type : deserializer.getGenericInterfaces()) {
            ParameterizedType parameterizedType;
            if (!(type instanceof ParameterizedType) || (parameterizedType = (ParameterizedType)type).getRawType() != Deserializer.class) continue;
            Type parameter = parameterizedType.getActualTypeArguments()[0];
            Class clazz = (Class)parameter;
            try {
                return NullableCoder.of((Coder)coderRegistry.getCoder(clazz));
            }
            catch (CannotProvideCoderException e) {
                throw new RuntimeException(String.format("Unable to automatically infer a Coder for the Kafka Deserializer %s: no coder registered for type %s", deserializer, clazz));
            }
        }
        throw new RuntimeException(String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
    }

    private static class NullOnlyCoder<T>
    extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        public void encode(T value, OutputStream outStream) {
            Preconditions.checkArgument(value == null, "Can only encode nulls");
        }

        public T decode(InputStream inStream) {
            return null;
        }
    }

    private static class KafkaValueWrite<K, V>
    extends PTransform<PCollection<V>, PDone> {
        private final Write<K, V> kvWriteTransform;

        private KafkaValueWrite(Write<K, V> kvWriteTransform) {
            this.kvWriteTransform = kvWriteTransform;
        }

        public PDone expand(PCollection<V> input) {
            return (PDone)((PCollection)input.apply("Kafka values with default key", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<V, KV<K, V>>(){

                public KV<K, V> apply(V element) {
                    return KV.of(null, element);
                }
            }))).setCoder((Coder)KvCoder.of(new NullOnlyCoder(), (Coder)input.getCoder())).apply(this.kvWriteTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.kvWriteTransform.populateDisplayData(builder);
        }
    }

    @AutoValue
    public static abstract class Write<K, V>
    extends PTransform<PCollection<KV<K, V>>, PDone> {
        private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of("retries", 3);
        private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of("key.serializer", "Use withKeySerializer instead", "value.serializer", "Use withValueSerializer instead");

        @Nullable
        abstract String getTopic();

        abstract Map<String, Object> getProducerConfig();

        @Nullable
        abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();

        @Nullable
        abstract Class<? extends Serializer<K>> getKeySerializer();

        @Nullable
        abstract Class<? extends Serializer<V>> getValueSerializer();

        abstract boolean isEOS();

        @Nullable
        abstract String getSinkGroupId();

        abstract int getNumShards();

        @Nullable
        abstract SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn();

        abstract Builder<K, V> toBuilder();

        public Write<K, V> withBootstrapServers(String bootstrapServers) {
            return this.updateProducerProperties(ImmutableMap.of("bootstrap.servers", bootstrapServers));
        }

        public Write<K, V> withTopic(String topic) {
            return this.toBuilder().setTopic(topic).build();
        }

        public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
            return this.toBuilder().setKeySerializer(keySerializer).build();
        }

        public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            return this.toBuilder().setValueSerializer(valueSerializer).build();
        }

        public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
            Map config = KafkaIO.updateKafkaProperties(this.getProducerConfig(), Write.IGNORED_PRODUCER_PROPERTIES, configUpdates);
            return this.toBuilder().setProducerConfig(config).build();
        }

        public Write<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
            return this.toBuilder().setProducerFactoryFn(producerFactoryFn).build();
        }

        public Write<K, V> withEOS(int numShards, String sinkGroupId) {
            KafkaExactlyOnceSink.ensureEOSSupport();
            Preconditions.checkArgument(numShards >= 1, "numShards should be >= 1");
            Preconditions.checkArgument(sinkGroupId != null, "sinkGroupId is required for exactly-once sink");
            return this.toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();
        }

        public Write<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
            return this.toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        public PTransform<PCollection<V>, PDone> values() {
            return new KafkaValueWrite(this.toBuilder().setKeySerializer(StringSerializer.class).build());
        }

        public PDone expand(PCollection<KV<K, V>> input) {
            Preconditions.checkArgument(this.getProducerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            Preconditions.checkArgument(this.getTopic() != null, "withTopic() is required");
            Preconditions.checkArgument(this.getKeySerializer() != null, "withKeySerializer() is required");
            Preconditions.checkArgument(this.getValueSerializer() != null, "withValueSerializer() is required");
            if (this.isEOS()) {
                KafkaExactlyOnceSink.ensureEOSSupport();
                input.apply(new KafkaExactlyOnceSink(this));
            } else {
                input.apply((PTransform)ParDo.of(new KafkaWriter(this)));
            }
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void validate(PipelineOptions options) {
            if (this.isEOS()) {
                String runner = options.getRunner().getName();
                if (runner.equals("org.apache.beam.runners.direct.DirectRunner") || runner.startsWith("org.apache.beam.runners.dataflow.") || runner.startsWith("org.apache.beam.runners.spark.")) {
                    return;
                }
                throw new UnsupportedOperationException(runner + " is not whitelisted among runners compatible with Kafka exactly-once sink. This implementation of exactly-once sink relies on specific checkpoint guarantees. Only the runners with known to have compatible checkpoint semantics are whitelisted.");
            }
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"topic", (String)this.getTopic()).withLabel("Topic"));
            Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> conf : this.getProducerConfig().entrySet()) {
                String key = conf.getKey();
                if (ignoredProducerPropertiesKeys.contains(key)) continue;
                Object value = DisplayData.inferType((Object)conf.getValue()) != null ? conf.getValue() : String.valueOf(conf.getValue());
                builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
            }
        }

        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setTopic(String var1);

            abstract Builder<K, V> setProducerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> var1);

            abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> var1);

            abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> var1);

            abstract Builder<K, V> setEOS(boolean var1);

            abstract Builder<K, V> setSinkGroupId(String var1);

            abstract Builder<K, V> setNumShards(int var1);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> var1);

            abstract Write<K, V> build();
        }
    }

    public static class TypedWithoutMetadata<K, V>
    extends PTransform<PBegin, PCollection<KV<K, V>>> {
        private final Read<K, V> read;

        TypedWithoutMetadata(Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        public PCollection<KV<K, V>> expand(PBegin begin) {
            return (PCollection)((PCollection)begin.apply(this.read)).apply("Remove Kafka Metadata", (PTransform)ParDo.of((DoFn)new DoFn<KafkaRecord<K, V>, KV<K, V>>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext ctx) {
                    ctx.output(((KafkaRecord)ctx.element()).getKV());
                }
            }));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }
    }

    @AutoValue
    public static abstract class Read<K, V>
    extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", "Set keyDeserializer instead", "value.deserializer", "Set valueDeserializer instead");
        private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", ByteArrayDeserializer.class.getName(), "value.deserializer", ByteArrayDeserializer.class.getName(), "receive.buffer.bytes", 524288, "auto.offset.reset", "latest", "enable.auto.commit", false);
        private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;

        abstract Map<String, Object> getConsumerConfig();

        abstract List<String> getTopics();

        abstract List<TopicPartition> getTopicPartitions();

        @Nullable
        abstract Coder<K> getKeyCoder();

        @Nullable
        abstract Coder<V> getValueCoder();

        @Nullable
        abstract Class<? extends Deserializer<K>> getKeyDeserializer();

        @Nullable
        abstract Class<? extends Deserializer<V>> getValueDeserializer();

        abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        @Nullable
        abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();

        abstract long getMaxNumRecords();

        @Nullable
        abstract Duration getMaxReadTime();

        @Nullable
        abstract Instant getStartReadTime();

        abstract boolean isCommitOffsetsInFinalizeEnabled();

        abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        abstract Builder<K, V> toBuilder();

        public Read<K, V> withBootstrapServers(String bootstrapServers) {
            return this.updateConsumerProperties(ImmutableMap.of("bootstrap.servers", bootstrapServers));
        }

        public Read<K, V> withTopic(String topic) {
            return this.withTopics(ImmutableList.of(topic));
        }

        public Read<K, V> withTopics(List<String> topics) {
            Preconditions.checkState(this.getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
            return this.toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
        }

        public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
            Preconditions.checkState(this.getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
            return this.toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
        }

        public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
            return this.toBuilder().setKeyDeserializer(keyDeserializer).build();
        }

        public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
            return this.toBuilder().setKeyDeserializer(keyDeserializer).setKeyCoder(keyCoder).build();
        }

        public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
            return this.toBuilder().setValueDeserializer(valueDeserializer).build();
        }

        public Read<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
            return this.toBuilder().setValueDeserializer(valueDeserializer).setValueCoder(valueCoder).build();
        }

        public Read<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
            return this.toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
            Map config = KafkaIO.updateKafkaProperties(this.getConsumerConfig(), Read.IGNORED_CONSUMER_PROPERTIES, configUpdates);
            return this.toBuilder().setConsumerConfig(config).build();
        }

        public Read<K, V> withMaxNumRecords(long maxNumRecords) {
            return this.toBuilder().setMaxNumRecords(maxNumRecords).build();
        }

        public Read<K, V> withStartReadTime(Instant startReadTime) {
            return this.toBuilder().setStartReadTime(startReadTime).build();
        }

        public Read<K, V> withMaxReadTime(Duration maxReadTime) {
            return this.toBuilder().setMaxReadTime(maxReadTime).build();
        }

        public Read<K, V> withLogAppendTime() {
            return this.withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
        }

        public Read<K, V> withProcessingTime() {
            return this.withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
        }

        public Read<K, V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            return this.toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
        }

        @Deprecated
        public Read<K, V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
            Preconditions.checkArgument(timestampFn != null, "timestampFn can not be null");
            return this.toBuilder().setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn)).build();
        }

        @Deprecated
        public Read<K, V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
            Preconditions.checkArgument(watermarkFn != null, "watermarkFn can not be null");
            return this.toBuilder().setWatermarkFn(watermarkFn).build();
        }

        @Deprecated
        public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
            Preconditions.checkArgument(timestampFn != null, "timestampFn can not be null");
            return this.withTimestampFn2(Read.unwrapKafkaAndThen(timestampFn));
        }

        @Deprecated
        public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
            Preconditions.checkArgument(watermarkFn != null, "watermarkFn can not be null");
            return this.withWatermarkFn2(Read.unwrapKafkaAndThen(watermarkFn));
        }

        public Read<K, V> withReadCommitted() {
            return this.updateConsumerProperties(ImmutableMap.of("isolation.level", "read_committed"));
        }

        public Read<K, V> commitOffsetsInFinalize() {
            return this.toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
        }

        public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
            return new TypedWithoutMetadata(this);
        }

        public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
            Read.Unbounded unbounded;
            Preconditions.checkArgument(this.getConsumerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            Preconditions.checkArgument(this.getTopics().size() > 0 || this.getTopicPartitions().size() > 0, "Either withTopic(), withTopics() or withTopicPartitions() is required");
            Preconditions.checkArgument(this.getKeyDeserializer() != null, "withKeyDeserializer() is required");
            Preconditions.checkArgument(this.getValueDeserializer() != null, "withValueDeserializer() is required");
            ConsumerSpEL consumerSpEL = new ConsumerSpEL();
            if (!consumerSpEL.hasOffsetsForTimes()) {
                LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", (Object)AppInfoParser.getVersion());
            }
            if (this.getStartReadTime() != null) {
                Preconditions.checkArgument(consumerSpEL.hasOffsetsForTimes(), "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer.");
            }
            if (this.isCommitOffsetsInFinalizeEnabled()) {
                Preconditions.checkArgument(this.getConsumerConfig().get("group.id") != null, "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config is not set. Offset management requires group.id.");
                if (Boolean.TRUE.equals(this.getConsumerConfig().get("enable.auto.commit"))) {
                    LOG.warn("'{}' in consumer config is enabled even though commitOffsetsInFinalize() is set. You need only one of them.", (Object)"enable.auto.commit");
                }
            }
            CoderRegistry registry = input.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = this.getKeyCoder() != null ? this.getKeyCoder() : KafkaIO.inferCoder(registry, this.getKeyDeserializer());
            Preconditions.checkArgument(keyCoder != null, "Key coder could not be inferred from key deserializer. Please providekey coder explicitly using withKeyDeserializerAndCoder()");
            Coder<V> valueCoder = this.getValueCoder() != null ? this.getValueCoder() : KafkaIO.inferCoder(registry, this.getValueDeserializer());
            Preconditions.checkArgument(valueCoder != null, "Value coder could not be inferred from value deserializer. Please providevalue coder explicitly using withValueDeserializerAndCoder()");
            Read.Unbounded transform = unbounded = org.apache.beam.sdk.io.Read.from(this.toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
            if (this.getMaxNumRecords() < Long.MAX_VALUE || this.getMaxReadTime() != null) {
                transform = unbounded.withMaxReadTime(this.getMaxReadTime()).withMaxNumRecords(this.getMaxNumRecords());
            }
            return (PCollection)input.getPipeline().apply((PTransform)transform);
        }

        @VisibleForTesting
        UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
            return new KafkaUnboundedSource(this, -1);
        }

        private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
            return (SerializableFunction & Serializable)record -> fn.apply(record.getKV());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            List<String> topics = this.getTopics();
            List<TopicPartition> topicPartitions = this.getTopicPartitions();
            if (topics.size() > 0) {
                builder.add(DisplayData.item((String)"topics", (String)Joiner.on(",").join(topics)).withLabel("Topic/s"));
            } else if (topicPartitions.size() > 0) {
                builder.add(DisplayData.item((String)"topicPartitions", (String)Joiner.on(",").join(topicPartitions)).withLabel("Topic Partition/s"));
            }
            Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> conf : this.getConsumerConfig().entrySet()) {
                String key = conf.getKey();
                if (ignoredConsumerPropertiesKeys.contains(key)) continue;
                Object value = DisplayData.inferType((Object)conf.getValue()) != null ? conf.getValue() : String.valueOf(conf.getValue());
                builder.add(DisplayData.item((String)key, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)value)));
            }
        }

        @AutoValue.Builder
        static abstract class Builder<K, V> {
            Builder() {
            }

            abstract Builder<K, V> setConsumerConfig(Map<String, Object> var1);

            abstract Builder<K, V> setTopics(List<String> var1);

            abstract Builder<K, V> setTopicPartitions(List<TopicPartition> var1);

            abstract Builder<K, V> setKeyCoder(Coder<K> var1);

            abstract Builder<K, V> setValueCoder(Coder<V> var1);

            abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> var1);

            abstract Builder<K, V> setValueDeserializer(Class<? extends Deserializer<V>> var1);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> var1);

            abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> var1);

            abstract Builder<K, V> setMaxNumRecords(long var1);

            abstract Builder<K, V> setMaxReadTime(Duration var1);

            abstract Builder<K, V> setStartReadTime(Instant var1);

            abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean var1);

            abstract Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> var1);

            abstract Read<K, V> build();
        }
    }
}

