/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BeamKafkaTable
extends SchemaBaseBeamTable {
    private @UnknownKeyFor @NonNull @Initialized String bootstrapServers;
    private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics;
    private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TopicPartition> topicPartitions;
    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates;
    private @UnknownKeyFor @NonNull @Initialized BeamTableStatistics rowCountStatistics = null;
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BeamKafkaTable.class);
    protected @UnknownKeyFor @NonNull @Initialized int numberOfRecordsForRate = 50;

    protected BeamKafkaTable(@UnknownKeyFor @NonNull @Initialized Schema beamSchema) {
        super(beamSchema);
    }

    public BeamKafkaTable(@UnknownKeyFor @NonNull @Initialized Schema beamSchema, @UnknownKeyFor @NonNull @Initialized String bootstrapServers, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> topics) {
        super(beamSchema);
        this.bootstrapServers = bootstrapServers;
        this.topics = topics;
        this.configUpdates = new HashMap<String, Object>();
    }

    public BeamKafkaTable(@UnknownKeyFor @NonNull @Initialized Schema beamSchema, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TopicPartition> topicPartitions, @UnknownKeyFor @NonNull @Initialized String bootstrapServers) {
        super(beamSchema);
        this.bootstrapServers = bootstrapServers;
        this.topicPartitions = topicPartitions;
    }

    public @UnknownKeyFor @NonNull @Initialized BeamKafkaTable updateConsumerProperties(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> configUpdates) {
        this.configUpdates = configUpdates;
        return this;
    }

    @Override
    public // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded() {
        return PCollection.IsBounded.UNBOUNDED;
    }

    protected abstract @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KafkaRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row>> getPTransformForInput();

    protected abstract @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized ProducerRecord<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>> getPTransformForOutput();

    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> buildIOReader(@UnknownKeyFor @NonNull @Initialized PBegin begin) {
        return ((PCollection)((PCollection)begin.apply("read", this.createKafkaRead())).apply("in_format", this.getPTransformForInput())).setRowSchema(this.getSchema());
    }

    // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized KafkaIO.Read<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> createKafkaRead() {
        KafkaIO.Read kafkaRead;
        if (this.topics != null) {
            kafkaRead = KafkaIO.read().withBootstrapServers(this.bootstrapServers).withTopics(this.topics).withConsumerConfigUpdates(this.configUpdates).withKeyDeserializerAndCoder(ByteArrayDeserializer.class, (Coder)ByteArrayCoder.of()).withValueDeserializerAndCoder(ByteArrayDeserializer.class, (Coder)ByteArrayCoder.of());
        } else if (this.topicPartitions != null) {
            kafkaRead = KafkaIO.read().withBootstrapServers(this.bootstrapServers).withTopicPartitions(this.topicPartitions).withConsumerConfigUpdates(this.configUpdates).withKeyDeserializerAndCoder(ByteArrayDeserializer.class, (Coder)ByteArrayCoder.of()).withValueDeserializerAndCoder(ByteArrayDeserializer.class, (Coder)ByteArrayCoder.of());
        } else {
            throw new InvalidTableException("One of topics and topicPartitions must be configurated.");
        }
        return kafkaRead;
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized POutput buildIOWriter(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized Row> input) {
        Preconditions.checkArgument((this.topics != null && this.topics.size() == 1 ? 1 : 0) != 0, (Object)"Only one topic can be acceptable as output.");
        return ((PCollection)input.apply("out_reformat", this.getPTransformForOutput())).setCoder((Coder)ProducerRecordCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of())).apply("persistent", this.createKafkaWrite());
    }

    private // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized KafkaIO.WriteRecords<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> createKafkaWrite() {
        return KafkaIO.writeRecords().withBootstrapServers(this.bootstrapServers).withTopic(this.topics.get(0)).withKeySerializer(ByteArraySerializer.class).withValueSerializer(ByteArraySerializer.class);
    }

    public @UnknownKeyFor @NonNull @Initialized String getBootstrapServers() {
        return this.bootstrapServers;
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> getTopics() {
        return this.topics;
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized BeamTableStatistics getTableStatistics(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        if (this.rowCountStatistics == null) {
            try {
                this.rowCountStatistics = BeamTableStatistics.createUnboundedTableStatistics(this.computeRate(this.numberOfRecordsForRate));
            }
            catch (Exception e) {
                LOG.warn("Could not get the row count for the topics " + this.getTopics(), (Throwable)e);
                this.rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
            }
        }
        return this.rowCountStatistics;
    }

    @UnknownKeyFor @NonNull @Initialized double computeRate(@UnknownKeyFor @NonNull @Initialized int numberOfRecords) throws @UnknownKeyFor @NonNull @Initialized NoEstimationException {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer consumer = new KafkaConsumer(props);
        return this.computeRate((Consumer)consumer, numberOfRecords);
    }

    <T> @UnknownKeyFor @NonNull @Initialized double computeRate(@UnknownKeyFor @NonNull @Initialized Consumer<T, T> consumer, @UnknownKeyFor @NonNull @Initialized int numberOfRecordsToCheck) throws @UnknownKeyFor @NonNull @Initialized NoEstimationException {
        Stream<TopicPartition> c = this.getTopics().stream().map(arg_0 -> consumer.partitionsFor(arg_0)).flatMap(Collection::stream).map(parInf -> new TopicPartition(parInf.topic(), parInf.partition()));
        List topicPartitions = c.collect(Collectors.toList());
        consumer.assign(topicPartitions);
        Map offsets = consumer.endOffsets(topicPartitions);
        long nParsSeen = 0L;
        for (TopicPartition par : topicPartitions) {
            long offset = (Long)offsets.get(par);
            nParsSeen = offset == 0L ? nParsSeen : nParsSeen + 1L;
            consumer.seek(par, Math.max(0L, offset - (long)numberOfRecordsToCheck));
        }
        if (nParsSeen == 0L) {
            throw new NoEstimationException("There is no partition with messages in it.");
        }
        ConsumerRecords records = consumer.poll(1000L);
        HashMap<Integer, Long> minTimeStamps = new HashMap<Integer, Long>();
        long maxMinTimeStamp = 0L;
        for (ConsumerRecord record : records) {
            if (minTimeStamps.containsKey(record.partition())) continue;
            minTimeStamps.put(record.partition(), record.timestamp());
            maxMinTimeStamp = Math.max(record.timestamp(), maxMinTimeStamp);
            if (--nParsSeen != 0L) continue;
            break;
        }
        int numberOfRecords = 0;
        long maxTimeStamp = 0L;
        for (ConsumerRecord record : records) {
            maxTimeStamp = Math.max(maxTimeStamp, record.timestamp());
            numberOfRecords = record.timestamp() > maxMinTimeStamp ? numberOfRecords + 1 : numberOfRecords;
        }
        if (maxTimeStamp == maxMinTimeStamp) {
            throw new NoEstimationException("Arrival time of all records are the same.");
        }
        return (double)numberOfRecords * 1000.0 / ((double)maxTimeStamp - (double)maxMinTimeStamp);
    }

    static class NoEstimationException
    extends Exception {
        NoEstimationException(@UnknownKeyFor @NonNull @Initialized String message) {
            super(message);
        }
    }
}

