/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task.operation;

import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.kafka.internal.model.PartitionState;
import io.debezium.connector.spanner.kafka.internal.model.PartitionStateEnum;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.task.PartitionFactory;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.operation.Operation;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operation that submits this task's {@code READY_FOR_STREAMING} partitions to the
 * {@link ChangeStream} and marks the accepted ones as {@code SCHEDULED}.
 *
 * <p>Before submitting, partitions of the current task that are still
 * {@code READY_FOR_STREAMING} but already appear in any known task state as
 * {@code SCHEDULED}, {@code RUNNING}, {@code FINISHED} or {@code REMOVED} are dropped from the
 * current task state.
 *
 * <p>The instance keeps a mutable flag recording whether the last run scheduled at least one
 * partition; see {@link #isRequiredPublishSyncEvent()}.
 */
public class TakePartitionForStreamingOperation
implements Operation {
    private static final Logger LOGGER = LoggerFactory.getLogger(TakePartitionForStreamingOperation.class);
    private final ChangeStream changeStream;
    private final PartitionFactory partitionFactory;
    private boolean isRequiredPublishSyncEvent = false;

    /**
     * @param changeStream     stream that partitions are submitted to
     * @param partitionFactory factory used to build a {@link Partition} from a {@link PartitionState}
     */
    public TakePartitionForStreamingOperation(ChangeStream changeStream, PartitionFactory partitionFactory) {
        this.changeStream = changeStream;
        this.partitionFactory = partitionFactory;
    }

    /**
     * Submits every {@code READY_FOR_STREAMING} partition of the current task and returns a
     * context in which the successfully submitted partitions are marked {@code SCHEDULED}.
     *
     * <p>Also updates the publish flag: it is set to {@code true} when at least one partition
     * was accepted by the change stream and to {@code false} otherwise.
     *
     * @param taskSyncContext context whose current task state is inspected
     * @return a rebuilt context carrying the updated partition list
     */
    private TaskSyncContext takePartitionForStreaming(TaskSyncContext taskSyncContext) {
        TaskState taskState = taskSyncContext.getCurrentTaskState();

        // Snapshot the candidates first so that submission iterates over a stable list.
        List<PartitionState> readyForStreaming = taskState.getPartitions().stream()
                .filter(partitionState -> partitionState.getState().equals(PartitionStateEnum.READY_FOR_STREAMING))
                .collect(Collectors.toList());

        // Tokens of the partitions the change stream accepted.
        HashSet<String> scheduledTokens = new HashSet<>();
        for (PartitionState candidate : readyForStreaming) {
            if (this.submitPartition(candidate)) {
                scheduledTokens.add(candidate.getToken());
            }
        }

        List<PartitionState> partitions = taskState.getPartitions().stream()
                .map(partitionState -> scheduledTokens.contains(partitionState.getToken())
                        ? partitionState.toBuilder().state(PartitionStateEnum.SCHEDULED).build()
                        : partitionState)
                .collect(Collectors.toList());

        this.isRequiredPublishSyncEvent = !scheduledTokens.isEmpty();
        if (this.isRequiredPublishSyncEvent) {
            LOGGER.debug("Task scheduled {} partitions, taskUid: {}", scheduledTokens.size(), taskSyncContext.getTaskUid());
        }

        return taskSyncContext.toBuilder()
                .currentTaskState(taskState.toBuilder().partitions(partitions).build())
                .build();
    }

    /**
     * Builds a {@link Partition} for the given state and submits it to the change stream.
     *
     * @return the value returned by {@link ChangeStream#submitPartition}
     */
    private boolean submitPartition(PartitionState partitionState) {
        Partition partition = this.partitionFactory.getPartition(partitionState);
        return this.changeStream.submitPartition(partition);
    }

    /**
     * Drops from the current task state every {@code READY_FOR_STREAMING} partition whose token
     * already appears, in any known task state, as scheduled, running, finished or removed.
     *
     * @param taskSyncContext context providing both the current task state and all task states
     * @return a rebuilt context whose current task state no longer contains those partitions
     */
    private TaskSyncContext removeAlreadyStreamingPartitions(TaskSyncContext taskSyncContext) {
        TaskState taskState = taskSyncContext.getCurrentTaskState();
        List<PartitionState> partitions = taskState.getPartitions().stream()
                .filter(partitionState -> !this.isRedundantReadyPartition(taskSyncContext, partitionState))
                .collect(Collectors.toList());
        return taskSyncContext.toBuilder()
                .currentTaskState(taskState.toBuilder().partitions(partitions).build())
                .build();
    }

    /**
     * Tells whether the given partition is {@code READY_FOR_STREAMING} while its token is already
     * being handled according to the known task states; logs the removal when it is.
     */
    private boolean isRedundantReadyPartition(TaskSyncContext taskSyncContext, PartitionState partitionState) {
        // Cheap local state check first; the scan over all task states only runs for candidates.
        if (!partitionState.getState().equals(PartitionStateEnum.READY_FOR_STREAMING)) {
            return false;
        }
        if (!this.isPartitionStreamingAlready(taskSyncContext.getTaskStates().values(), partitionState.getToken())) {
            return false;
        }
        LOGGER.info("Removing streaming partition {} with state {} since partition is already streaming", partitionState.getToken(), partitionState.getState());
        return true;
    }

    /**
     * Checks whether any partition with the given token, in any of the supplied task states, is
     * in state {@code SCHEDULED}, {@code RUNNING}, {@code FINISHED} or {@code REMOVED}.
     *
     * @param taskStates task states to scan
     * @param token      partition token to look for
     * @return {@code true} if a matching partition in one of those states exists
     */
    private boolean isPartitionStreamingAlready(Collection<TaskState> taskStates, String token) {
        return taskStates.stream()
                .flatMap(taskState -> taskState.getPartitions().stream())
                .filter(partitionState -> partitionState.getToken().equals(token))
                .anyMatch(partitionState -> partitionState.getState().equals(PartitionStateEnum.SCHEDULED)
                        || partitionState.getState().equals(PartitionStateEnum.RUNNING)
                        || partitionState.getState().equals(PartitionStateEnum.FINISHED)
                        || partitionState.getState().equals(PartitionStateEnum.REMOVED));
    }

    /**
     * Same check as {@link #isPartitionStreamingAlready(Collection, String)}; retained as a
     * delegating alias because the original body was an exact duplicate of that method.
     */
    private boolean isPartition(Collection<TaskState> taskStates, String token) {
        return this.isPartitionStreamingAlready(taskStates, token);
    }

    /**
     * @return {@code true} if the most recent {@link #doOperation} scheduled at least one
     *         partition; {@code false} before the first run or when nothing was scheduled
     */
    @Override
    public boolean isRequiredPublishSyncEvent() {
        return this.isRequiredPublishSyncEvent;
    }

    /**
     * Removes redundant {@code READY_FOR_STREAMING} partitions, then submits the remaining ones
     * for streaming.
     *
     * @param taskSyncContext context to operate on
     * @return the rebuilt context reflecting removed and newly scheduled partitions
     */
    @Override
    public TaskSyncContext doOperation(TaskSyncContext taskSyncContext) {
        TaskSyncContext withoutDuplicates = this.removeAlreadyStreamingPartitions(taskSyncContext);
        return this.takePartitionForStreaming(withoutDuplicates);
    }
}

