/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.AllTieredShuffleMasterSnapshots;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.ShuffleDescriptorRetriever;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMasterSnapshot;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredShuffleMasterSnapshot;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
import org.apache.flink.runtime.shuffle.DefaultPartitionWithMetrics;
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshotContext;
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

public class TieredStorageMasterClient {
    private final List<Tuple2<String, TierMasterAgent>> tiers;
    private final Map<String, TierMasterAgent> tierMasterAgentMap;
    private final boolean allPartitionInRemote;
    private final ShuffleDescriptorRetriever shuffleDescriptorRetriever;

    public TieredStorageMasterClient(List<Tuple2<String, TierMasterAgent>> tiers, ShuffleDescriptorRetriever shuffleDescriptorRetriever) {
        this.tiers = tiers;
        this.allPartitionInRemote = tiers.stream().allMatch(tier -> ((TierMasterAgent)tier.f1).partitionInRemote());
        this.tierMasterAgentMap = new HashMap<String, TierMasterAgent>();
        for (Tuple2<String, TierMasterAgent> tier2 : tiers) {
            this.tierMasterAgentMap.put((String)tier2.f0, (TierMasterAgent)tier2.f1);
        }
        this.shuffleDescriptorRetriever = shuffleDescriptorRetriever;
    }

    public void registerJob(JobID jobID, TierShuffleHandler shuffleHandler) {
        this.tiers.forEach(tierMasterAgent -> ((TierMasterAgent)tierMasterAgent.f1).registerJob(jobID, shuffleHandler));
    }

    public void unregisterJob(JobID jobID) {
        this.tiers.forEach(tierMasterAgent -> ((TierMasterAgent)tierMasterAgent.f1).unregisterJob(jobID));
    }

    public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(JobID jobID, int numSubpartitions, ResultPartitionID resultPartitionID) {
        return this.tiers.stream().map(tierMasterAgent -> ((TierMasterAgent)tierMasterAgent.f1).addPartitionAndGetShuffleDescriptor(jobID, numSubpartitions, resultPartitionID)).collect(Collectors.toList());
    }

    public void releasePartition(ShuffleDescriptor shuffleDescriptor) {
        Preconditions.checkState(shuffleDescriptor instanceof NettyShuffleDescriptor);
        List<TierShuffleDescriptor> tierShuffleDescriptors = ((NettyShuffleDescriptor)shuffleDescriptor).getTierShuffleDescriptors();
        if (tierShuffleDescriptors != null && !tierShuffleDescriptors.isEmpty()) {
            Preconditions.checkState(tierShuffleDescriptors.size() == this.tiers.size());
            for (int i = 0; i < tierShuffleDescriptors.size(); ++i) {
                ((TierMasterAgent)this.tiers.get((int)i).f1).releasePartition(tierShuffleDescriptors.get(i));
            }
        }
    }

    public void snapshotState(CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture, ShuffleMasterSnapshotContext context, JobID jobId) {
        this.snapshotStateInternal(snapshotFuture, (agent, future) -> agent.snapshotState((CompletableFuture<TieredShuffleMasterSnapshot>)future, context, jobId));
    }

    public void snapshotState(CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture) {
        this.snapshotStateInternal(snapshotFuture, TierMasterAgent::snapshotState);
    }

    private void snapshotStateInternal(CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture, BiConsumer<TierMasterAgent, CompletableFuture<TieredShuffleMasterSnapshot>> masterAgentConsumer) {
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>(this.tiers.size());
        for (Tuple2<String, TierMasterAgent> tier : this.tiers) {
            CompletableFuture future = new CompletableFuture();
            futures.add(future.thenApply(snap -> Tuple2.of((String)tier.f0, snap)));
            masterAgentConsumer.accept((TierMasterAgent)tier.f1, future);
        }
        FutureUtils.combineAll(futures).thenAccept(snapshotWithIdentifiers -> snapshotFuture.complete(new AllTieredShuffleMasterSnapshots((Collection<Tuple2<String, TieredShuffleMasterSnapshot>>)snapshotWithIdentifiers)));
    }

