package org.apache.flink.runtime.state;

import java.io.File;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskStateManagerImplTest.class */
/**
 * Tests for {@link TaskStateManagerImpl}: reporting checkpoint state to the checkpoint responder
 * and the task-local state store, retrieving prioritized state after a restore, forwarding the
 * local recovery configuration, and the restore-related accessors.
 */
class TaskStateManagerImplTest {

    /**
     * Reports a job-manager snapshot and a task-local snapshot through one manager, checks what the
     * responder and the local store received, then builds a second manager from the acknowledged
     * state and checks the order in which state alternatives are offered.
     */
    @Test
    void testStateReportingAndRetrieving() {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptId = ExecutionGraphTestUtils.createExecutionAttemptId();
        TestCheckpointResponder responder = new TestCheckpointResponder();
        TestTaskLocalStateStore localStateStore = new TestTaskLocalStateStore();
        InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage();

        // ------------------------------ reporting ------------------------------

        // No restore information is supplied, so nothing must be flagged as restored.
        TaskStateManager reportingManager =
                taskStateManager(
                        jobId, attemptId, responder, null, localStateStore, changelogStorage);

        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(74L, 11L);
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
        TaskStateSnapshot jmSnapshot = new TaskStateSnapshot();

        OperatorID operatorId1 = new OperatorID(1L, 1L);
        OperatorID operatorId2 = new OperatorID(2L, 2L);
        OperatorID operatorId3 = new OperatorID(3L, 3L);

        Assertions.assertThat(reportingManager.prioritizedOperatorState(operatorId1).isRestored())
                .isFalse();
        Assertions.assertThat(reportingManager.prioritizedOperatorState(operatorId2).isRestored())
                .isFalse();
        Assertions.assertThat(reportingManager.prioritizedOperatorState(operatorId3).isRestored())
                .isFalse();

        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);

