package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.OptionalLong;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.class */
/**
 * Stream operator that buffers incoming {@link CommittableMessage}s in a {@link
 * CommittableCollector}, commits them through a {@link Committer} and, when configured to do so,
 * forwards the committed results to the downstream operator.
 *
 * <p>A commit round ({@link #commitAndEmitCheckpoints()}) is triggered from four places:
 *
 * <ul>
 *   <li>{@link #initializeState(StateInitializationContext)} when the operator is restored,
 *   <li>{@link #notifyCheckpointComplete(long)},
 *   <li>{@link #endInput()} when checkpointing is disabled or the operator runs in batch mode,
 *   <li>{@link #processElement(StreamRecord)} when the received message belongs to a checkpoint
 *       that has already been reported as completed.
 * </ul>
 *
 * @param <CommT> type of the committables handled by the {@link Committer}
 */
class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<CommittableMessage<CommT>, CommittableMessage<CommT>>,
                BoundedOneInput {

    /**
     * Offset added to the current processing time when scheduling a retry of unfinished commits
     * (presumably milliseconds, as it is added to the processing-time clock -- confirm).
     */
    private static final long RETRY_DELAY = 1000;

    /** Descriptor of the raw {@code byte[]} list state that backs the collector state. */
    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
            new ListStateDescriptor<>(
                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    private final SimpleVersionedSerializer<CommT> committableSerializer;
    private final FunctionWithException<CommitterInitContext, Committer<CommT>, IOException>
            committerSupplier;
    private final boolean emitDownstream;
    private final boolean isBatchMode;
    private final boolean isCheckpointingEnabled;

    private SinkCommitterMetricGroup metricGroup;
    private Committer<CommT> committer;
    private CommittableCollector<CommT> committableCollector;

    /** Highest checkpoint id seen as completed or restored; {@code -1} until one is known. */
    private long lastCompletedCheckpointId = -1L;

    /** Set once {@link #endInput()} has been called. */
    private boolean endInput = false;

    /** Operator state holding the serialized {@link CommittableCollector}. */
    private ListState<CommittableCollector<CommT>> committableCollectorState;

    /** {@link CommitterInitContext} that additionally exposes the committer metric group. */
    public static class CommitterInitContextImp extends InitContextBase
            implements CommitterInitContext {
        private final SinkCommitterMetricGroup metricGroup;

        /**
         * @param runtimeContext runtime context forwarded to {@link InitContextBase}
         * @param metricGroup metric group exposed by {@link #metricGroup()}; must not be null
         * @param restoredCheckpointId restored checkpoint id forwarded to {@link InitContextBase}
         */
        public CommitterInitContextImp(
                StreamingRuntimeContext runtimeContext,
                SinkCommitterMetricGroup metricGroup,
                OptionalLong restoredCheckpointId) {
            super(runtimeContext, restoredCheckpointId);
            this.metricGroup = Preconditions.checkNotNull(metricGroup);
        }

        /** Returns the metric group passed to the constructor. */
        public SinkCommitterMetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /**
     * Creates the operator.
     *
     * @param parameters operator parameters forwarded to {@link AbstractStreamOperator}
     * @param processingTimeService timer service used to schedule commit retries; must not be null
     * @param committableSerializer serializer for the committables; must not be null
     * @param committerSupplier factory creating the {@link Committer}; must not be null
     * @param emitDownstream whether committed results are forwarded to the output
     * @param isBatchMode whether the operator runs in batch mode
     * @param isCheckpointingEnabled whether checkpointing is enabled
     */
    public CommitterOperator(
            StreamOperatorParameters<CommittableMessage<CommT>> parameters,
            ProcessingTimeService processingTimeService,
            SimpleVersionedSerializer<CommT> committableSerializer,
            FunctionWithException<CommitterInitContext, Committer<CommT>, IOException>
                    committerSupplier,
            boolean emitDownstream,
            boolean isBatchMode,
            boolean isCheckpointingEnabled) {
        super(parameters);
        this.emitDownstream = emitDownstream;
        this.isBatchMode = isBatchMode;
        this.isCheckpointingEnabled = isCheckpointingEnabled;
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.committableSerializer = Preconditions.checkNotNull(committableSerializer);
        this.committerSupplier = Preconditions.checkNotNull(committerSupplier);
    }

    /**
     * Runs the base setup, then wraps the operator metric group and creates an empty {@link
     * CommittableCollector} bound to it.
     */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<CommittableMessage<CommT>>> output) {
        super.setup(containingTask, config, output);
        metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup());
        committableCollector = CommittableCollector.of(metricGroup);
    }

    /**
     * Creates the committer, binds the collector state and, when restoring, merges the restored
     * collectors into the live one and immediately starts a commit round.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
        committer = committerSupplier.apply(createInitContext(restoredCheckpointId));

        ListState<byte[]> rawState =
                context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC);
        int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        committableCollectorState =
                new SimpleVersionedListState<>(
                        rawState,
                        new CommittableCollectorSerializer<>(
                                committableSerializer, subtaskIndex, parallelism, metricGroup));

        if (context.isRestored()) {
            for (CommittableCollector<CommT> restoredCollector : committableCollectorState.get()) {
                committableCollector.merge(restoredCollector);
            }
            // NOTE(review): getAsLong() throws NoSuchElementException if the id is absent;
            // presumably isRestored() implies a restored checkpoint id -- confirm.
            lastCompletedCheckpointId = restoredCheckpointId.getAsLong();
            commitAndEmitCheckpoints();
        }
    }

    /** Stores a copy of the current collector as the only element of the operator state. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        committableCollectorState.update(Collections.singletonList(committableCollector.copy()));
    }

    /**
     * Marks the input as ended. A commit round is started right away only when checkpointing is
     * disabled or the operator runs in batch mode.
     */
    @Override
    public void endInput() throws Exception {
        endInput = true;
        if (!isCheckpointingEnabled || isBatchMode) {
            commitAndEmitCheckpoints();
        }
    }

    /** Records the completed checkpoint (ids never move backwards) and starts a commit round. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
        commitAndEmitCheckpoints();
    }

    /**
     * Commits every checkpoint's committables up to the last completed checkpoint, or up to {@link
     * Long#MAX_VALUE} once the input has ended.
     *
     * <p>After end of input the round is repeated in place until the collector reports that it is
     * finished. Before end of input a single pass is made and, if the collector is not finished,
     * a retry is scheduled via {@link #retryWithDelay()}. The collector is compacted at the end.
     */
    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
        long upToCheckpointId = endInput ? Long.MAX_VALUE : lastCompletedCheckpointId;
        do {
            for (CheckpointCommittableManager<CommT> manager :
                    committableCollector.getCheckpointCommittablesUpTo(upToCheckpointId)) {
                commitAndEmit(manager);
            }
            // Repeat synchronously only after end of input; this loop has no upper bound.
        } while (!committableCollector.isFinished() && endInput);

        if (!committableCollector.isFinished()) {
            retryWithDelay();
        }
        committableCollector.compact();
    }

    /**
     * Commits one checkpoint's committables and, if downstream emission is enabled and the commit
     * returned something, emits the checkpoint summary followed by each returned committable
     * tagged with this subtask's index.
     */
    private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
            throws IOException, InterruptedException {
        Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
        if (!emitDownstream || committed.isEmpty()) {
            return;
        }
        int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        output.collect(
                new StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
        for (CommittableWithLineage<CommT> committable : committed) {
            output.collect(new StreamRecord<>(committable.withSubtaskId(subtaskId)));
        }
    }

    /** Registers a processing-time timer that runs another commit round after the retry delay. */
    private void retryWithDelay() {
        processingTimeService.registerTimer(
                processingTimeService.getCurrentProcessingTime() + RETRY_DELAY,
                timestamp -> commitAndEmitCheckpoints());
    }

    /**
     * Adds the message to the collector. If the message belongs to a checkpoint that was already
     * reported as completed, a commit round is started immediately.
     */
    @Override
    public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
        CommittableMessage<CommT> message = element.getValue();
        committableCollector.addMessage(message);
        if (message.getCheckpointIdOrEOI() <= lastCompletedCheckpointId) {
            commitAndEmitCheckpoints();
        }
    }

    /** Closes the committer and then the base operator through {@link IOUtils#closeAll}. */
    @Override
    public void close() throws Exception {
        IOUtils.closeAll(committer, super::close);
    }

    /** Builds the init context handed to the committer factory. */
    private CommitterInitContext createInitContext(OptionalLong restoredCheckpointId) {
        return new CommitterInitContextImp(getRuntimeContext(), metricGroup, restoredCheckpointId);
    }
}
