/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.kafka.internal;

import com.google.protobuf.InvalidProtocolBufferException;
import io.debezium.DebeziumException;
import io.debezium.connector.spanner.exception.SpannerConnectorException;
import io.debezium.connector.spanner.function.BlockingBiConsumer;
import io.debezium.connector.spanner.kafka.event.proto.SyncEventProtos;
import io.debezium.connector.spanner.kafka.internal.SyncEventConsumerFactory;
import io.debezium.connector.spanner.kafka.internal.model.SyncEventMetadata;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.kafka.internal.proto.SyncEventFromProtoMapper;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskSyncEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskSyncEventListener.class);
    private final String consumerGroup;
    private final String topic;
    private final boolean seekBackToPreviousEpoch;
    private final Duration pollDuration;
    private final Duration commitOffsetsTimeout;
    private final long commitOffsetsInterval;
    private final SyncEventConsumerFactory<String, byte[]> consumerFactory;
    private final List<BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata>> eventConsumers = new ArrayList<BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata>>();
    private final java.util.function.Consumer<RuntimeException> errorHandler;
    private final Duration pollInterval = Duration.ofMillis(300000L);
    private final Clock clock;
    private volatile Thread thread;

    public TaskSyncEventListener(String consumerGroup, String topic, SyncEventConsumerFactory<String, byte[]> consumerFactory, boolean seekBackToPreviousEpoch, java.util.function.Consumer<RuntimeException> errorHandler) {
        this.consumerGroup = consumerGroup;
        this.topic = topic;
        this.seekBackToPreviousEpoch = seekBackToPreviousEpoch;
        this.pollDuration = Duration.ofMillis(consumerFactory.getConfig().syncPollDuration());
        this.commitOffsetsTimeout = Duration.ofMillis(consumerFactory.getConfig().syncCommitOffsetsTimeout());
        this.commitOffsetsInterval = consumerFactory.getConfig().syncCommitOffsetsInterval();
        this.consumerFactory = consumerFactory;
        this.errorHandler = errorHandler;
        this.clock = Clock.system();
    }

    public void subscribe(BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer) {
        this.eventConsumers.add(eventConsumer);
    }

    public void unsubscribe(BiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer) {
        this.eventConsumers.remove(eventConsumer);
    }

    public void start() throws InterruptedException {
        Long endOffset;
        Consumer<String, byte[]> consumer;
        block7: {
            TopicPartition topicPartition = new TopicPartition(this.topic, 0);
            List<TopicPartition> assignment = List.of(topicPartition);
            consumer = this.consumerFactory.createConsumer(this.consumerGroup);
            consumer.assign(assignment);
            endOffset = (Long)consumer.endOffsets(assignment).get(topicPartition);
            Long beginOffset = (Long)consumer.beginningOffsets(assignment).get(topicPartition);
            long startOffset = Math.max(endOffset - 1L, beginOffset);
            try {
                if (endOffset == startOffset) {
                    LOGGER.info("task {}, listen: Sync topic is empty, so initial sync is finished", (Object)this.consumerGroup);
                    for (BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer : this.eventConsumers) {
                        eventConsumer.accept(null, SyncEventMetadata.builder().canInitiateRebalancing(true).build());
                    }
                    break block7;
                }
                LOGGER.info("Task {}, listen: read last message with start offset {} and end offset {}", new Object[]{this.consumerGroup, startOffset, endOffset});
                try {
                    LOGGER.info("Task {}, seeking back to end offset {}", (Object)this.consumerGroup, (Object)endOffset);
                    consumer.seek(topicPartition, startOffset);
                    LOGGER.info("Task {}, seeking back to previous epoch", (Object)this.consumerGroup);
                    this.seekBackToPreviousEpoch(consumer, topicPartition, beginOffset, startOffset);
                }
                catch (InterruptException e) {
                    LOGGER.info("Task {}, caught interrupt exception during reading the sync topic", (Object)this.consumerGroup, (Object)e);
                    throw new InterruptedException();
                }
                catch (Exception e) {
                    LOGGER.info("Task {}, Error during seek back the Sync Topic {}", (Object)this.consumerGroup);
                    this.errorHandler.accept(new SpannerConnectorException("Error during seek back the Sync Topic", e));
                    return;
                }
            }
            catch (Exception ex2) {
                LOGGER.info("Shutdown consumer {} for ex {}", (Object)this.consumerGroup, (Object)ex2);
                this.shutdownConsumer(consumer);
                this.errorHandler.accept(new RuntimeException(ex2));
            }
        }
        this.thread = new Thread(() -> {
            try {
                long commitOffsetStart = System.currentTimeMillis();
                LOGGER.info("Task {}, beginning to poll the sync topic", (Object)this.consumerGroup);
                Stopwatch sw = Stopwatch.accumulating().start();
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Duration totalDuration = sw.stop().durations().statistics().getTotal();
                        if (totalDuration.toMillis() >= this.pollInterval.toMillis()) {
                            LOGGER.info("Task {}, still polling the sync topic", (Object)this.consumerGroup);
                            sw = Stopwatch.accumulating().start();
                        } else {
                            sw.start();
                        }
                        this.poll(consumer, endOffset);
                        if (this.consumerFactory.isAutoCommitEnabled() || commitOffsetStart + this.commitOffsetsInterval >= System.currentTimeMillis()) continue;
                        consumer.commitSync(this.commitOffsetsTimeout);
                        commitOffsetStart = System.currentTimeMillis();
                    }
                    catch (InterruptedException | InterruptException ex) {
                        LOGGER.error("TaskSyncEventListener, caught interrupt exception {}", (Object)this.consumerGroup, (Object)ex);
                        Thread.currentThread().interrupt();
                        this.shutdownConsumer(consumer);
                        return;
                    }
                    catch (Exception e) {
                        try {
                            this.errorHandler.accept(new SpannerConnectorException("Error during poll from the Sync Topic", e));
                            this.shutdownConsumer(consumer);
                            return;
                        }
                        catch (Throwable throwable) {
                            throw throwable;
                            return;
                        }
                    }
                }
            }
            finally {
                this.shutdownConsumer(consumer);
            }
        }, "SpannerConnector-TaskSyncEventListener");
        this.thread.setUncaughtExceptionHandler((t, ex) -> {
            LOGGER.error("Error in SpannerConnector-TaskSyncEventListener, task {}, ex {}", (Object)this.consumerGroup, (Object)ex.getStackTrace());
            this.errorHandler.accept(new RuntimeException(ex));
        });
        this.thread.start();
    }

    private int poll(Consumer<String, byte[]> consumer, long endOffset) throws InvalidProtocolBufferException, InterruptedException {
        ConsumerRecords records = consumer.poll(this.pollDuration);
        LOGGER.trace("listen: poll messages count: {}", (Object)records.count());
        if (records.isEmpty()) {
            return 0;
        }
        for (ConsumerRecord record : records) {
            TaskSyncEvent taskSyncEvent = this.parseSyncEvent((ConsumerRecord<String, byte[]>)record);
            LoggerUtils.debug(LOGGER, "Receive SyncEvent from Kafka topic: {}", taskSyncEvent);
            if (record.offset() == endOffset - 1L) {
                LOGGER.info("Task {}, can begin to initiate rebalancing", (Object)this.consumerGroup);
            }
            for (BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer : this.eventConsumers) {
                boolean canInitiateRebalancing = record.offset() >= endOffset - 1L;
                eventConsumer.accept(taskSyncEvent, SyncEventMetadata.builder().offset(record.offset()).canInitiateRebalancing(canInitiateRebalancing).build());
            }
        }
        return records.count();
    }

    private void seekBackToPreviousEpoch(Consumer<String, byte[]> consumer, TopicPartition topicPartition, long beginOffset, long currOffset) throws InvalidProtocolBufferException {
        if (!this.seekBackToPreviousEpoch) {
            LOGGER.info("Task {}, not seeking back to previous epoch");
            return;
        }
        ConsumerRecords records = consumer.poll(this.pollDuration);
        long currentOffset = currOffset;
        while (records.isEmpty()) {
            if (currOffset - --currentOffset >= 100L) {
                throw new DebeziumException("Task " + this.consumerGroup + "failed to poll last message from the sync topic");
            }
            LOGGER.warn("Task {}, listen: fail to poll last message, trying again", (Object)this.consumerGroup);
            consumer.seek(topicPartition, currentOffset);
            records = consumer.poll(this.pollDuration);
        }
        ConsumerRecord lastRecord = (ConsumerRecord)records.iterator().next();
        TaskSyncEvent taskSyncEvent = this.parseSyncEvent((ConsumerRecord<String, byte[]>)lastRecord);
        long previousEpochOffset = taskSyncEvent.getEpochOffset();
        long startOffset = Math.max(previousEpochOffset, beginOffset);
        LOGGER.info("Task {}, listen: found previous epoch offset {} and begin offset {}", new Object[]{this.consumerGroup, previousEpochOffset, beginOffset});
        LOGGER.info("Task {}, listen: seek back to previous epoch offset: {}", (Object)this.consumerGroup, (Object)startOffset);
        consumer.seek(topicPartition, startOffset);
    }

    private TaskSyncEvent parseSyncEvent(ConsumerRecord<String, byte[]> record) throws InvalidProtocolBufferException {
        return SyncEventFromProtoMapper.mapFromProto(SyncEventProtos.SyncEvent.parseFrom((byte[])record.value()));
    }

    private void shutdownConsumer(Consumer<String, byte[]> consumer) {
        block2: {
            try {
                LOGGER.info("TaskSyncEventListener, Shutting down consumer {}", (Object)this.consumerGroup);
                consumer.unsubscribe();
                consumer.close();
            }
            catch (InterruptException e) {
                if (Thread.currentThread().isInterrupted()) break block2;
                Thread.currentThread().interrupt();
            }
        }
    }

    public void shutdown() {
        LOGGER.info("Stopping TaskSyncEventListener for Task Uid {}", (Object)this.consumerGroup);
        if (this.thread == null) {
            return;
        }
        this.thread.interrupt();
        Metronome metronome = Metronome.sleeper((Duration)Duration.ofMillis(100L), (Clock)this.clock);
        while (!this.thread.getState().equals((Object)Thread.State.TERMINATED)) {
            try {
                LOGGER.info("Still stopping TaskSyncEventListener for Task Uid {} with state {}", (Object)this.consumerGroup, (Object)this.thread.getState());
                this.thread.interrupt();
                metronome.pause();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Stopped TaskSyncEventListener for Task Uid {}", (Object)this.consumerGroup);
        this.thread = null;
    }
}

