package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.util.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.class */
/**
 * Tracks the commit requests of a single subtask for one (optional) checkpoint.
 *
 * <p>The manager keeps a queue of {@link CommitRequestImpl}s that have been received but not yet
 * drained, plus counters for how many committables were expected, drained successfully, and
 * dropped as failed. Instances are mutable; no synchronization is performed here.
 *
 * @param <CommT> type of the committable payload
 */
public class SubtaskCommittableManager<CommT> {
    /** Requests that were added (or merged in) and have not been removed by {@link #drainCommitted()}. */
    private final Deque<CommitRequestImpl<CommT>> requests;

    /** Total number of committables this manager expects; grows when another manager is merged in. */
    private int numExpectedCommittables;

    @Nullable
    private final Long checkpointId;
    private final int subtaskId;

    /** Number of requests removed by {@link #drainCommitted()} that were not in the FAILED state. */
    private int numDrained;

    /** Number of requests removed by {@link #drainCommitted()} that were in the FAILED state. */
    private int numFailed;

    /**
     * Creates an empty manager with no queued requests and zeroed drained/failed counters.
     *
     * @param numExpectedCommittables number of committables expected in total
     * @param subtaskId id of the subtask the committables belong to
     * @param checkpointId checkpoint the committables belong to, or {@code null}
     */
    public SubtaskCommittableManager(int numExpectedCommittables, int subtaskId, @Nullable Long checkpointId) {
        this(Collections.emptyList(), numExpectedCommittables, 0, 0, subtaskId, checkpointId);
    }

    /**
     * Creates a manager from explicit state.
     *
     * @param requests initial requests; copied into an internal deque, must not be {@code null}
     * @param numExpectedCommittables number of committables expected in total
     * @param numDrained number of committables already drained
     * @param numFailed number of committables already counted as failed
     * @param subtaskId id of the subtask the committables belong to
     * @param checkpointId checkpoint the committables belong to, or {@code null}
     * @throws NullPointerException if {@code requests} is {@code null}
     */
    public SubtaskCommittableManager(
            Collection<CommitRequestImpl<CommT>> requests,
            int numExpectedCommittables,
            int numDrained,
            int numFailed,
            int subtaskId,
            @Nullable Long checkpointId) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
        this.numExpectedCommittables = numExpectedCommittables;
        this.requests = new ArrayDeque<>(Preconditions.checkNotNull(requests));
        this.numDrained = numDrained;
        this.numFailed = numFailed;
    }

    /**
     * Unwraps the committable from its lineage wrapper and queues it via {@link #add(Object)}.
     *
     * @param committableWithLineage wrapper whose payload should be queued
     * @throws IllegalStateException if the queue already holds the expected number of requests
     */
    public void add(CommittableWithLineage<CommT> committableWithLineage) {
        // Pass the payload as CommT; casting it to the manager type would not resolve to any
        // overload and could never succeed at runtime.
        add(committableWithLineage.getCommittable());
    }

    /**
     * Wraps the committable in a new {@link CommitRequestImpl} and appends it to the queue.
     *
     * @param committable payload to queue
     * @throws IllegalStateException if the queue already holds the expected number of requests
     */
    void add(CommT committable) {
        // NOTE(review): the guard looks only at the queued requests, not at drained/failed ones,
        // so it is weaker than hasReceivedAll() once draining has started -- confirm intended.
        Preconditions.checkState(
                this.requests.size() < this.numExpectedCommittables,
                "Already received all committables.");
        this.requests.add(new CommitRequestImpl<>(committable));
    }

    /** Returns whether queued + drained + failed committables equal the expected count. */
    public boolean hasReceivedAll() {
        return getNumCommittables() == this.numExpectedCommittables;
    }

    /** Returns the number of committables seen so far: queued, drained and failed. */
    public int getNumCommittables() {
        return this.requests.size() + this.numDrained + this.numFailed;
    }

    /** Returns the expected count minus the committables already drained or failed. */
    public int getNumPending() {
        return this.numExpectedCommittables - (this.numDrained + this.numFailed);
    }

    /** Returns the number of committables counted as failed. */
    public int getNumFailed() {
        return this.numFailed;
    }

    /** Returns whether no committables are pending, i.e. {@link #getNumPending()} is zero. */
    public boolean isFinished() {
        return getNumPending() == 0;
    }

    /** Returns a stream over the queued requests that do not report themselves as finished. */
    public Stream<CommitRequestImpl<CommT>> getPendingRequests() {
        return this.requests.stream().filter(request -> !request.isFinished());
    }

    /**
     * Removes every finished request from the queue.
     *
     * <p>Finished requests in the FAILED state only increase the failed counter; all other finished
     * requests are returned wrapped with this manager's checkpoint id and subtask id, and increase
     * the drained counter. Unfinished requests stay queued.
     *
     * @return the drained committables, in queue order
     */
    public List<CommittableWithLineage<CommT>> drainCommitted() {
        List<CommittableWithLineage<CommT>> committed = new ArrayList<>(this.requests.size());
        Iterator<CommitRequestImpl<CommT>> it = this.requests.iterator();
        while (it.hasNext()) {
            CommitRequestImpl<CommT> request = it.next();
            if (!request.isFinished()) {
                continue;
            }
            if (request.getState() == CommitRequestState.FAILED) {
                this.numFailed++;
            } else {
                committed.add(
                        new CommittableWithLineage<>(
                                request.getCommittable(), this.checkpointId, this.subtaskId));
            }
            // Finished requests leave the queue regardless of their outcome.
            it.remove();
        }
        this.numDrained += committed.size();
        return committed;
    }

    /** Returns the number of committables drained so far. */
    public int getNumDrained() {
        return this.numDrained;
    }

    /** Returns the id of the subtask this manager belongs to. */
    public int getSubtaskId() {
        return this.subtaskId;
    }

    /** Returns the checkpoint id given at construction, possibly {@code null}. */
    @VisibleForTesting
    @Nullable
    Long getCheckpointId() {
        return this.checkpointId;
    }

    /** Returns the internal request queue itself (not a copy); changes to it affect this manager. */
    public Deque<CommitRequestImpl<CommT>> getRequests() {
        return this.requests;
    }

    /**
     * Folds the state of another manager for the same subtask into this one.
     *
     * <p>The expected, drained and failed counters are summed, and the other manager's request
     * objects are appended to this queue without being copied. Only the subtask id is compared;
     * the checkpoint ids are not.
     *
     * @param other manager to merge in
     * @return this manager
     * @throws IllegalArgumentException if the subtask ids differ
     */
    public SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) {
        Preconditions.checkArgument(other.getSubtaskId() == getSubtaskId());
        this.numExpectedCommittables += other.numExpectedCommittables;
        this.requests.addAll(other.requests);
        this.numDrained += other.numDrained;
        this.numFailed += other.numFailed;
        return this;
    }

    /**
     * Creates a new manager with the same counters and ids, whose queue holds the result of
     * calling {@code copy()} on each queued request.
     *
     * @return the new manager
     */
    public SubtaskCommittableManager<CommT> copy() {
        List<CommitRequestImpl<CommT>> copiedRequests =
                this.requests.stream().map(CommitRequestImpl::copy).collect(Collectors.toList());
        return new SubtaskCommittableManager<>(
                copiedRequests,
                this.numExpectedCommittables,
                this.numDrained,
                this.numFailed,
                this.subtaskId,
                this.checkpointId);
    }
}