        // Job-manager snapshot: operator 1 carries only managed keyed state.
        OperatorSubtaskState jmStateOp1 =
                OperatorSubtaskState.builder()
                        .setManagedKeyedState(
                                StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange))
                        .build();
        // Job-manager snapshot: operator 2 carries only raw keyed state.
        OperatorSubtaskState jmStateOp2 =
                OperatorSubtaskState.builder()
                        .setRawKeyedState(
                                StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange))
                        .build();
        jmSnapshot.putSubtaskStateByOperatorID(operatorId1, jmStateOp1);
        jmSnapshot.putSubtaskStateByOperatorID(operatorId2, jmStateOp2);

        // Task-local snapshot: only operator 1 has an entry (managed keyed state).
        TaskStateSnapshot localSnapshot = new TaskStateSnapshot();
        OperatorSubtaskState localStateOp1 =
                OperatorSubtaskState.builder()
                        .setManagedKeyedState(
                                StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange))
                        .build();
        localSnapshot.putSubtaskStateByOperatorID(operatorId1, localStateOp1);

        reportingManager.reportTaskStateSnapshots(
                checkpointMetaData, checkpointMetrics, jmSnapshot, localSnapshot);

        // The responder must have seen the job-manager snapshot, the local store the local one.
        TestCheckpointResponder.AcknowledgeReport acknowledgeReport =
                responder.getAcknowledgeReports().get(0);
        Assertions.assertThat(acknowledgeReport.getCheckpointId())
                .isEqualTo(checkpointMetaData.getCheckpointId());
        Assertions.assertThat(acknowledgeReport.getCheckpointMetrics()).isEqualTo(checkpointMetrics);
        Assertions.assertThat(acknowledgeReport.getExecutionAttemptID()).isEqualTo(attemptId);
        Assertions.assertThat(acknowledgeReport.getJobID()).isEqualTo(jobId);
        Assertions.assertThat(acknowledgeReport.getSubtaskState()).isEqualTo(jmSnapshot);
        Assertions.assertThat(
                        localStateStore.retrieveLocalState(checkpointMetaData.getCheckpointId()))
                .isEqualTo(localSnapshot);

        // ------------------------- prioritized retrieval -------------------------

        JobManagerTaskRestore taskRestore =
                new JobManagerTaskRestore(
                        checkpointMetaData.getCheckpointId(), acknowledgeReport.getSubtaskState());
        TaskStateManager restoringManager =
                taskStateManager(
                        jobId, attemptId, responder, taskRestore, localStateStore, changelogStorage);

        PrioritizedOperatorSubtaskState prioritizedOp1 =
                restoringManager.prioritizedOperatorState(operatorId1);
        PrioritizedOperatorSubtaskState prioritizedOp2 =
                restoringManager.prioritizedOperatorState(operatorId2);
        PrioritizedOperatorSubtaskState prioritizedOp3 =
                restoringManager.prioritizedOperatorState(operatorId3);

        // With a restore supplied, every operator is expected to report isRestored() == true,
        // including operator 3 and a fresh OperatorID for which no state was ever reported.
        Assertions.assertThat(prioritizedOp1.isRestored()).isTrue();
        Assertions.assertThat(prioritizedOp2.isRestored()).isTrue();
        Assertions.assertThat(prioritizedOp3.isRestored()).isTrue();
        Assertions.assertThat(
                        restoringManager.prioritizedOperatorState(new OperatorID()).isRestored())
                .isTrue();

        // Operator 1: the handle from the local snapshot is offered first, then the handle from
        // the job-manager snapshot, and nothing else.
        Iterator<StateObjectCollection<KeyedStateHandle>> managedKeyedOp1 =
                prioritizedOp1.getPrioritizedManagedKeyedState().iterator();

        Assertions.assertThat(managedKeyedOp1).hasNext();
        StateObjectCollection<KeyedStateHandle> firstAlternative = managedKeyedOp1.next();
        KeyedStateHandle expectedLocalHandle = localStateOp1.getManagedKeyedState().iterator().next();
        Assertions.assertThat(expectedLocalHandle).isSameAs(firstAlternative.iterator().next());

        Assertions.assertThat(managedKeyedOp1).hasNext();
        StateObjectCollection<KeyedStateHandle> secondAlternative = managedKeyedOp1.next();
        KeyedStateHandle expectedJmHandle = jmStateOp1.getManagedKeyedState().iterator().next();
        Assertions.assertThat(expectedJmHandle).isSameAs(secondAlternative.iterator().next());

        Assertions.assertThat(managedKeyedOp1).isExhausted();

        // Operator 2: only the job-manager raw keyed state exists, so there is one alternative.
        Iterator<StateObjectCollection<KeyedStateHandle>> rawKeyedOp2 =
                prioritizedOp2.getPrioritizedRawKeyedState().iterator();

        Assertions.assertThat(rawKeyedOp2).hasNext();
        StateObjectCollection<KeyedStateHandle> onlyAlternative = rawKeyedOp2.next();
        KeyedStateHandle expectedRawHandle = jmStateOp2.getRawKeyedState().iterator().next();
        Assertions.assertThat(expectedRawHandle).isSameAs(onlyAlternative.iterator().next());

        Assertions.assertThat(rawKeyedOp2).isExhausted();
    }

    /**
     * Checks that the {@link LocalRecoveryConfig} handed out by the task state manager resolves
     * the same allocation base directories, and the same enabled flag, as the one held by the
     * underlying {@link TaskLocalStateStoreImpl}.
     *
     * @param path temporary directory injected by JUnit; the base directories are created in it
     * @throws Exception if creating the temporary folders fails
     */
    @Test
    void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore(@TempDir Path path) throws Exception {
        JobID jobId = new JobID(42L, 43L);
        AllocationID allocationId = new AllocationID(4711L, 23L);
        JobVertexID jobVertexId = new JobVertexID(12L, 34L);
        ExecutionAttemptID attemptId = ExecutionGraphTestUtils.createExecutionAttemptId(jobVertexId);
        TestCheckpointResponder responder = new TestCheckpointResponder();
        Executor directExecutor = Executors.directExecutor();

        File[] allocationBaseDirs = {
            TempDirUtils.newFolder(path), TempDirUtils.newFolder(path), TempDirUtils.newFolder(path)
        };
        LocalSnapshotDirectoryProviderImpl directoryProvider =
                new LocalSnapshotDirectoryProviderImpl(allocationBaseDirs, jobId, jobVertexId, 0);
        TaskLocalStateStoreImpl localStateStore =
                new TaskLocalStateStoreImpl(
                        jobId,
                        allocationId,
                        jobVertexId,
                        13,
                        LocalRecoveryConfig.backupAndRecoveryEnabled(directoryProvider),
                        directExecutor);

        TaskStateManager manager =
                taskStateManager(
                        jobId,
                        attemptId,
                        responder,
                        null,
                        localStateStore,
                        new InMemoryStateChangelogStorage());

        LocalRecoveryConfig configFromStore = localStateStore.getLocalRecoveryConfig();
        LocalRecoveryConfig configFromManager = manager.createLocalRecoveryConfig();

        // Ten lookups over three directories: the expected directory cycles through the array.
        for (int i = 0; i < 10; i++) {
            File expectedDir = allocationBaseDirs[i % allocationBaseDirs.length];
            Assertions.assertThat(
                            configFromStore
                                    .getLocalStateDirectoryProvider()
                                    .get()
                                    .allocationBaseDirectory(i))
                    .isEqualTo(expectedDir);
            Assertions.assertThat(
                            configFromManager
                                    .getLocalStateDirectoryProvider()
                                    .get()
                                    .allocationBaseDirectory(i))
                    .isEqualTo(expectedDir);
        }

        Assertions.assertThat(configFromManager.isLocalRecoveryEnabled())
                .isEqualTo(configFromStore.isLocalRecoveryEnabled());
    }

    /**
     * A manager restored from {@link TaskStateSnapshot#FINISHED_ON_RESTORE} must report the task
     * as deployed in finished state.
     */
    @Test
    void testStateRetrievingWithFinishedOperator() {
        TaskStateManagerImpl manager =
                managerWithRestore(
                        new JobManagerTaskRestore(2L, TaskStateSnapshot.FINISHED_ON_RESTORE));

        Assertions.assertThat(manager.isTaskDeployedAsFinished()).isTrue();
    }

    /**
     * The restore checkpoint id must be absent when no restore is supplied and must equal the id
     * carried by the {@link JobManagerTaskRestore} otherwise.
     */
    // The @Test annotation was missing here, so JUnit silently skipped this method.
    @Test
    void testAcquringRestoreCheckpointId() {
        TaskStateManagerImpl managerWithoutRestore = managerWithRestore(null);
        Assertions.assertThat(managerWithoutRestore.getRestoreCheckpointId()).isNotPresent();

        TaskStateManagerImpl managerWithEmptyRestore =
                managerWithRestore(new JobManagerTaskRestore(2L, new TaskStateSnapshot()));
        Assertions.assertThat(managerWithEmptyRestore.getRestoreCheckpointId()).hasValue(2L);
    }

    /**
     * Builds a {@link TaskStateManagerImpl} with fresh job/attempt ids, a test local state store, a
     * test checkpoint responder, and neither a file-merging snapshot manager nor a changelog
     * storage.
     *
     * @param jobManagerTaskRestore restore information to hand to the manager; may be {@code null}
     * @return the newly created manager
     */
    private static TaskStateManagerImpl managerWithRestore(JobManagerTaskRestore jobManagerTaskRestore) {
        return new TaskStateManagerImpl(
                new JobID(),
                ExecutionGraphTestUtils.createExecutionAttemptId(),
                new TestTaskLocalStateStore(),
                (FileMergingSnapshotManager) null,
                (StateChangelogStorage<?>) null,
                new TaskExecutorStateChangelogStoragesManager(),
                jobManagerTaskRestore,
                new TestCheckpointResponder());
    }

    /**
     * Builds a {@link TaskStateManagerImpl} from the given collaborators, without a file-merging
     * snapshot manager and with a fresh {@link TaskExecutorStateChangelogStoragesManager}.
     *
     * @param jobID job the manager belongs to
     * @param executionAttemptID execution attempt the manager belongs to
     * @param checkpointResponder responder passed to the manager
     * @param jobManagerTaskRestore restore information; may be {@code null}
     * @param taskLocalStateStore local state store passed to the manager
     * @param stateChangelogStorage changelog storage passed to the manager
     * @return the newly created manager
     */
    private static TaskStateManager taskStateManager(JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointResponder checkpointResponder, JobManagerTaskRestore jobManagerTaskRestore, TaskLocalStateStore taskLocalStateStore, StateChangelogStorage<?> stateChangelogStorage) {
        return new TaskStateManagerImpl(
                jobID,
                executionAttemptID,
                taskLocalStateStore,
                (FileMergingSnapshotManager) null,
                stateChangelogStorage,
                new TaskExecutorStateChangelogStoragesManager(),
                jobManagerTaskRestore,
                checkpointResponder);
    }
}
