package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStatePendingResult.class */
public class ChannelStatePendingResult {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStatePendingResult.class);
    private final int subtaskIndex;
    private final long checkpointId;
    private final ChannelStateSerializer serializer;
    private final ChannelStateWriter.ChannelStateWriteResult result;
    private final Map<InputChannelInfo, AbstractChannelStateHandle.StateContentMetaInfo> inputChannelOffsets = new HashMap();
    private final Map<ResultSubpartitionInfo, AbstractChannelStateHandle.StateContentMetaInfo> resultSubpartitionOffsets = new HashMap();
    private boolean allInputsReceived = false;
    private boolean allOutputsReceived = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStatePendingResult$HandleFactory.class */
    public interface HandleFactory<I, H extends AbstractChannelStateHandle<I>> {
        public static final HandleFactory<InputChannelInfo, InputChannelStateHandle> INPUT_CHANNEL = InputChannelStateHandle::new;
        public static final HandleFactory<ResultSubpartitionInfo, ResultSubpartitionStateHandle> RESULT_SUBPARTITION = ResultSubpartitionStateHandle::new;

        H create(int i, I i2, StreamStateHandle streamStateHandle, List<Long> list, long j);
    }

    public ChannelStatePendingResult(int i, long j, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, ChannelStateSerializer channelStateSerializer) {
        this.subtaskIndex = i;
        this.checkpointId = j;
        this.result = channelStateWriteResult;
        this.serializer = channelStateSerializer;
    }

    public boolean isAllInputsReceived() {
        return this.allInputsReceived;
    }

    public boolean isAllOutputsReceived() {
        return this.allOutputsReceived;
    }

    public Map<InputChannelInfo, AbstractChannelStateHandle.StateContentMetaInfo> getInputChannelOffsets() {
        return this.inputChannelOffsets;
    }

    public Map<ResultSubpartitionInfo, AbstractChannelStateHandle.StateContentMetaInfo> getResultSubpartitionOffsets() {
        return this.resultSubpartitionOffsets;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeInput() {
        LOG.debug("complete input, output completed: {}", Boolean.valueOf(this.allOutputsReceived));
        Preconditions.checkArgument(!this.allInputsReceived);
        this.allInputsReceived = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeOutput() {
        LOG.debug("complete output, input completed: {}", Boolean.valueOf(this.allInputsReceived));
        Preconditions.checkArgument(!this.allOutputsReceived);
        this.allOutputsReceived = true;
    }

    public void finishResult(@Nullable StreamStateHandle streamStateHandle) throws IOException {
        Preconditions.checkState(streamStateHandle != null || (this.inputChannelOffsets.isEmpty() && this.resultSubpartitionOffsets.isEmpty()), "The stateHandle just can be null when no data is written.");
        complete(streamStateHandle, this.result.inputChannelStateHandles, this.inputChannelOffsets, HandleFactory.INPUT_CHANNEL);
        complete(streamStateHandle, this.result.resultSubpartitionStateHandles, this.resultSubpartitionOffsets, HandleFactory.RESULT_SUBPARTITION);
    }

    private <I, H extends AbstractChannelStateHandle<I>> void complete(StreamStateHandle streamStateHandle, CompletableFuture<Collection<H>> completableFuture, Map<I, AbstractChannelStateHandle.StateContentMetaInfo> map, HandleFactory<I, H> handleFactory) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<I, AbstractChannelStateHandle.StateContentMetaInfo> entry : map.entrySet()) {
            arrayList.add(createHandle(handleFactory, streamStateHandle, entry.getKey(), entry.getValue()));
        }
        completableFuture.complete(arrayList);
        LOG.debug("channel state write completed, checkpointId: {}, handles: {}", Long.valueOf(this.checkpointId), arrayList);
    }

    private <I, H extends AbstractChannelStateHandle<I>> H createHandle(HandleFactory<I, H> handleFactory, StreamStateHandle streamStateHandle, I i, AbstractChannelStateHandle.StateContentMetaInfo stateContentMetaInfo) throws IOException {
        Optional<byte[]> asBytesIfInMemory = streamStateHandle.asBytesIfInMemory();
        if (!asBytesIfInMemory.isPresent()) {
            return handleFactory.create(this.subtaskIndex, i, streamStateHandle, stateContentMetaInfo.getOffsets(), stateContentMetaInfo.getSize());
        }
        ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(UUID.randomUUID().toString(), this.serializer.extractAndMerge(asBytesIfInMemory.get(), stateContentMetaInfo.getOffsets()));
        return handleFactory.create(this.subtaskIndex, i, byteStreamStateHandle, Collections.singletonList(Long.valueOf(this.serializer.getHeaderLength())), byteStreamStateHandle.getStateSize());
    }

    public void fail(Throwable th) {
        this.result.fail(th);
    }

    public boolean isDone() {
        return this.result.isDone();
    }
}
