/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.filemerging;

import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.util.Preconditions;

public class SubtaskFileMergingManagerRestoreOperation {
    private final long checkpointId;
    private final JobID jobID;
    private final TaskInfo taskInfo;
    private final JobVertexID vertexID;
    private final FileMergingSnapshotManager fileMergingSnapshotManager;
    private final OperatorSubtaskState subtaskState;

    public SubtaskFileMergingManagerRestoreOperation(long checkpointId, FileMergingSnapshotManager fileMergingSnapshotManager, JobID jobID, TaskInfo taskInfo, JobVertexID vertexID, OperatorSubtaskState subtaskState) {
        this.checkpointId = checkpointId;
        this.fileMergingSnapshotManager = fileMergingSnapshotManager;
        this.jobID = jobID;
        this.taskInfo = Preconditions.checkNotNull(taskInfo);
        this.vertexID = Preconditions.checkNotNull(vertexID);
        this.subtaskState = Preconditions.checkNotNull(subtaskState);
    }

    public void restore() {
        FileMergingSnapshotManager.SubtaskKey subtaskKey = new FileMergingSnapshotManager.SubtaskKey(this.jobID, this.vertexID, this.taskInfo);
        Stream keyedStateHandles = Stream.concat(this.subtaskState.getManagedKeyedState().stream(), this.subtaskState.getRawKeyedState().stream()).flatMap(this::getChildrenStreamHandles);
        Stream operatorStateHandles = Stream.concat(this.subtaskState.getManagedOperatorState().stream(), this.subtaskState.getRawOperatorState().stream()).flatMap(this::getChildrenStreamHandles);
        Stream<SegmentFileStateHandle> segmentStateHandles = Stream.of(keyedStateHandles, operatorStateHandles).flatMap(Function.identity()).filter(handle -> handle instanceof SegmentFileStateHandle && !(handle instanceof EmptySegmentFileStateHandle)).map(handle -> (SegmentFileStateHandle)handle);
        this.fileMergingSnapshotManager.restoreStateHandles(this.checkpointId, subtaskKey, segmentStateHandles);
    }

    private Stream<? extends StateObject> getChildrenStreamHandles(KeyedStateHandle parentHandle) {
        if (parentHandle instanceof IncrementalRemoteKeyedStateHandle) {
            return ((IncrementalRemoteKeyedStateHandle)parentHandle).streamSubHandles();
        }
        if (parentHandle instanceof KeyGroupsStateHandle) {
            return Stream.of(((KeyGroupsStateHandle)parentHandle).getDelegateStateHandle());
        }
        return Stream.of(parentHandle);
    }

    private Stream<StreamStateHandle> getChildrenStreamHandles(OperatorStateHandle parentHandle) {
        return Stream.of(parentHandle.getDelegateStateHandle());
    }
}

