/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadFunction;
import org.apache.flink.util.function.TriFunction;

public class JobMasterTester
implements Closeable {
    private static final Time TIMEOUT = Time.minutes((long)1L);
    private final UnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
    private final ConcurrentMap<ExecutionAttemptID, TaskDeploymentDescriptor> descriptors = new ConcurrentHashMap<ExecutionAttemptID, TaskDeploymentDescriptor>();
    private final TestingRpcService rpcService;
    private final JobID jobId;
    private final JobMasterGateway jobMasterGateway;
    private final TaskExecutorGateway taskExecutorGateway;
    private final CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = new CompletableFuture();
    private final ConcurrentMap<Long, CheckpointCompletionHandler> checkpoints = new ConcurrentHashMap<Long, CheckpointCompletionHandler>();

    private static TaskStateSnapshot createNonEmptyStateSnapshot(TaskInformation taskInformation) {
        TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
        checkpointStateHandles.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)taskInformation.getJobVertexId()), OperatorSubtaskState.builder().setManagedOperatorState((OperatorStateHandle)new OperatorStreamStateHandle(Collections.emptyMap(), (StreamStateHandle)new ByteStreamStateHandle("foobar", new byte[0]))).build());
        return checkpointStateHandles;
    }

    public JobMasterTester(TestingRpcService rpcService, JobID jobId, JobMasterGateway jobMasterGateway) {
        this.rpcService = rpcService;
        this.jobId = jobId;
        this.jobMasterGateway = jobMasterGateway;
        this.taskExecutorGateway = this.createTaskExecutorGateway();
    }

    public CompletableFuture<Acknowledge> transitionTo(List<TaskDeploymentDescriptor> descriptors, ExecutionState state) {
        List futures = descriptors.stream().map(TaskDeploymentDescriptor::getExecutionAttemptId).map(attemptId -> this.jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(attemptId, state))).collect(Collectors.toList());
        return FutureUtils.completeAll(futures).thenApply(ignored -> Acknowledge.get());
    }

    public CompletableFuture<List<TaskDeploymentDescriptor>> deployVertices(int numSlots) {
        return ((CompletableFuture)this.jobMasterGateway.registerTaskManager(this.jobId, TaskManagerRegistrationInformation.create((String)this.taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)this.taskManagerLocation, (UUID)TestingUtils.zeroUUID()), TIMEOUT).thenCompose(ignored -> this.offerSlots(numSlots))).thenCompose(ignored -> this.descriptorsFuture);
    }

    public CompletableFuture<Void> getCheckpointFuture(long checkpointId) {
        return this.descriptorsFuture.thenCompose(descriptors -> this.checkpoints.computeIfAbsent(checkpointId, key -> new CheckpointCompletionHandler((List<TaskDeploymentDescriptor>)descriptors)).getCompletedFuture());
    }

    @Override
    public void close() throws IOException {
        this.rpcService.unregisterGateway(this.taskExecutorGateway.getAddress());
    }

    private TaskExecutorGateway createTaskExecutorGateway() {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer(this::onSubmitTaskConsumer).setTriggerCheckpointFunction((QuadFunction<ExecutionAttemptID, Long, Long, CheckpointOptions, CompletableFuture<Acknowledge>>)((QuadFunction)this::onTriggerCheckpoint)).setConfirmCheckpointFunction((TriFunction<ExecutionAttemptID, Long, Long, CompletableFuture<Acknowledge>>)((TriFunction)this::onConfirmCheckpoint)).createTestingTaskExecutorGateway();
        this.rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        return taskExecutorGateway;
    }

    private CompletableFuture<TaskInformation> getTaskInformation(ExecutionAttemptID executionAttemptId) {
        return this.descriptorsFuture.thenApply(descriptors -> {
            TaskDeploymentDescriptor descriptor = descriptors.stream().filter(desc -> executionAttemptId.equals((Object)desc.getExecutionAttemptId())).findAny().orElseThrow(() -> new IllegalStateException(String.format("Task descriptor for %s not found.", executionAttemptId)));
            try {
                return descriptor.getTaskInformation();
            }
            catch (Exception e) {
                throw new IllegalStateException(String.format("Unable to deserialize task information of %s.", executionAttemptId));
            }
        });
    }

    private CompletableFuture<Acknowledge> onTriggerCheckpoint(ExecutionAttemptID executionAttemptId, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
        return this.getTaskInformation(executionAttemptId).thenCompose(taskInformation -> {
            this.jobMasterGateway.acknowledgeCheckpoint(this.jobId, executionAttemptId, checkpointId, new CheckpointMetrics(), TaskStateSnapshot.serializeTaskStateSnapshot((TaskStateSnapshot)JobMasterTester.createNonEmptyStateSnapshot(taskInformation)));
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
    }

    private CompletableFuture<Acknowledge> onConfirmCheckpoint(ExecutionAttemptID executionAttemptId, long checkpointId, long checkpointTimestamp) {
        return this.getTaskInformation(executionAttemptId).thenCompose(taskInformation -> this.completeAttemptCheckpoint(checkpointId, executionAttemptId));
    }

    private CompletableFuture<Acknowledge> onSubmitTaskConsumer(TaskDeploymentDescriptor taskDeploymentDescriptor, JobMasterId jobMasterId) {
        return this.jobMasterGateway.requestJob(TIMEOUT).thenCompose(executionGraphInfo -> {
            int numVertices = Iterables.size((Iterable)executionGraphInfo.getArchivedExecutionGraph().getAllExecutionVertices());
            this.descriptors.put(taskDeploymentDescriptor.getExecutionAttemptId(), taskDeploymentDescriptor);
            if (this.descriptors.size() == numVertices) {
                this.descriptorsFuture.complete(new ArrayList(this.descriptors.values()));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
    }

    private CompletableFuture<Acknowledge> completeAttemptCheckpoint(long checkpointId, ExecutionAttemptID executionAttemptId) {
        return ((CompletableFuture)this.descriptorsFuture.thenAccept(descriptors -> this.checkpoints.computeIfAbsent(checkpointId, key -> new CheckpointCompletionHandler((List<TaskDeploymentDescriptor>)descriptors)).completeAttempt(executionAttemptId))).thenApply(ignored -> Acknowledge.get());
    }

    private CompletableFuture<Collection<SlotOffer>> offerSlots(int numSlots) {
        ArrayList<SlotOffer> offers = new ArrayList<SlotOffer>();
        for (int idx = 0; idx < numSlots; ++idx) {
            offers.add(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
        }
        return this.jobMasterGateway.offerSlots(this.taskManagerLocation.getResourceID(), offers, TIMEOUT);
    }

    private static class CheckpointCompletionHandler {
        private final Map<ExecutionAttemptID, CompletableFuture<Void>> completedAttemptFutures;
        private final CompletableFuture<Void> completedFuture;

        public CheckpointCompletionHandler(List<TaskDeploymentDescriptor> descriptors) {
            this.completedAttemptFutures = descriptors.stream().map(TaskDeploymentDescriptor::getExecutionAttemptId).collect(Collectors.toMap(Function.identity(), ignored -> new CompletableFuture()));
            this.completedFuture = FutureUtils.completeAll(this.completedAttemptFutures.values());
        }

        void completeAttempt(ExecutionAttemptID executionAttemptId) {
            this.completedAttemptFutures.get(executionAttemptId).complete(null);
        }

        CompletableFuture<Void> getCompletedFuture() {
            return this.completedFuture;
        }
    }
}

