/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task.leader;

import io.debezium.DebeziumException;
import io.debezium.connector.spanner.kafka.internal.KafkaConsumerAdminService;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskStateUtil;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.leader.LeaderService;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionRebalancer;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderAction.class);
    private static final Duration EPOCH_OFFSET_UPDATE_DURATION = Duration.ofSeconds(60L);
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final KafkaConsumerAdminService kafkaAdminService;
    private final LeaderService leaderService;
    private final TaskPartitionRebalancer taskPartitonRebalancer;
    private final TaskSyncPublisher taskSyncPublisher;
    private volatile Thread leaderThread;
    private Consumer<Throwable> errorHandler;

    public LeaderAction(TaskSyncContextHolder taskSyncContextHolder, KafkaConsumerAdminService kafkaAdminService, LeaderService leaderService, TaskPartitionRebalancer taskPartitonRebalancer, TaskSyncPublisher taskSyncPublisher, Consumer<Throwable> errorHandler) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.kafkaAdminService = kafkaAdminService;
        this.leaderService = leaderService;
        this.taskPartitonRebalancer = taskPartitonRebalancer;
        this.taskSyncPublisher = taskSyncPublisher;
        this.errorHandler = errorHandler;
    }

    private Thread createLeaderThread() {
        Thread thread = new Thread(() -> {
            LOGGER.info("performLeaderAction: Task {} start leader thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
            try {
                this.newEpoch();
            }
            catch (InterruptedException e) {
                LOGGER.info("performLeaderAction: Task {} stop leader thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                Thread.currentThread().interrupt();
                return;
            }
            while (!Thread.interrupted()) {
                try {
                    Thread.sleep(EPOCH_OFFSET_UPDATE_DURATION.toMillis());
                    if (this.taskSyncContextHolder.get().getRebalanceState() != RebalanceState.NEW_EPOCH_STARTED) continue;
                    this.publishEpochOffset();
                }
                catch (InterruptedException e) {
                    LOGGER.info("performLeaderAction: Task {} stop leader thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "SpannerConnector-LeaderAction");
        thread.setUncaughtExceptionHandler((t, ex) -> {
            LOGGER.error("Leader action execution error", ex);
            this.errorHandler.accept(ex);
        });
        return thread;
    }

    private TaskSyncContext publishEpochOffset() throws InterruptedException {
        TaskSyncContext taskSyncContext = this.taskSyncContextHolder.updateAndGet(oldContext -> oldContext.toBuilder().epochOffsetHolder(oldContext.getEpochOffsetHolder().nextOffset(oldContext.getCurrentKafkaRecordOffset())).build());
        this.taskSyncPublisher.send(taskSyncContext.buildTaskSyncEvent(MessageTypeEnum.UPDATE_EPOCH));
        LOGGER.info("Task {} - Epoch offset has been incremented and published {}:{}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), taskSyncContext.getRebalanceGenerationId(), taskSyncContext.getEpochOffsetHolder().getEpochOffset()});
        return taskSyncContext;
    }

    public void start() {
        if (this.leaderThread != null) {
            this.stop();
        }
        this.leaderThread = this.createLeaderThread();
        this.leaderThread.start();
    }

    public void stop() {
        if (this.leaderThread == null) {
            return;
        }
        this.leaderThread.interrupt();
        while (!this.leaderThread.getState().equals((Object)Thread.State.TERMINATED)) {
        }
        this.leaderThread = null;
    }

    private void newEpoch() throws InterruptedException {
        LOGGER.info("performLeaderActions: new epoch initialization");
        boolean startFromScratch = this.leaderService.isStartFromScratch();
        Set<String> activeConsumers = this.kafkaAdminService.getActiveConsumerGroupMembers();
        LOGGER.info("performLeaderActions: consumers found {}", activeConsumers);
        Map<String, String> consumerToTaskMap = this.leaderService.awaitAllNewTaskStateUpdates(activeConsumers, this.taskSyncContextHolder.get().getRebalanceGenerationId());
        LOGGER.info("performLeaderActions: answers received {}", consumerToTaskMap);
        if (consumerToTaskMap.size() < activeConsumers.size()) {
            LOGGER.info("TaskUid {}, Expected active consumers {}, but only received consumers {}, not sending new epoch", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), activeConsumers, consumerToTaskMap});
            throw new DebeziumException("Task Uid " + this.taskSyncContextHolder.get().getTaskUid() + " Expected active consumers " + activeConsumers.toString() + " but only received consumers " + consumerToTaskMap.toString() + " not sending new epoch ");
        }
        TaskSyncContext staleContext = this.taskSyncContextHolder.get();
        boolean foundDuplication = false;
        if (staleContext.checkDuplication(false, "NEW EPOCH rebalance event, initial context")) {
            foundDuplication = true;
        }
        TaskSyncContext taskSyncContext = this.taskSyncContextHolder.updateAndGet(oldContext -> {
            TaskState leaderState = oldContext.getCurrentTaskState();
            Map<String, TaskState> currentTaskStates = oldContext.getAllTaskStates();
            Map<Boolean, Map<String, TaskState>> isSurvivedPartitionedTaskStates = TaskStateUtil.splitSurvivedAndObsoleteTaskStates(currentTaskStates, consumerToTaskMap.values());
            Map<String, TaskState> survivedTasks = isSurvivedPartitionedTaskStates.get(true);
            Map<String, TaskState> obsoleteTasks = isSurvivedPartitionedTaskStates.get(false);
            leaderState = this.taskPartitonRebalancer.rebalance(leaderState, survivedTasks, obsoleteTasks);
            return oldContext.toBuilder().currentTaskState(leaderState).rebalanceState(RebalanceState.NEW_EPOCH_STARTED).taskStates(TaskStateUtil.filterSurvivedTasksStates(oldContext.getTaskStates(), survivedTasks.keySet())).epochOffsetHolder(oldContext.getEpochOffsetHolder().nextOffset(oldContext.getCurrentKafkaRecordOffset())).build();
        });
        if (!foundDuplication) {
            taskSyncContext.checkDuplication(true, "NEW EPOCH rebalance event, resulting context");
        }
        TaskSyncEvent taskSyncEvent = taskSyncContext.buildTaskSyncEvent(MessageTypeEnum.NEW_EPOCH);
        LOGGER.debug("Task {} - sent new epoch {}", (Object)taskSyncContext.getTaskUid(), (Object)taskSyncEvent);
        LOGGER.info("Task {} - LeaderAction sent sync event with rebalance generation ID {}: and epoch offset {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncContext.getRebalanceGenerationId(), taskSyncContext.getEpochOffsetHolder().getEpochOffset()});
        this.taskSyncPublisher.send(taskSyncEvent);
        if (startFromScratch) {
            this.leaderService.newParentPartition();
            LOGGER.info("performLeaderActions: newParentPartition");
        }
        LoggerUtils.debug(LOGGER, "performLeaderActions: new epoch {}", taskSyncEvent);
    }
}

