package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.class */
public class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableManager<CommT> {
    private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers;
    private final long checkpointId;
    private final int numberOfSubtasks;
    private final SinkCommitterMetricGroup metricGroup;
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public CheckpointCommittableManagerImpl(Map<Integer, SubtaskCommittableManager<CommT>> map, int i, long j, SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        this.subtasksCommittableManagers = (Map) Preconditions.checkNotNull(map);
        this.numberOfSubtasks = i;
        this.checkpointId = j;
        this.metricGroup = sinkCommitterMetricGroup;
    }

    public static <CommT> CheckpointCommittableManagerImpl<CommT> forSummary(CommittableSummary<CommT> committableSummary, SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        return new CheckpointCommittableManagerImpl<>(new HashMap(), committableSummary.getNumberOfSubtasks(), committableSummary.getCheckpointIdOrEOI(), sinkCommitterMetricGroup);
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager
    public long getCheckpointId() {
        return this.checkpointId;
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager
    public int getNumberOfSubtasks() {
        return this.numberOfSubtasks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
        return this.subtasksCommittableManagers.values();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addSummary(CommittableSummary<CommT> committableSummary) {
        long checkpointIdOrEOI = committableSummary.getCheckpointIdOrEOI();
        SubtaskCommittableManager<CommT> subtaskCommittableManager = new SubtaskCommittableManager<>(committableSummary.getNumberOfCommittables(), committableSummary.getSubtaskId(), checkpointIdOrEOI, this.metricGroup);
        if (checkpointIdOrEOI == Long.MAX_VALUE) {
            LOG.debug("Adding EOI summary (new={}}, merged={}}).", subtaskCommittableManager, this.subtasksCommittableManagers.merge(Integer.valueOf(committableSummary.getSubtaskId()), subtaskCommittableManager, (v0, v1) -> {
                return v0.merge(v1);
            }));
            return;
        }
        SubtaskCommittableManager<CommT> putIfAbsent = this.subtasksCommittableManagers.putIfAbsent(Integer.valueOf(committableSummary.getSubtaskId()), subtaskCommittableManager);
        if (putIfAbsent != null) {
            throw new UnsupportedOperationException(String.format("Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920", Long.valueOf(checkpointIdOrEOI), Integer.valueOf(committableSummary.getSubtaskId()), subtaskCommittableManager, putIfAbsent));
        }
        LOG.debug("Setting the summary for checkpointId {} with {}", Long.valueOf(this.checkpointId), subtaskCommittableManager);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addCommittable(CommittableWithLineage<CommT> committableWithLineage) {
        getSubtaskCommittableManager(committableWithLineage.getSubtaskId()).add((CommittableWithLineage) committableWithLineage);
    }

    SubtaskCommittableManager<CommT> getSubtaskCommittableManager(int i) {
        return (SubtaskCommittableManager) Preconditions.checkNotNull(this.subtasksCommittableManagers.get(Integer.valueOf(i)), "Unknown subtask for %s", new Object[]{Integer.valueOf(i)});
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager
    public CommittableSummary<CommT> getSummary(int i, int i2) {
        return new CommittableSummary<>(i, i2, this.checkpointId, this.subtasksCommittableManagers.values().stream().mapToInt((v0) -> {
            return v0.getNumCommittables();
        }).sum(), this.subtasksCommittableManagers.values().stream().mapToInt((v0) -> {
            return v0.getNumPending();
        }).sum(), this.subtasksCommittableManagers.values().stream().mapToInt((v0) -> {
            return v0.getNumFailed();
        }).sum());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isFinished() {
        return this.subtasksCommittableManagers.values().stream().allMatch((v0) -> {
            return v0.isFinished();
        });
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager
    public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer) throws IOException, InterruptedException {
        Collection<CommitRequestImpl<CommT>> pendingRequests = getPendingRequests(true);
        pendingRequests.forEach((v0) -> {
            v0.setSelected();
        });
        committer.commit(new ArrayList(pendingRequests));
        pendingRequests.forEach((v0) -> {
            v0.setCommittedIfNoError();
        });
        Collection<CommittableWithLineage<CommT>> drainFinished = drainFinished();
        this.metricGroup.setCurrentPendingCommittablesGauge(() -> {
            return Integer.valueOf(getPendingRequests(false).size());
        });
        return drainFinished;
    }

    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean z) {
        return (Collection) this.subtasksCommittableManagers.values().stream().filter(subtaskCommittableManager -> {
            return !z || subtaskCommittableManager.hasReceivedAll();
        }).flatMap((v0) -> {
            return v0.getPendingRequests();
        }).collect(Collectors.toList());
    }

    Collection<CommittableWithLineage<CommT>> drainFinished() {
        return (Collection) this.subtasksCommittableManagers.values().stream().flatMap(subtaskCommittableManager -> {
            return subtaskCommittableManager.drainCommitted().stream();
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> checkpointCommittableManagerImpl) {
        Preconditions.checkArgument(checkpointCommittableManagerImpl.checkpointId == this.checkpointId);
        CheckpointCommittableManagerImpl<CommT> copy = copy();
        for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> entry : checkpointCommittableManagerImpl.subtasksCommittableManagers.entrySet()) {
            copy.subtasksCommittableManagers.merge(entry.getKey(), entry.getValue(), (v0, v1) -> {
                return v0.merge(v1);
            });
        }
        return copy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointCommittableManagerImpl<CommT> copy() {
        return new CheckpointCommittableManagerImpl<>((Map) this.subtasksCommittableManagers.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((SubtaskCommittableManager) entry.getValue()).copy();
        })), this.numberOfSubtasks, this.checkpointId, this.metricGroup);
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = (CheckpointCommittableManagerImpl) obj;
        return this.checkpointId == checkpointCommittableManagerImpl.checkpointId && this.numberOfSubtasks == checkpointCommittableManagerImpl.numberOfSubtasks && Objects.equals(this.subtasksCommittableManagers, checkpointCommittableManagerImpl.subtasksCommittableManagers);
    }

    public int hashCode() {
        return Objects.hash(this.subtasksCommittableManagers, Long.valueOf(this.checkpointId), Integer.valueOf(this.numberOfSubtasks));
    }

    public String toString() {
        int i = this.numberOfSubtasks;
        long j = this.checkpointId;
        Map<Integer, SubtaskCommittableManager<CommT>> map = this.subtasksCommittableManagers;
        return "CheckpointCommittableManagerImpl{numberOfSubtasks=" + i + ", checkpointId=" + j + ", subtasksCommittableManagers=" + i + "}";
    }
}