    public void restoreState(TieredInternalShuffleMasterSnapshot clusterSnapshot) {
        Preconditions.checkState(clusterSnapshot != null);
        AllTieredShuffleMasterSnapshots allTierSnapshots = clusterSnapshot.getAllTierSnapshots();
        Collection<Tuple2<String, TieredShuffleMasterSnapshot>> snapshots = allTierSnapshots.getSnapshots();
        for (Tuple2<String, TieredShuffleMasterSnapshot> identifierWithSnap : snapshots) {
            String identifier = (String)identifierWithSnap.f0;
            this.tierMasterAgentMap.get(identifier).restoreState((TieredShuffleMasterSnapshot)identifierWithSnap.f1);
        }
    }

    public void restoreState(List<TieredInternalShuffleMasterSnapshot> snapshots, JobID jobId) {
        for (TieredInternalShuffleMasterSnapshot internalSnapshot : snapshots) {
            Preconditions.checkState(internalSnapshot != null);
            AllTieredShuffleMasterSnapshots allTierSnapshots = internalSnapshot.getAllTierSnapshots();
            Collection<Tuple2<String, TieredShuffleMasterSnapshot>> tierSnapshots = allTierSnapshots.getSnapshots();
            for (Tuple2<String, TieredShuffleMasterSnapshot> identifierWithSnap : tierSnapshots) {
                String identifier = (String)identifierWithSnap.f0;
                this.tierMasterAgentMap.get(identifier).restoreState((TieredShuffleMasterSnapshot)identifierWithSnap.f1, jobId);
            }
        }
    }

    public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(JobShuffleContext jobShuffleContext, Duration timeout, Set<ResultPartitionID> expectedPartitions) {
        JobID jobId = jobShuffleContext.getJobId();
        if (!this.allPartitionInRemote) {
            return jobShuffleContext.getPartitionWithMetrics(timeout, expectedPartitions);
        }
        ArrayList<CompletableFuture<Map<ResultPartitionID, ShuffleMetrics>>> futures = new ArrayList<CompletableFuture<Map<ResultPartitionID, ShuffleMetrics>>>(this.tiers.size());
        for (Tuple2<String, TierMasterAgent> tier : this.tiers) {
            CompletableFuture<Map<ResultPartitionID, ShuffleMetrics>> tierPartitionMapFuture = ((TierMasterAgent)tier.f1).getPartitionWithMetrics(jobId, timeout, expectedPartitions);
            futures.add(tierPartitionMapFuture);
        }
        return FutureUtils.combineAll(futures).thenApply(allPartitions -> {
            int tierNums = allPartitions.size();
            ArrayList result = new ArrayList();
            expectedPartitions.forEach(partitionId -> {
                Map partitionMap;
                ShuffleMetrics shuffleMetrics;
                ArrayList<ResultPartitionBytes> partitionBytes = new ArrayList<ResultPartitionBytes>();
                Iterator iterator = allPartitions.iterator();
                while (iterator.hasNext() && (shuffleMetrics = (ShuffleMetrics)(partitionMap = (Map)iterator.next()).get(partitionId)) != null) {
                    partitionBytes.add(shuffleMetrics.getPartitionBytes());
                }
                if (partitionBytes.size() == tierNums) {
                    Optional<ShuffleDescriptor> shuffleDescriptor = this.shuffleDescriptorRetriever.getShuffleDescriptor(jobId, (ResultPartitionID)partitionId);
                    shuffleDescriptor.ifPresent(descriptor -> result.add(new DefaultPartitionWithMetrics((ShuffleDescriptor)descriptor, new DefaultShuffleMetrics(ResultPartitionBytes.mergeAll(partitionBytes)))));
                }
            });
            return result;
        });
    }

    public void close() {
        this.tiers.forEach(tier -> ((TierMasterAgent)tier.f1).close());
    }
}

