/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.errors.TimeoutException;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerCommitException;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerPollException;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractAtLeastOnceKafkaConsumer<T>
implements Lifecycle {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAtLeastOnceKafkaConsumer.class);
    private final KafkaConsumer<String, Buffer> kafkaConsumer;
    private final Set<String> topics;
    private final Pattern topicPattern;
    private final Handler<T> messageHandler;
    private final Handler<Throwable> closeHandler;
    private final Duration pollTimeout;

    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, String topic, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, Set.of(Objects.requireNonNull(topic)), messageHandler, closeHandler, pollTimeout);
    }

    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Set<String> topics, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, Objects.requireNonNull(topics), null, messageHandler, closeHandler, pollTimeout);
    }

    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Pattern topicPattern, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, null, Objects.requireNonNull(topicPattern), messageHandler, closeHandler, pollTimeout);
    }

    private AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Set<String> topics, Pattern topicPattern, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        Objects.requireNonNull(kafkaConsumer);
        Objects.requireNonNull(messageHandler);
        Objects.requireNonNull(closeHandler);
        this.kafkaConsumer = kafkaConsumer;
        this.messageHandler = messageHandler;
        this.closeHandler = closeHandler;
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.pollTimeout = Duration.ofMillis(pollTimeout);
    }

    protected abstract T createMessage(KafkaConsumerRecord<String, Buffer> var1);

    public Future<Void> start() {
        Promise promise = Promise.promise();
        if (this.topics != null) {
            this.kafkaConsumer.subscribe(this.topics, (Handler)promise);
        } else {
            this.kafkaConsumer.subscribe(this.topicPattern, (Handler)promise);
        }
        return promise.future().compose(v -> {
            Promise pollPromise = Promise.promise();
            this.kafkaConsumer.poll(this.pollTimeout, (Handler)pollPromise);
            return pollPromise.future().onSuccess(this::handleBatch).recover(cause -> Future.failedFuture((Throwable)new KafkaConsumerPollException((Throwable)cause))).mapEmpty();
        });
    }

    public Future<Void> stop() {
        Promise promise = Promise.promise();
        this.kafkaConsumer.close((Handler)promise);
        return promise.future();
    }

    private void handleBatch(KafkaConsumerRecords<String, Buffer> records) {
        LOG.debug("Polled {} records", (Object)records.size());
        CompositeFuture.all(this.processBatch(records)).compose(ok -> this.commit(true)).compose(ok -> this.poll()).onSuccess(this::handleBatch);
    }

    private Future<KafkaConsumerRecords<String, Buffer>> poll() {
        Promise pollPromise = Promise.promise();
        this.kafkaConsumer.poll(this.pollTimeout, (Handler)pollPromise);
        return pollPromise.future().recover(cause -> {
            LOG.error("Error polling messages: " + cause);
            KafkaConsumerPollException exception = new KafkaConsumerPollException((Throwable)cause);
            this.closeWithError(exception);
            return Future.failedFuture((Throwable)exception);
        });
    }

    private List<Future> processBatch(KafkaConsumerRecords<String, Buffer> records) {
        return IntStream.range(0, records.size()).mapToObj(arg_0 -> records.recordAt(arg_0)).map(this::processRecord).collect(Collectors.toList());
    }

    private Future<Void> processRecord(KafkaConsumerRecord<String, Buffer> record) {
        try {
            T message = this.createMessage(record);
            this.messageHandler.handle(message);
            return Future.succeededFuture();
        }
        catch (Exception ex) {
            LOG.error("Error handling record, closing the consumer: ", (Throwable)ex);
            this.commitCurrentOffset(record);
            this.stop();
            return Future.failedFuture((Throwable)ex);
        }
    }

    private Future<Void> commit(boolean retry) {
        Promise commitPromise = Promise.promise();
        this.kafkaConsumer.commit((Handler)commitPromise);
        return commitPromise.future().onSuccess(ok -> LOG.debug("Committed offsets")).recover(cause -> {
            LOG.error("Error committing offsets: " + cause);
            if (cause instanceof TimeoutException && retry) {
                LOG.debug("Committing offsets timed out. Maybe increase 'default.api.timeout.ms'?");
                return this.commit(false);
            }
            KafkaConsumerCommitException exception = new KafkaConsumerCommitException((Throwable)cause);
            this.closeWithError(exception);
            return Future.failedFuture((Throwable)exception);
        });
    }

    private void commitCurrentOffset(KafkaConsumerRecord<String, Buffer> record) {
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1L, "");
        this.kafkaConsumer.commit(Map.of(topicPartition, offsetAndMetadata));
    }

    private void closeWithError(Throwable exception) {
        LOG.error("Closing consumer with cause", exception);
        this.closeHandler.handle((Object)exception);
        this.stop();
    }
}

