/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.Strings;

public class AsyncHandlingAutoCommitKafkaConsumer
extends HonoKafkaConsumer {
    /**
     * Name of the consumer config entry from which the period (in seconds) is read
     * during which an already committed offset is not committed again.
     */
    public static final String CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS = "hono.offsets.skip.recommit.period.seconds";
    /**
     * Commit interval used when the consumer config contains no
     * {@code auto.commit.interval.ms} entry.
     */
    public static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5L);
    /**
     * Skip-recommit period used when the consumer config contains no
     * {@link #CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS} entry.
     */
    public static final Duration DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD = Duration.ofMinutes(30L);
    /** Period, in milliseconds, of the periodic commit timer. */
    private final long commitIntervalMillis;
    /**
     * Number of seconds after the last commit of a partition during which an
     * unchanged offset is not offered for commit again.
     */
    private final long skipOffsetRecommitPeriodSeconds;
    /**
     * Offset tracking state per Kafka topic partition. All accesses in this class
     * happen inside {@code synchronized} methods.
     */
    private final Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> offsetsMap = new HashMap<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets>();
    /**
     * Set to {@code true} by the periodic timer before handing off a commit
     * and used to skip timer invocations while that commit handler has not finished.
     */
    private final AtomicBoolean periodicCommitInvocationInProgress = new AtomicBoolean();
    /** Id of the periodic commit timer; {@code null} until the timer has been started. */
    private Long periodicCommitTimerId;

    /**
     * Creates a consumer for a fixed set of topics.
     *
     * @param vertx The Vert.x instance to use.
     * @param topics The names of the topics to consume from.
     * @param recordHandler The function invoked for each record; the record's offset
     *                      is considered handled once the returned future completes.
     * @param consumerConfig The Kafka consumer configuration. It must contain a
     *                       {@code group.id} entry and is modified in place
     *                       ({@code enable.auto.commit} is set to {@code false}).
     * @throws IllegalArgumentException if no {@code group.id} is configured.
     */
    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Set<String> topics, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        this(new AtomicReference<>(), vertx, topics, null, recordHandler, consumerConfig);
    }

    /**
     * Creates a consumer for all topics matching a pattern.
     *
     * @param vertx The Vert.x instance to use.
     * @param topicPattern The pattern of the topic names to consume from.
     * @param recordHandler The function invoked for each record; the record's offset
     *                      is considered handled once the returned future completes.
     * @param consumerConfig The Kafka consumer configuration. It must contain a
     *                       {@code group.id} entry and is modified in place
     *                       ({@code enable.auto.commit} is set to {@code false}).
     * @throws IllegalArgumentException if no {@code group.id} is configured.
     */
    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Pattern topicPattern, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        this(new AtomicReference<>(), vertx, null, topicPattern, recordHandler, consumerConfig);
    }

    /**
     * Shared constructor behind the public ones.
     * <p>
     * The record handler passed to the super constructor has to reach this instance
     * before {@code this} may be referenced, so it captures {@code selfRef}, which is
     * filled in right after the super constructor has returned.
     *
     * @param selfRef Holder that receives the reference to the created instance.
     * @param vertx The Vert.x instance to use.
     * @param topics The topic names, or {@code null} when a pattern is given.
     * @param topicPattern The topic name pattern, or {@code null} when topic names are given.
     * @param recordHandler The asynchronous record handler.
     * @param consumerConfig The Kafka consumer configuration (modified in place).
     */
    private AsyncHandlingAutoCommitKafkaConsumer(AtomicReference<AsyncHandlingAutoCommitKafkaConsumer> selfRef, Vertx vertx, Set<String> topics, Pattern topicPattern, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        super(vertx, topics, topicPattern, mapRecordHandler(selfRef, recordHandler), validateAndAdaptConsumerConfig(consumerConfig));
        this.skipOffsetRecommitPeriodSeconds = getSkipOffsetRecommitPeriodSeconds(consumerConfig);
        this.commitIntervalMillis = getCommitInterval(consumerConfig);
        // make this instance reachable from the record handler created above
        selfRef.setPlain(this);
    }

    /**
     * Wraps the asynchronous record handler in a plain {@link Handler} that also
     * tracks the offset of each record.
     * <p>
     * Every record is first registered with the consumer held in {@code selfRef}. Its
     * queue entry is marked as completely handled when the future returned by
     * {@code recordHandler} completes (regardless of the outcome), or right away if
     * invoking the handler throws an exception.
     *
     * @param selfRef Holder of the consumer instance; read when a record is handled.
     * @param recordHandler The asynchronous record handler to wrap.
     * @return The wrapping handler.
     */
    private static Handler<KafkaConsumerRecord<String, Buffer>> mapRecordHandler(AtomicReference<AsyncHandlingAutoCommitKafkaConsumer> selfRef, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler) {
        return consumerRecord -> {
            final AsyncHandlingAutoCommitKafkaConsumer consumer = selfRef.getPlain();
            final OffsetsQueueEntry queueEntry = consumer.setRecordReceived(consumerRecord);
            try {
                final Future<Void> handlingResult = recordHandler.apply(consumerRecord);
                handlingResult.onComplete(outcome -> queueEntry.setHandlingComplete());
            } catch (final Exception e) {
                consumer.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]",
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.headers(), e);
                // a failed invocation must not block commits of later offsets
                queueEntry.setHandlingComplete();
            }
        };
    }

    /**
     * Checks that a consumer group id is configured and disables Kafka's own auto commit.
     * <p>
     * Note that the given map itself is modified and returned; no copy is made.
     *
     * @param consumerConfig The consumer configuration to validate and adapt.
     * @return The given (modified) configuration map.
     * @throws IllegalArgumentException if the {@code group.id} entry is missing or empty.
     */
    private static Map<String, String> validateAndAdaptConsumerConfig(Map<String, String> consumerConfig) {
        final String groupId = consumerConfig.get("group.id");
        if (Strings.isNullOrEmpty(groupId)) {
            throw new IllegalArgumentException("group.id config entry has to be set");
        }
        // offsets are committed by this class, not by the Kafka client
        consumerConfig.put("enable.auto.commit", "false");
        return consumerConfig;
    }

    /**
     * Determines the commit interval from the consumer configuration.
     *
     * @param consumerConfig The consumer configuration.
     * @return The value of the {@code auto.commit.interval.ms} entry in milliseconds,
     *         or {@link #DEFAULT_COMMIT_INTERVAL} if there is no such entry.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getCommitInterval(Map<String, String> consumerConfig) {
        final long defaultMillis = DEFAULT_COMMIT_INTERVAL.toMillis();
        final String configuredMillis = consumerConfig.get("auto.commit.interval.ms");
        if (configuredMillis == null) {
            return defaultMillis;
        }
        return Long.parseLong(configuredMillis);
    }

    /**
     * Determines the skip-recommit period from the consumer configuration.
     *
     * @param consumerConfig The consumer configuration.
     * @return The value of the {@link #CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS}
     *         entry in seconds, or {@link #DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD} if
     *         there is no such entry.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getSkipOffsetRecommitPeriodSeconds(Map<String, String> consumerConfig) {
        final long defaultSeconds = DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD.toSeconds();
        final String configuredSeconds = consumerConfig.get(CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS);
        if (configuredSeconds == null) {
            return defaultSeconds;
        }
        return Long.parseLong(configuredSeconds);
    }

    /**
     * Registers the offset of a record for which the record handler was skipped and
     * immediately marks it as completely handled, so that it does not hold back
     * the commit of later offsets.
     *
     * @param record The skipped record.
     */
    @Override
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> record) {
        final OffsetsQueueEntry skippedEntry = setRecordReceived(record);
        skippedEntry.setHandlingComplete();
    }

    /**
     * Starts the consumer and, once it has been started successfully, the timer
     * that periodically commits the offsets of completely handled records.
     * <p>
     * The timer is not started if starting the consumer failed, so that no periodic
     * commits are attempted on a consumer that never became operational.
     *
     * @return The future returned by the super class' {@code start()} method.
     */
    @Override
    public Future<Void> start() {
        return super.start().onComplete(startAttempt -> {
            if (startAttempt.succeeded()) {
                this.startPeriodicCommitTimer();
            }
        });
    }

    /**
     * Cancels the periodic commit timer (if one was started) and stops the consumer.
     * <p>
     * After the super class' {@code stop()} has completed, the offset tracking state
     * of all partitions is discarded.
     *
     * @return The future returned by the super class' {@code stop()} method.
     */
    @Override
    public Future<Void> stop() {
        final Long timerId = this.periodicCommitTimerId;
        if (timerId != null) {
            this.vertx.cancelTimer(timerId);
        }
        final Future<Void> stopResult = super.stop();
        // an empty "assigned" collection makes every tracked partition obsolete
        return stopResult.onComplete(done -> clearObsoleteTopicPartitionOffsets(List.of()));
    }

    /**
     * Drops the offset tracking state of all partitions that are not part of the
     * current assignment of the underlying Kafka consumer.
     *
     * @param partitionsSet The newly assigned partitions (not used; the assignment
     *                      is queried from the underlying consumer instead).
     */
    @Override
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
        final Consumer<?, ?> kafkaClient = this.getKafkaConsumer().asStream().unwrap();
        final Set<org.apache.kafka.common.TopicPartition> currentAssignment = kafkaClient.assignment();
        this.clearObsoleteTopicPartitionOffsets(currentAssignment);
    }

    /**
     * Synchronously commits the offsets of the completely handled records when
     * partitions get revoked.
     *
     * @param partitionsSet The revoked partitions (not used; all pending offsets
     *                      are committed).
     */
    @Override
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
        commitOffsetsSync();
    }

    /**
     * Synchronously commits the offsets currently eligible for commit.
     * <p>
     * A failed commit is only logged; no exception is propagated in that case.
     *
     * @throws IllegalStateException if invoked while a Vert.x context is associated
     *                               with the current thread, i.e. not on the polling thread.
     */
    private void commitOffsetsSync() {
        if (Vertx.currentContext() != null) {
            throw new IllegalStateException("must be run on the polling thread");
        }
        final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> pendingOffsets = this.getOffsetsToCommit();
        if (pendingOffsets.isEmpty()) {
            this.log.trace("skip commitSync - no offsets to commit");
            return;
        }
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace("commitSync; offsets: [{}]", HonoKafkaConsumerHelper.getOffsetsDebugString(pendingOffsets));
            }
            final Consumer<?, ?> kafkaClient = this.getKafkaConsumer().asStream().unwrap();
            kafkaClient.commitSync(pendingOffsets);
            // remember what has been committed so that it isn't committed again right away
            this.setCommittedOffsets(pendingOffsets);
            this.log.trace("commitSync succeeded");
        } catch (final Exception e) {
            this.log.warn("commit failed: {}", e.toString());
        }
    }

    /**
     * Starts the periodic timer that commits the offsets of completely handled records.
     * <p>
     * Each timer invocation hands the actual commit over to
     * {@code runOnKafkaWorkerThread}. While such a handed-over commit handler has not
     * finished, further timer invocations are skipped. The "in progress" flag is
     * reset in a {@code finally} block so that an exception thrown while determining
     * or committing the offsets cannot leave the flag set and thereby silently
     * disable all subsequent periodic commits.
     */
    private void startPeriodicCommitTimer() {
        this.periodicCommitTimerId = this.vertx.setPeriodic(this.commitIntervalMillis, tid -> {
            if (!this.periodicCommitInvocationInProgress.compareAndSet(false, true)) {
                this.log.trace("periodic commit already triggered, skipping invocation");
                return;
            }
            this.runOnKafkaWorkerThread((Handler<Void>) v -> {
                try {
                    final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets = this.getOffsetsToCommit();
                    if (offsets.isEmpty()) {
                        this.log.trace("skip periodic commit - no offsets to commit");
                        return;
                    }
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("do periodic commit; offsets: [{}]", HonoKafkaConsumerHelper.getOffsetsDebugString(offsets));
                    }
                    final Consumer<?, ?> wrappedConsumer = this.getKafkaConsumer().asStream().unwrap();
                    wrappedConsumer.commitAsync(offsets, (committedOffsets, error) -> {
                        if (error != null) {
                            this.log.info("periodic commit failed: {}", error.toString());
                        } else {
                            this.log.trace("periodic commit succeeded");
                            this.setCommittedOffsets(committedOffsets);
                        }
                    });
                } finally {
                    // always allow the next timer invocation to trigger a commit again
                    this.periodicCommitInvocationInProgress.set(false);
                }
            });
        });
    }

    /**
     * Registers the offset of a received record in the tracking state of the record's
     * partition, creating that state if the partition is not tracked yet.
     *
     * @param record The received record.
     * @return The queue entry created for the record's offset.
     */
    private synchronized OffsetsQueueEntry setRecordReceived(KafkaConsumerRecord<String, Buffer> record) {
        final org.apache.kafka.common.TopicPartition partition = new org.apache.kafka.common.TopicPartition(record.topic(), record.partition());
        TopicPartitionOffsets partitionOffsets = this.offsetsMap.get(partition);
        if (partitionOffsets == null) {
            partitionOffsets = new TopicPartitionOffsets(partition);
            this.offsetsMap.put(partition, partitionOffsets);
        }
        return partitionOffsets.addOffset(record.offset());
    }

    /**
     * Removes the offset tracking state of every partition that is not contained in
     * the given collection.
     * <p>
     * For each removed partition a message is logged whose level depends on whether
     * there are still records being handled or a handled offset that has not been
     * committed.
     *
     * @param currentlyAssignedPartitions The partitions whose state is to be kept.
     * @throws NullPointerException if currentlyAssignedPartitions is {@code null}.
     */
    private synchronized void clearObsoleteTopicPartitionOffsets(Collection<org.apache.kafka.common.TopicPartition> currentlyAssignedPartitions) {
        Objects.requireNonNull(currentlyAssignedPartitions);
        this.offsetsMap.entrySet().removeIf(entry -> {
            final org.apache.kafka.common.TopicPartition partition = entry.getKey();
            if (currentlyAssignedPartitions.contains(partition)) {
                return false;
            }
            final TopicPartitionOffsets partitionOffsets = entry.getValue();
            if (!partitionOffsets.allCompleted()) {
                this.log.debug("partition [{}] not assigned to consumer anymore but not all entries completed yet!", partition);
            } else if (partitionOffsets.needsCommit()) {
                this.log.warn("partition [{}] not assigned to consumer anymore but offset corresponding to the latest handled record hasn't been committed yet!", partition);
            } else {
                this.log.trace("partition [{}] not assigned to consumer anymore; no still outstanding offset commits there", partition);
            }
            return true;
        });
    }

    /**
     * Collects, for all tracked partitions, the offsets that should be committed now.
     * <p>
     * The offset put into the result is the last sequentially completed record
     * offset plus one. Partitions that currently have nothing to commit are left out.
     *
     * @return A new map with the offsets to commit per partition; may be empty.
     */
    private synchronized Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
        final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (final Map.Entry<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> entry : this.offsetsMap.entrySet()) {
            final Optional<Long> lastCompleted = entry.getValue().getLastSequentiallyCompletedOffsetForCommit();
            if (lastCompleted.isPresent()) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(lastCompleted.get() + 1L, ""));
            }
        }
        return offsetsToCommit;
    }

    /**
     * Records the given offsets as committed in the tracking state of the
     * corresponding partitions. Partitions that are not tracked (anymore) are ignored.
     * <p>
     * The given values are offsets as passed to a Kafka commit, i.e. one greater than
     * the offset of the last handled record, hence one is subtracted here.
     *
     * @param offsets The committed offsets per partition.
     */
    private synchronized void setCommittedOffsets(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets) {
        for (final Map.Entry<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> committed : offsets.entrySet()) {
            final TopicPartitionOffsets partitionOffsets = this.offsetsMap.get(committed.getKey());
            if (partitionOffsets != null) {
                partitionOffsets.setLastCommittedOffset(committed.getValue().offset() - 1L);
            }
        }
    }

    /**
     * Keeps the offset of a received record together with the information whether
     * handling of that record has been completed.
     */
    static class OffsetsQueueEntry {
        private final long offset;
        // volatile: may be set by the thread completing the handling and read by another one
        private volatile boolean completed;

        /**
         * Creates a new entry whose handling is not completed yet.
         *
         * @param offset The record offset.
         */
        OffsetsQueueEntry(long offset) {
            this.offset = offset;
        }

        /**
         * Gets the record offset.
         *
         * @return The offset.
         */
        public long getOffset() {
            return offset;
        }

        /**
         * Checks whether handling of the record has been marked as completed.
         *
         * @return {@code true} if {@link #setHandlingComplete()} has been invoked.
         */
        public boolean isHandlingComplete() {
            return completed;
        }

        /**
         * Marks handling of the record as completed.
         */
        public void setHandlingComplete() {
            completed = true;
        }
    }

    /**
     * Tracks the offsets of the records received on a single topic partition.
     * <p>
     * Received offsets are kept in a queue in the order of their registration.
     * Entries are removed from the head of the queue as soon as their handling is
     * completed; the offset of the last entry removed that way is the "last
     * sequentially completed offset" that may be committed. A value of {@code -1}
     * means that no such offset, respectively no committed offset, exists yet.
     */
    class TopicPartitionOffsets {
        private final org.apache.kafka.common.TopicPartition topicPartition;
        private final Deque<OffsetsQueueEntry> queue = new LinkedList<>();
        private long lastSequentiallyCompletedOffset = -1L;
        private long lastCommittedOffset = -1L;
        private Instant lastCommitTime;

        /**
         * Creates the tracking state for a partition.
         *
         * @param topicPartition The partition.
         * @throws NullPointerException if topicPartition is {@code null}.
         */
        TopicPartitionOffsets(org.apache.kafka.common.TopicPartition topicPartition) {
            this.topicPartition = Objects.requireNonNull(topicPartition);
        }

        /**
         * Appends an entry for the given record offset to the queue.
         *
         * @param offset The offset of the received record.
         * @return The created queue entry.
         */
        public OffsetsQueueEntry addOffset(long offset) {
            cleanupAndUpdateLastCompletedOffset();
            final OffsetsQueueEntry newEntry = new OffsetsQueueEntry(offset);
            queue.add(newEntry);
            return newEntry;
        }

        /**
         * Gets the last sequentially completed offset if it should be committed now.
         * <p>
         * That is the case if it differs from the last committed offset, or if the
         * last commit happened longer ago than the skip-recommit period.
         *
         * @return The record offset to use for a commit, or an empty Optional if
         *         there is nothing to commit at the moment.
         */
        public Optional<Long> getLastSequentiallyCompletedOffsetForCommit() {
            cleanupAndUpdateLastCompletedOffset();
            if (lastSequentiallyCompletedOffset == -1L) {
                return Optional.empty();
            }
            if (!queue.isEmpty()) {
                AsyncHandlingAutoCommitKafkaConsumer.this.log.trace("getOffsetsToCommit: record with offset {} to use for commit is {} entries behind last received offset {}; partition [{}]",
                        lastSequentiallyCompletedOffset, queue.size(), queue.getLast().getOffset(), topicPartition);
            }
            final boolean notCommittedYet = lastSequentiallyCompletedOffset != lastCommittedOffset;
            if (notCommittedYet) {
                return Optional.of(lastSequentiallyCompletedOffset);
            }
            final boolean recommitDue = lastCommitTime != null
                    && lastCommitTime.isBefore(Instant.now().minusSeconds(AsyncHandlingAutoCommitKafkaConsumer.this.skipOffsetRecommitPeriodSeconds));
            if (!recommitDue) {
                return Optional.empty();
            }
            AsyncHandlingAutoCommitKafkaConsumer.this.log.trace("getOffsetsToCommit: record with offset {} will be recommitted (last commit {} too long ago); partition [{}]",
                    lastSequentiallyCompletedOffset, lastCommitTime, topicPartition);
            return Optional.of(lastSequentiallyCompletedOffset);
        }

        /**
         * Removes all completed entries from the head of the queue, stopping at the
         * first entry that is still being handled, and remembers the offset of the
         * last removed entry.
         */
        private void cleanupAndUpdateLastCompletedOffset() {
            for (OffsetsQueueEntry head = queue.peek(); head != null && head.isHandlingComplete(); head = queue.peek()) {
                queue.remove();
                lastSequentiallyCompletedOffset = head.getOffset();
            }
        }

        /**
         * Records the given record offset as committed, along with the current time,
         * unless it is lower than the offset already recorded as committed.
         *
         * @param offset The committed record offset.
         */
        public void setLastCommittedOffset(long offset) {
            if (offset < lastCommittedOffset) {
                return;
            }
            lastCommittedOffset = offset;
            lastCommitTime = Instant.now();
        }

        /**
         * Checks whether handling of all registered offsets has been completed.
         *
         * @return {@code true} if no entry is left in the queue.
         */
        public boolean allCompleted() {
            cleanupAndUpdateLastCompletedOffset();
            return queue.isEmpty();
        }

        /**
         * Checks whether there is a completed offset that has not been committed.
         *
         * @return {@code true} if a last sequentially completed offset exists and
         *         differs from the last committed offset.
         */
        public boolean needsCommit() {
            cleanupAndUpdateLastCompletedOffset();
            if (lastSequentiallyCompletedOffset == -1L) {
                return false;
            }
            return lastSequentiallyCompletedOffset != lastCommittedOffset;
        }
    }
}

