/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.TaskStateChangeQueueUpdateMetricEvent;
import io.debezium.connector.spanner.task.TaskStateChangeEventHandler;
import io.debezium.connector.spanner.task.TaskStateUtil;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.state.NewPartitionsEvent;
import io.debezium.connector.spanner.task.state.SyncEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskStateChangeEventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateChangeEventProcessor.class);
    private final BlockingQueue<TaskStateChangeEvent> queue;
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskStateChangeEventHandler taskStateChangeEventHandler;
    private final Consumer<Throwable> errorHandler;
    private final MetricsEventPublisher metricsEventPublisher;
    private volatile Thread thread;
    private volatile Thread eventQueueingThread;
    private final Duration sleepInterval = Duration.ofMillis(100L);

    public TaskStateChangeEventProcessor(int queueCapacity, TaskSyncContextHolder taskSyncContextHolder, TaskStateChangeEventHandler taskStateChangeEventHandler, Consumer<Throwable> errorHandler, MetricsEventPublisher metricsEventPublisher) {
        this.queue = new ArrayBlockingQueue<TaskStateChangeEvent>(queueCapacity);
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.errorHandler = errorHandler;
        this.taskStateChangeEventHandler = taskStateChangeEventHandler;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    private Thread createEventQueueingThread() {
        Thread thread = new Thread(() -> {
            LOGGER.info("Task {}, Started Event Queueing Thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
            Clock clock = Clock.system();
            Metronome metronome = Metronome.sleeper((Duration)Duration.ofSeconds(5L), (Clock)clock);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (this.taskSyncContextHolder.get() == null) continue;
                    LOGGER.debug("Task {}, continuing Event Queueing Thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                    this.queue.put(new SyncEvent());
                    metronome.pause();
                }
                catch (InterruptedException e) {
                    LOGGER.error("Task interrupting event queueing thread");
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    LOGGER.error("Task caught exception from event queueing thread", (Throwable)e);
                    throw e;
                }
            }
            LOGGER.info("Task {}, Terminating Event Queueing Thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
        });
        thread.setUncaughtExceptionHandler((t, e) -> this.errorHandler.accept(e));
        return thread;
    }

    private Thread createEventHandlerThread() {
        Thread thread = new Thread(() -> {
            while (!Thread.interrupted()) {
                TaskStateChangeEvent event;
                try {
                    LOGGER.debug("createEventHandlerThread: Wait for sync event");
                    event = this.queue.take();
                    this.metricsEventPublisher.publishMetricEvent(new TaskStateChangeQueueUpdateMetricEvent(this.queue.remainingCapacity()));
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOGGER.info("Task {}, interrupting the event handler thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                    return;
                }
                this.taskSyncContextHolder.awaitNewEpoch();
                try {
                    this.taskStateChangeEventHandler.processEvent(event);
                }
                catch (InterruptedException e) {
                    LOGGER.info("Task {}, interrupting the event handler thread", (Object)this.taskSyncContextHolder.get().getTaskUid());
                    Thread.currentThread().interrupt();
                }
            }
        }, "SpannerConnector-TaskStateChangeEventProcessor");
        thread.setUncaughtExceptionHandler((t, e) -> this.errorHandler.accept(e));
        return thread;
    }

    public void startProcessing() {
        if (this.thread != null) {
            return;
        }
        this.thread = this.createEventHandlerThread();
        this.thread.start();
        this.eventQueueingThread = this.createEventQueueingThread();
        this.eventQueueingThread.start();
    }

    public void stopProcessing() {
        if (this.eventQueueingThread != null) {
            Clock clock = Clock.system();
            Metronome metronome = Metronome.sleeper((Duration)this.sleepInterval, (Clock)clock);
            LOGGER.info("Task {}, stopping event queueing thread ", (Object)this.taskSyncContextHolder.get().getTaskUid());
            while (!this.eventQueueingThread.getState().equals((Object)Thread.State.TERMINATED)) {
                try {
                    LOGGER.info("Task {}, still waiting for event queueing thread to die", (Object)this.taskSyncContextHolder.get().getTaskUid());
                    this.eventQueueingThread.interrupt();
                    metronome.pause();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            this.eventQueueingThread = null;
            LOGGER.info("Task {}, killed event queueing thread ", (Object)this.taskSyncContextHolder.get().getTaskUid());
        }
        if (this.thread != null) {
            this.queue.clear();
            this.thread.interrupt();
            this.thread = null;
        }
    }

    public void processEvent(TaskStateChangeEvent event) throws InterruptedException {
        if (event instanceof NewPartitionsEvent) {
            NewPartitionsEvent newPartitionsEvent = (NewPartitionsEvent)event;
            List<Partition> filteredPartitions = this.removeAlreadyExistingPartitions(newPartitionsEvent.getPartitions());
            if (!filteredPartitions.isEmpty()) {
                this.queue.put(new NewPartitionsEvent(filteredPartitions));
            }
        } else {
            this.queue.put(event);
        }
        this.metricsEventPublisher.publishMetricEvent(new TaskStateChangeQueueUpdateMetricEvent(this.queue.remainingCapacity()));
    }

    private List<Partition> removeAlreadyExistingPartitions(List<Partition> partitions) {
        Set<String> existingPartitions = TaskStateUtil.allPartitionTokens(this.taskSyncContextHolder.get());
        return partitions.stream().filter(p -> !existingPartitions.contains(p.getToken())).collect(Collectors.toList());
    }
}

