/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Preconditions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaUnboundedSource<K, V>
extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);
    private final KafkaIO.Read<K, V> spec;
    private final int id;

    public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        int i;
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>(this.spec.getTopicPartitions());
        if (partitions.isEmpty()) {
            try (Consumer consumer = (Consumer)this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());){
                for (String topic : this.spec.getTopics()) {
                    for (PartitionInfo p : consumer.partitionsFor(topic)) {
                        partitions.add(new TopicPartition(p.topic(), p.partition()));
                    }
                }
            }
        }
        partitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(Comparator.comparingInt(TopicPartition::partition)));
        Preconditions.checkArgument(desiredNumSplits > 0);
        Preconditions.checkState(partitions.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names");
        int numSplits = Math.min(desiredNumSplits, partitions.size());
        ArrayList assignments = new ArrayList(numSplits);
        for (i = 0; i < numSplits; ++i) {
            assignments.add(new ArrayList());
        }
        for (i = 0; i < partitions.size(); ++i) {
            ((List)assignments.get(i % numSplits)).add(partitions.get(i));
        }
        ArrayList<KafkaUnboundedSource<K, V>> result = new ArrayList<KafkaUnboundedSource<K, V>>(numSplits);
        for (int i2 = 0; i2 < numSplits; ++i2) {
            List assignedToSplit = (List)assignments.get(i2);
            LOG.info("Partitions assigned to split {} (total {}): {}", new Object[]{i2, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit)});
            result.add(new KafkaUnboundedSource<K, V>(this.spec.toBuilder().setTopics(Collections.emptyList()).setTopicPartitions(assignedToSplit).build(), i2));
        }
        return result;
    }

    public KafkaUnboundedReader<K, V> createReader(PipelineOptions options, KafkaCheckpointMark checkpointMark) {
        if (this.spec.getTopicPartitions().isEmpty()) {
            LOG.warn("Looks like generateSplits() is not called. Generate single split.");
            try {
                return new KafkaUnboundedReader<K, V>(this.split(1, options).get(0), checkpointMark);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return new KafkaUnboundedReader(this, checkpointMark);
    }

    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
        return AvroCoder.of(KafkaCheckpointMark.class);
    }

    public boolean requiresDeduping() {
        return false;
    }

    public Coder<KafkaRecord<K, V>> getOutputCoder() {
        return KafkaRecordCoder.of(this.spec.getKeyCoder(), this.spec.getValueCoder());
    }

    public KafkaUnboundedSource(KafkaIO.Read<K, V> spec, int id) {
        this.spec = spec;
        this.id = id;
    }

    KafkaIO.Read<K, V> getSpec() {
        return this.spec;
    }

    int getId() {
        return this.id;
    }
}

