/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerSpEL;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.cache.Cache;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.cache.RemovalCause;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaExactlyOnceSink<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<Void>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaExactlyOnceSink.class);
    private static final String METRIC_NAMESPACE = "KafkaExactlyOnceSink";
    private final KafkaIO.Write<K, V> spec;

    /**
     * Verifies that the Kafka client in use supports transactions.
     *
     * @throws IllegalArgumentException if {@link ProducerSpEL#supportsTransactions()} returns
     *     {@code false}
     */
    static void ensureEOSSupport() {
        boolean transactionsSupported = ProducerSpEL.supportsTransactions();
        Preconditions.checkArgument(
            transactionsSupported,
            "%s %s",
            "This version of Kafka client does not support transactions required to support",
            "exactly-once semantics. Please use Kafka client version 0.11 or newer.");
    }

    /**
     * Creates the sink transform for the given write configuration.
     *
     * @param spec the {@link KafkaIO.Write} configuration; stored as-is and consulted by
     *     {@code expand} for the topic, the shard count and the sink group id
     */
    KafkaExactlyOnceSink(KafkaIO.Write<K, V> spec) {
        this.spec = spec;
    }

    /**
     * Builds the exactly-once write pipeline: reshard, assign sequence ids, then write to Kafka.
     *
     * <p>If the configured shard count is not positive, the number of partitions of the target
     * topic is used instead (looked up through a short-lived consumer).
     *
     * @param input the key/value records to write
     * @return the output of the final writer step
     * @throws IllegalStateException if no positive shard count could be determined
     */
    @Override
    public PCollection<Void> expand(PCollection<KV<K, V>> input) {
        int numShards = this.spec.getNumShards();
        if (numShards <= 0) {
            // openConsumer returns Consumer<?, ?>; only partition metadata is read from it.
            try (Consumer<?, ?> consumer = KafkaExactlyOnceSink.openConsumer(this.spec)) {
                numShards = consumer.partitionsFor(this.spec.getTopic()).size();
                LOG.info("Using {} shards for exactly-once writer, matching number of partitions for topic '{}'", numShards, this.spec.getTopic());
            }
        }
        Preconditions.checkState(numShards > 0, "Could not set number of shards");

        return input
            // Global window that fires a pane for every element and discards fired panes.
            .apply(
                Window.<KV<K, V>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
            .apply(
                String.format("Shuffle across %d shards", numShards),
                ParDo.of(new Reshard<K, V>(numShards)))
            .apply("Persist sharding", GroupByKey.<Integer, KV<K, V>>create())
            .apply("Assign sequential ids", ParDo.of(new Sequencer<K, V>()))
            .apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
            .apply(
                String.format("Write to Kafka topic '%s'", this.spec.getTopic()),
                ParDo.of(new ExactlyOnceWriter<K, V>(this.spec, input.getCoder())));
    }

    /**
     * Opens a consumer through the spec's consumer factory.
     *
     * <p>The consumer is configured with the bootstrap servers taken from the producer
     * configuration, the sink group id, and byte-array deserializers for keys and values.
     *
     * @param spec the write configuration supplying the factory, servers and group id
     * @return the consumer produced by the factory; the caller is responsible for closing it
     */
    private static Consumer<?, ?> openConsumer(KafkaIO.Write<?, ?> spec) {
        Object bootstrapServers = spec.getProducerConfig().get("bootstrap.servers");
        ImmutableMap<String, Object> consumerConfig = ImmutableMap.of(
            "bootstrap.servers", bootstrapServers,
            "group.id", spec.getSinkGroupId(),
            "key.deserializer", ByteArrayDeserializer.class,
            "value.deserializer", ByteArrayDeserializer.class);
        return (Consumer<?, ?>) spec.getConsumerFactoryFn().apply(consumerConfig);
    }

    /**
     * Creates a transactional producer and initializes its transactions.
     *
     * <p>The spec's producer configuration is copied and then overridden with the spec's
     * serializers, {@code enable.idempotence=true} and {@code transactional.id=producerName}.
     * The producer comes from the spec's producer factory when one is set, otherwise a
     * {@link KafkaProducer} is constructed directly.
     *
     * @param spec the write configuration
     * @param producerName the value used as the producer's {@code transactional.id}
     * @return a producer on which {@code ProducerSpEL.initTransactions} has been called
     */
    private static <K, V> Producer<K, V> initializeExactlyOnceProducer(KafkaIO.Write<K, V> spec, String producerName) {
        HashMap<String, Object> producerConfig = new HashMap<String, Object>(spec.getProducerConfig());
        producerConfig.putAll(ImmutableMap.of("key.serializer", spec.getKeySerializer(), "value.serializer", spec.getValueSerializer(), "enable.idempotence", true, "transactional.id", producerName));

        // Declared as the Producer interface: a factory-supplied producer need not be a
        // KafkaProducer.
        Producer<K, V> producer;
        if (spec.getProducerFactoryFn() != null) {
            producer = spec.getProducerFactoryFn().apply(producerConfig);
        } else {
            producer = new KafkaProducer<K, V>(producerConfig);
        }
        ProducerSpEL.initTransactions(producer);
        return producer;
    }

    private static class ExactlyOnceWriter<K, V>
    extends DoFn<KV<Integer, Iterable<KV<Long, KV<K, V>>>>, Void> {
        private static final String NEXT_ID = "nextId";
        private static final String MIN_BUFFERED_ID = "minBufferedId";
        private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
        private static final String WRITER_ID = "writerId";
        private static final int MAX_RECORDS_PER_TXN = 1000;
        private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
        @DoFn.StateId(value="nextId")
        private final StateSpec<ValueState<Long>> sequenceIdSpec = StateSpecs.value();
        @DoFn.StateId(value="minBufferedId")
        private final StateSpec<ValueState<Long>> minBufferedIdSpec = StateSpecs.value();
        @DoFn.StateId(value="outOfOrderBuffer")
        private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBufferSpec;
        @DoFn.StateId(value="writerId")
        private final StateSpec<ValueState<String>> writerIdSpec = StateSpecs.value();
        private final KafkaIO.Write<K, V> spec;
        private final Counter elementsWritten = SinkMetrics.elementsWritten();
        private final Counter elementsBuffered = Metrics.counter((String)"KafkaExactlyOnceSink", (String)"elementsBuffered");
        private final Counter numTransactions = Metrics.counter((String)"KafkaExactlyOnceSink", (String)"numTransactions");
        private static final LoadingCache<String, ShardWriterCache<?, ?>> CACHE_BY_GROUP_ID = CacheBuilder.newBuilder().build(new CacheLoader<String, ShardWriterCache<?, ?>>(){

            @Override
            public ShardWriterCache<?, ?> load(String key) throws Exception {
                return new ShardWriterCache();
            }
        });

        /**
         * Creates the writer and derives the coder for the out-of-order buffer state.
         *
         * @param spec the write configuration
         * @param elemCoder coder for the key/value records; each buffered record is encoded as a
         *     big-endian long sequence id followed by the record itself
         */
        ExactlyOnceWriter(KafkaIO.Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
            KvCoder<Long, KV<K, V>> sequencedElemCoder = KvCoder.of(BigEndianLongCoder.of(), elemCoder);
            this.outOfOrderBufferSpec = StateSpecs.bag(sequencedElemCoder);
            this.spec = spec;
        }

        /**
         * Checks for transaction support in the Kafka client by delegating to
         * {@link KafkaExactlyOnceSink#ensureEOSSupport()}.
         */
        @DoFn.Setup
        public void setup() {
            KafkaExactlyOnceSink.ensureEOSSupport();
        }

        /**
         * Writes one batch of sequenced records for a shard to Kafka inside transactions.
         *
         * <p>Records are written strictly in sequence-id order. A record whose id is below the next
         * expected id is dropped, a record whose id is above it is parked in
         * {@code oooBufferState}, and parked records are merged back into the iteration once the
         * smallest parked id equals the next expected id. A transaction is committed every
         * {@code MAX_RECORDS_PER_TXN} written records and once more at the end of the batch.
         *
         * @param nextIdState the next sequence id expected for this shard (absent means 0)
         * @param minBufferedIdState the smallest sequence id currently parked in the buffer
         * @param oooBufferState records that arrived ahead of the next expected id
         * @param writerIdState the writer id assigned to this shard; used when a writer is created
         * @param ctx the element: shard id mapped to its sequenced records
         * @throws IOException propagated from {@code initShardWriter} and
         *     {@code ShardWriter.commitTxn}
         */
        @DoFn.ProcessElement
        public void processElement(
                @DoFn.StateId(NEXT_ID) ValueState<Long> nextIdState,
                @DoFn.StateId(MIN_BUFFERED_ID) ValueState<Long> minBufferedIdState,
                @DoFn.StateId(OUT_OF_ORDER_BUFFER) BagState<KV<Long, KV<K, V>>> oooBufferState,
                @DoFn.StateId(WRITER_ID) ValueState<String> writerIdState,
                ProcessContext ctx) throws IOException {
            int shard = ctx.element().getKey();
            minBufferedIdState.readLater();
            long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
            long minBufferedId = MoreObjects.firstNonNull(minBufferedIdState.read(), Long.MAX_VALUE);

            // The static cache is keyed by sink group id only, so the element types of the cached
            // writers cannot be verified at runtime.
            @SuppressWarnings("unchecked")
            ShardWriterCache<K, V> cache = (ShardWriterCache<K, V>) CACHE_BY_GROUP_ID.getUnchecked(this.spec.getSinkGroupId());
            // The writer is taken out of the cache while in use and put back in the finally block.
            ShardWriter<K, V> writer = cache.removeIfPresent(shard);
            if (writer == null) {
                writer = this.initShardWriter(shard, writerIdState, nextId);
            }

            long committedId = writer.committedId;
            if (committedId >= nextId) {
                // Kafka is ahead of the stored state: skip everything up to the committed id.
                LOG.info("{}: committed id {} is ahead of expected {}. {} records will be dropped (these are already written).", shard, committedId, nextId - 1L, committedId - nextId + 1L);
                nextId = committedId + 1L;
            }

            try {
                writer.beginTxn();
                int txnSize = 0;
                Iterator<KV<Long, KV<K, V>>> iter = ctx.element().getValue().iterator();
                while (iter.hasNext()) {
                    KV<Long, KV<K, V>> kv = iter.next();
                    long recordId = kv.getKey();

                    if (recordId < nextId) {
                        LOG.info("{}: dropping older record {}. Already committed till {}", shard, recordId, committedId);
                        continue;
                    }

                    if (recordId > nextId) {
                        // Arrived ahead of its turn: park it and remember the smallest parked id.
                        LOG.info("{}: Saving out of order record {}, next record id to be written is {}", shard, recordId, nextId);
                        oooBufferState.add(kv);
                        minBufferedId = Math.min(minBufferedId, recordId);
                        minBufferedIdState.write(minBufferedId);
                        this.elementsBuffered.inc();
                        continue;
                    }

                    // recordId == nextId: this is the record to write now.
                    writer.sendRecord(kv.getValue(), this.elementsWritten);
                    ++nextId;

                    if (++txnSize >= MAX_RECORDS_PER_TXN) {
                        writer.commitTxn(recordId, this.numTransactions);
                        txnSize = 0;
                        writer.beginTxn();
                    }

                    if (minBufferedId == nextId) {
                        // The smallest parked record is next in line: load the whole buffer, sort
                        // it by id and merge it with the rest of the input.
                        ArrayList<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read());
                        buffered.sort(new KV.OrderByKey<Long, KV<K, V>>());
                        LOG.info("{} : merging {} buffered records (min buffered id is {}).", shard, buffered.size(), minBufferedId);
                        oooBufferState.clear();
                        minBufferedIdState.clear();
                        minBufferedId = Long.MAX_VALUE;
                        iter = Iterators.mergeSorted(ImmutableList.of(iter, buffered.iterator()), new KV.OrderByKey<Long, KV<K, V>>());
                    }
                }
                writer.commitTxn(nextId - 1L, this.numTransactions);
                nextIdState.write(nextId);
            }
            catch (ProducerSpEL.UnrecoverableProducerException e) {
                LOG.warn("{} : closing producer {} after unrecoverable error. The work might have migrated. Committed id {}, current id {}.", writer.shard, writer.producerName, writer.committedId, nextId - 1L, e);
                writer.producer.close();
                // A closed producer must not go back into the cache.
                writer = null;
                throw e;
            }
            finally {
                if (writer != null) {
                    cache.insert(shard, writer);
                }
            }
        }

        /**
         * Creates a transactional producer for a shard and reconciles the stored state with the
         * metadata committed to Kafka for that shard's partition.
         *
         * <p>If Kafka holds no metadata, the stored state must be empty too; a fresh writer id is
         * then generated and stored. Otherwise the stored writer id must match the one in Kafka and
         * the committed sequence id must not be behind {@code nextId - 1}.
         *
         * @param shard the shard, also used as the partition number for the metadata lookup
         * @param writerIdState state holding the writer id assigned to this shard
         * @param nextId the next sequence id expected according to the stored state
         * @return a writer wrapping the new producer and the committed sequence id (-1 if none)
         * @throws IOException if the committed metadata cannot be parsed
         * @throws IllegalStateException if the stored state and the Kafka metadata disagree
         */
        private ShardWriter<K, V> initShardWriter(int shard, ValueState<String> writerIdState, long nextId) throws IOException {
            String producerName = String.format("producer_%d_for_%s", shard, this.spec.getSinkGroupId());
            Producer<K, V> producer = KafkaExactlyOnceSink.initializeExactlyOnceProducer(this.spec, producerName);
            try {
                String writerId = writerIdState.read();

                OffsetAndMetadata committed;
                try (Consumer<?, ?> consumer = KafkaExactlyOnceSink.openConsumer(this.spec)) {
                    committed = consumer.committed(new TopicPartition(this.spec.getTopic(), shard));
                }

                long committedSeqId = -1L;
                if (committed == null || committed.metadata() == null || committed.metadata().isEmpty()) {
                    // Nothing stored with Kafka: this must be the very first use of the shard.
                    Preconditions.checkState(nextId == 0L && writerId == null, "State exists for shard %s (nextId %s, writerId '%s'), but there is no state stored with Kafka topic '%s' group id '%s'", shard, nextId, writerId, this.spec.getTopic(), this.spec.getSinkGroupId());
                    writerId = String.format("%X - %s", new Random().nextInt(Integer.MAX_VALUE), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZone(DateTimeZone.UTC).print(DateTimeUtils.currentTimeMillis()));
                    writerIdState.write(writerId);
                    LOG.info("Assigned writer id '{}' to shard {}", writerId, shard);
                } else {
                    ShardMetadata metadata = JSON_MAPPER.readValue(committed.metadata(), ShardMetadata.class);
                    Preconditions.checkNotNull(metadata.writerId);
                    if (writerId == null) {
                        throw new IllegalStateException(String.format("Kafka metadata exists for shard %s, but there is no stored state for it. This mostly indicates groupId '%s' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '%s'", shard, this.spec.getSinkGroupId(), committed.metadata()));
                    }
                    Preconditions.checkState(writerId.equals(metadata.writerId), "Writer ids don't match. This is mostly a unintended misuse of groupId('%s').Beam '%s', Kafka '%s'", this.spec.getSinkGroupId(), writerId, metadata.writerId);
                    committedSeqId = metadata.sequenceId;
                    Preconditions.checkState(committedSeqId >= nextId - 1L, "Committed sequence id can not be lower than %s, partition metadata : %s", nextId - 1L, committed.metadata());
                }
                LOG.info("{} : initialized producer {} with committed sequence id {}", shard, producerName, committedSeqId);
                return new ShardWriter<K, V>(shard, writerId, producer, producerName, this.spec, committedSeqId);
            }
            catch (Exception e) {
                // The producer has not been handed to a ShardWriter yet, so close it here.
                producer.close();
                throw e;
            }
        }

        /**
         * Cache of idle shard writers, keyed by shard.
         *
         * <p>A writer is removed from the cache while it is in use and inserted again afterwards.
         * Entries expire {@code IDLE_TIMEOUT_MS} after insertion; a writer removed for any reason
         * other than an explicit removal has its producer closed.
         */
        private static class ShardWriterCache<K, V> {
            static final ScheduledExecutorService SCHEDULED_CLEAN_UP_THREAD = Executors.newSingleThreadScheduledExecutor();
            static final int CLEAN_UP_CHECK_INTERVAL_MS = 10000;
            static final int IDLE_TIMEOUT_MS = 60000;
            private final Cache<Integer, ShardWriter<K, V>> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .<Integer, ShardWriter<K, V>>removalListener(notification -> {
                    // Explicit removals come from removeIfPresent: that writer is handed to a
                    // caller and must stay open.
                    if (notification.getCause() != RemovalCause.EXPLICIT) {
                        ShardWriter<K, V> writer = notification.getValue();
                        LOG.info("{} : Closing idle shard writer {} after 1 minute of idle time.", writer.shard, writer.producerName);
                        writer.producer.close();
                    }
                })
                .build();

            /** Schedules a recurring {@code cleanUp} of this cache on the shared clean-up thread. */
            ShardWriterCache() {
                SCHEDULED_CLEAN_UP_THREAD.scheduleAtFixedRate(this.cache::cleanUp, CLEAN_UP_CHECK_INTERVAL_MS, CLEAN_UP_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
            }

            /**
             * Removes and returns the cached writer for a shard.
             *
             * @param shard the shard to look up
             * @return the cached writer, or {@code null} if none is cached
             */
            ShardWriter<K, V> removeIfPresent(int shard) {
                return this.cache.asMap().remove(shard);
            }

            /**
             * Puts a writer back into the cache.
             *
             * @param shard the shard the writer belongs to
             * @param writer the writer to cache
             * @throws IllegalStateException if a writer is already cached for the shard
             */
            void insert(int shard, ShardWriter<K, V> writer) {
                ShardWriter<K, V> existing = this.cache.asMap().putIfAbsent(shard, writer);
                Preconditions.checkState(existing == null, "Unexpected multiple instances of writers for shard %s", shard);
            }
        }

        /**
         * A transactional producer bound to one shard, together with the last sequence id it is
         * known to have committed.
         */
        private static class ShardWriter<K, V> {
            private final int shard;
            private final String writerId;
            private final Producer<K, V> producer;
            private final String producerName;
            private final KafkaIO.Write<K, V> spec;
            private long committedId;

            ShardWriter(int shard, String writerId, Producer<K, V> producer, String producerName, KafkaIO.Write<K, V> spec, long committedId) {
                this.shard = shard;
                this.writerId = writerId;
                this.producer = producer;
                this.producerName = producerName;
                this.spec = spec;
                this.committedId = committedId;
            }

            /** Begins a transaction on the underlying producer. */
            void beginTxn() {
                ProducerSpEL.beginTransaction(producer);
            }

            /**
             * Sends one record to the configured topic and increments the counter.
             *
             * @param record the key/value pair to send
             * @param sendCounter incremented after the record has been handed to the producer
             * @throws KafkaException rethrown after the current transaction has been aborted
             */
            void sendRecord(KV<K, V> record, Counter sendCounter) {
                try {
                    ProducerRecord<K, V> outgoing = new ProducerRecord<>(spec.getTopic(), record.getKey(), record.getValue());
                    producer.send(outgoing);
                    sendCounter.inc();
                }
                catch (KafkaException sendFailure) {
                    ProducerSpEL.abortTransaction(producer);
                    throw sendFailure;
                }
            }

            /**
             * Commits the current transaction, attaching the shard metadata (sequence id and writer
             * id, serialized as JSON) as offset metadata for this shard's partition.
             *
             * @param lastRecordId the sequence id of the last record in the transaction
             * @param numTransactions incremented after a successful commit
             * @throws IOException if the metadata cannot be serialized
             * @throws KafkaException rethrown after the current transaction has been aborted
             */
            void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
                try {
                    TopicPartition partition = new TopicPartition(spec.getTopic(), shard);
                    String metadataJson = JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId, writerId));
                    OffsetAndMetadata offsetWithMetadata = new OffsetAndMetadata(0L, metadataJson);
                    ProducerSpEL.sendOffsetsToTransaction(producer, ImmutableMap.of(partition, offsetWithMetadata), spec.getSinkGroupId());
                    ProducerSpEL.commitTransaction(producer);
                    numTransactions.inc();
                    // Logged before committedId is advanced, so the difference is the batch size.
                    long recordsCommitted = lastRecordId - committedId;
                    LOG.debug("{} : committed {} records", shard, recordsCommitted);
                    committedId = lastRecordId;
                }
                catch (KafkaException commitFailure) {
                    ProducerSpEL.abortTransaction(producer);
                    throw commitFailure;
                }
            }
        }

        /**
         * Per-shard metadata stored with Kafka as JSON: the last committed sequence id
         * ({@code "seq"}) and the writer id ({@code "id"}).
         */
        private static class ShardMetadata {
            @JsonProperty(value="seq")
            public final long sequenceId;
            @JsonProperty(value="id")
            public final String writerId;

            /** No-argument constructor: sequence id -1 and no writer id. */
            private ShardMetadata() {
                this(-1L, null);
            }

            /**
             * @param sequenceId the last committed sequence id
             * @param writerId the writer id of the shard
             */
            ShardMetadata(long sequenceId, String writerId) {
                this.sequenceId = sequenceId;
                this.writerId = writerId;
            }
        }
    }

    /**
     * Assigns consecutive sequence ids to the records of each shard.
     *
     * <p>The next id to hand out is kept in state, starting from 0 when no value is stored.
     */
    private static class Sequencer<K, V>
    extends DoFn<KV<Integer, Iterable<KV<K, V>>>, KV<Integer, KV<Long, KV<K, V>>>> {
        private static final String NEXT_ID = "nextId";
        @DoFn.StateId(NEXT_ID)
        private final StateSpec<ValueState<Long>> nextIdSpec = StateSpecs.value();

        private Sequencer() {
        }

        /**
         * Emits every record of the shard paired with the next sequence id, then stores the id
         * that follows the last one emitted.
         *
         * @param nextIdState the next sequence id to assign
         * @param ctx the element: shard id mapped to its records
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.StateId(NEXT_ID) ValueState<Long> nextIdState, ProcessContext ctx) {
            long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
            int shard = ctx.element().getKey();
            for (KV<K, V> value : ctx.element().getValue()) {
                ctx.output(KV.of(shard, KV.of(nextId, value)));
                ++nextId;
            }
            nextIdState.write(nextId);
        }
    }

    /**
     * Keys each record with a shard id, cycling through {@code numShards} shards in round-robin
     * order.
     */
    private static class Reshard<K, V>
    extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
        private final int numShards;
        // Transient: chosen again in setup() rather than carried over with the serialized DoFn.
        private transient int shardId;

        /**
         * @param numShards the number of shards to distribute records across
         */
        Reshard(int numShards) {
            this.numShards = numShards;
        }

        /** Picks a random shard in {@code [0, numShards)} as the starting point. */
        @DoFn.Setup
        public void setup() {
            this.shardId = ThreadLocalRandom.current().nextInt(this.numShards);
        }

        /**
         * Advances to the next shard (wrapping around) and emits the record keyed by it.
         *
         * @param ctx the record to reshard
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext ctx) {
            this.shardId = (this.shardId + 1) % this.numShards;
            ctx.output(KV.of(this.shardId, ctx.element()));
        }
    }
}

