package org.apache.flink.runtime.checkpoint.channel;

import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.class */
public class ChannelStateCheckpointWriter {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
    private final DataOutputStream dataStream;
    private final CheckpointStateOutputStream checkpointStream;
    private Throwable throwable;
    private final ChannelStateSerializer serializer;
    private final long checkpointId;
    private final RunnableWithException onComplete;
    private final Set<SubtaskID> subtasksToRegister;
    private final Map<SubtaskID, ChannelStatePendingResult> pendingResults;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateCheckpointWriter(Set<SubtaskID> set, long j, CheckpointStreamFactory checkpointStreamFactory, ChannelStateSerializer channelStateSerializer, RunnableWithException runnableWithException) throws Exception {
        this(set, j, checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), channelStateSerializer, runnableWithException);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @VisibleForTesting
    ChannelStateCheckpointWriter(Set<SubtaskID> set, long j, CheckpointStateOutputStream checkpointStateOutputStream, ChannelStateSerializer channelStateSerializer, RunnableWithException runnableWithException) {
        this(set, j, channelStateSerializer, runnableWithException, checkpointStateOutputStream, new DataOutputStream(checkpointStateOutputStream));
    }

    @VisibleForTesting
    ChannelStateCheckpointWriter(Set<SubtaskID> set, long j, ChannelStateSerializer channelStateSerializer, RunnableWithException runnableWithException, CheckpointStateOutputStream checkpointStateOutputStream, DataOutputStream dataOutputStream) {
        this.pendingResults = new HashMap();
        Preconditions.checkArgument(!set.isEmpty(), "The subtasks cannot be empty.");
        this.subtasksToRegister = new HashSet(set);
        this.checkpointId = j;
        this.checkpointStream = (CheckpointStateOutputStream) Preconditions.checkNotNull(checkpointStateOutputStream);
        this.serializer = (ChannelStateSerializer) Preconditions.checkNotNull(channelStateSerializer);
        this.dataStream = (DataOutputStream) Preconditions.checkNotNull(dataOutputStream);
        this.onComplete = (RunnableWithException) Preconditions.checkNotNull(runnableWithException);
        runWithChecks(() -> {
            channelStateSerializer.writeHeader(dataOutputStream);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSubtaskResult(SubtaskID subtaskID, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult) {
        Preconditions.checkState(!isDone(), "The write is done.");
        Preconditions.checkState(!this.pendingResults.containsKey(subtaskID), "The subtask %s has already been register before.", new Object[]{subtaskID});
        this.subtasksToRegister.remove(subtaskID);
        this.pendingResults.put(subtaskID, new ChannelStatePendingResult(subtaskID.getSubtaskIndex(), this.checkpointId, channelStateWriteResult, this.serializer));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void releaseSubtask(SubtaskID subtaskID) throws Exception {
        if (this.subtasksToRegister.remove(subtaskID)) {
            tryFinishResult();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeInput(JobVertexID jobVertexID, int i, InputChannelInfo inputChannelInfo, Buffer buffer) {
        try {
            if (isDone()) {
                return;
            }
            ChannelStatePendingResult channelStatePendingResult = getChannelStatePendingResult(jobVertexID, i);
            write(channelStatePendingResult.getInputChannelOffsets(), inputChannelInfo, buffer, !channelStatePendingResult.isAllInputsReceived(), "ChannelStateCheckpointWriter#writeInput");
            buffer.recycleBuffer();
        } finally {
            buffer.recycleBuffer();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeOutput(JobVertexID jobVertexID, int i, ResultSubpartitionInfo resultSubpartitionInfo, Buffer buffer) {
        try {
            if (isDone()) {
                return;
            }
            ChannelStatePendingResult channelStatePendingResult = getChannelStatePendingResult(jobVertexID, i);
            write(channelStatePendingResult.getResultSubpartitionOffsets(), resultSubpartitionInfo, buffer, !channelStatePendingResult.isAllOutputsReceived(), "ChannelStateCheckpointWriter#writeOutput");
            buffer.recycleBuffer();
        } finally {
            buffer.recycleBuffer();
        }
    }

    private <K> void write(Map<K, AbstractChannelStateHandle.StateContentMetaInfo> map, K k, Buffer buffer, boolean z, String str) {
        runWithChecks(() -> {
            Preconditions.checkState(z);
            long pos = this.checkpointStream.getPos();
            Closeable measureIO = NetworkActionsLogger.measureIO(str, buffer);
            Throwable th = null;
            try {
                try {
                    this.serializer.writeData(this.dataStream, buffer);
                    if (measureIO != null) {
                        if (0 != 0) {
                            try {
                                measureIO.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            measureIO.close();
                        }
                    }
                    ((AbstractChannelStateHandle.StateContentMetaInfo) map.computeIfAbsent(k, obj -> {
                        return new AbstractChannelStateHandle.StateContentMetaInfo();
                    })).withDataAdded(pos, this.checkpointStream.getPos() - pos);
                    NetworkActionsLogger.tracePersist(str, buffer, k, this.checkpointId);
                } finally {
                }
            } catch (Throwable th3) {
                if (measureIO != null) {
                    if (th != null) {
                        try {
                            measureIO.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        measureIO.close();
                    }
                }
                throw th3;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeInput(JobVertexID jobVertexID, int i) throws Exception {
        if (isDone()) {
            return;
        }
        getChannelStatePendingResult(jobVertexID, i).completeInput();
        tryFinishResult();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeOutput(JobVertexID jobVertexID, int i) throws Exception {
        if (isDone()) {
            return;
        }
        getChannelStatePendingResult(jobVertexID, i).completeOutput();
        tryFinishResult();
    }

    public void tryFinishResult() throws Exception {
        if (this.subtasksToRegister.isEmpty()) {
            for (ChannelStatePendingResult channelStatePendingResult : this.pendingResults.values()) {
                if (!channelStatePendingResult.isAllInputsReceived() || !channelStatePendingResult.isAllOutputsReceived()) {
                    return;
                }
            }
            if (isDone()) {
                doComplete(this.onComplete);
            } else {
                runWithChecks(() -> {
                    doComplete(this.onComplete, this::finishWriteAndResult);
                });
            }
        }
    }

    private void finishWriteAndResult() throws IOException {
        StreamStateHandle streamStateHandle = null;
        if (this.checkpointStream.getPos() == this.serializer.getHeaderLength()) {
            this.dataStream.close();
        } else {
            this.dataStream.flush();
            streamStateHandle = this.checkpointStream.closeAndGetHandle();
        }
        Iterator<ChannelStatePendingResult> it = this.pendingResults.values().iterator();
        while (it.hasNext()) {
            it.next().finishResult(streamStateHandle);
        }
    }

    private void doComplete(RunnableWithException... runnableWithExceptionArr) throws Exception {
        for (RunnableWithException runnableWithException : runnableWithExceptionArr) {
            runnableWithException.run();
        }
    }

    public boolean isDone() {
        if (this.throwable != null) {
            return true;
        }
        Iterator<ChannelStatePendingResult> it = this.pendingResults.values().iterator();
        while (it.hasNext()) {
            if (it.next().isDone()) {
                return true;
            }
        }
        return false;
    }

    private void runWithChecks(RunnableWithException runnableWithException) {
        try {
            Preconditions.checkState(!isDone(), "results are already completed", new Object[]{this.pendingResults.values()});
            runnableWithException.run();
        } catch (Exception e) {
            fail(e);
            if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
                return;
            }
            ExceptionUtils.rethrow(e);
        }
    }

    public void fail(JobVertexID jobVertexID, int i, Throwable th) {
        if (isDone()) {
            return;
        }
        this.throwable = th;
        ChannelStatePendingResult channelStatePendingResult = this.pendingResults.get(SubtaskID.of(jobVertexID, i));
        if (channelStatePendingResult != null) {
            channelStatePendingResult.fail(th);
        }
        failResultAndCloseStream(new CheckpointException(CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION, th));
    }

    public void fail(Throwable th) {
        if (isDone()) {
            return;
        }
        this.throwable = th;
        failResultAndCloseStream(th);
    }

    public void failResultAndCloseStream(Throwable th) {
        Iterator<ChannelStatePendingResult> it = this.pendingResults.values().iterator();
        while (it.hasNext()) {
            it.next().fail(th);
        }
        try {
            this.checkpointStream.close();
        } catch (Exception e) {
            if (!ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
                throw new RuntimeException("Unable to close checkpointStream after a failure", e);
            }
            LOG.warn("Unable to close checkpointStream after a failure", e);
        }
    }

    @Nonnull
    private ChannelStatePendingResult getChannelStatePendingResult(JobVertexID jobVertexID, int i) {
        SubtaskID of = SubtaskID.of(jobVertexID, i);
        ChannelStatePendingResult channelStatePendingResult = this.pendingResults.get(of);
        Preconditions.checkNotNull(channelStatePendingResult, "The subtask[%s] is not registered yet", new Object[]{of});
        return channelStatePendingResult;
    }
}
