package com.facebook.presto.execution.scheduler;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.scheduler.FixedSourcePartitionedScheduler;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.split.EmptySplit;
import com.facebook.presto.split.SplitSource;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/* loaded from: input_file:com/facebook/presto/execution/scheduler/SourcePartitionedScheduler.class */
public class SourcePartitionedScheduler implements SourceScheduler {
    private final SqlStageExecution stage;
    private final SplitSource splitSource;
    private final SplitPlacementPolicy splitPlacementPolicy;
    private final int splitBatchSize;
    private final PlanNodeId partitionedNode;
    private final boolean groupedExecution;
    private boolean lifespanAdded;
    private final Map<Lifespan, ScheduleGroup> scheduleGroups = new HashMap();
    private State state = State.INITIALIZED;
    private SettableFuture<?> whenFinishedOrNewLifespanAdded = SettableFuture.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/execution/scheduler/SourcePartitionedScheduler$ScheduleGroup.class */
    public static class ScheduleGroup {
        public final ConnectorPartitionHandle partitionHandle;
        public ListenableFuture<SplitSource.SplitBatch> nextSplitBatchFuture;
        public ListenableFuture<?> placementFuture = Futures.immediateFuture(null);
        public Set<Split> pendingSplits = new HashSet();
        public ScheduleGroupState state = ScheduleGroupState.INITIALIZED;

        public ScheduleGroup(ConnectorPartitionHandle connectorPartitionHandle) {
            this.partitionHandle = (ConnectorPartitionHandle) Objects.requireNonNull(connectorPartitionHandle, "partitionHandle is null");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/execution/scheduler/SourcePartitionedScheduler$ScheduleGroupState.class */
    public enum ScheduleGroupState {
        INITIALIZED,
        SPLITS_ADDED,
        NO_MORE_SPLITS,
        DONE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/execution/scheduler/SourcePartitionedScheduler$State.class */
    public enum State {
        INITIALIZED,
        SPLITS_ADDED,
        NO_MORE_SPLITS,
        FINISHED
    }

    private SourcePartitionedScheduler(SqlStageExecution sqlStageExecution, PlanNodeId planNodeId, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int i, boolean z) {
        this.stage = (SqlStageExecution) Objects.requireNonNull(sqlStageExecution, "stage is null");
        this.partitionedNode = (PlanNodeId) Objects.requireNonNull(planNodeId, "partitionedNode is null");
        this.splitSource = (SplitSource) Objects.requireNonNull(splitSource, "splitSource is null");
        this.splitPlacementPolicy = (SplitPlacementPolicy) Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
        Preconditions.checkArgument(i > 0, "splitBatchSize must be at least one");
        this.splitBatchSize = i;
        this.groupedExecution = z;
    }

    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public PlanNodeId getPlanNodeId() {
        return this.partitionedNode;
    }

    public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(SqlStageExecution sqlStageExecution, PlanNodeId planNodeId, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int i) {
        SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(sqlStageExecution, planNodeId, splitSource, splitPlacementPolicy, i, false);
        sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NotPartitionedPartitionHandle.NOT_PARTITIONED);
        return new StageScheduler() { // from class: com.facebook.presto.execution.scheduler.SourcePartitionedScheduler.1
            @Override // com.facebook.presto.execution.scheduler.StageScheduler
            public ScheduleResult schedule() {
                ScheduleResult schedule = SourcePartitionedScheduler.this.schedule();
                SourcePartitionedScheduler.this.drainCompletelyScheduledLifespans();
                return schedule;
            }

            @Override // com.facebook.presto.execution.scheduler.StageScheduler, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                SourcePartitionedScheduler.this.close();
            }
        };
    }

    public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(SqlStageExecution sqlStageExecution, PlanNodeId planNodeId, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int i, boolean z) {
        return new SourcePartitionedScheduler(sqlStageExecution, planNodeId, splitSource, splitPlacementPolicy, i, z);
    }

    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public synchronized void startLifespan(Lifespan lifespan, ConnectorPartitionHandle connectorPartitionHandle) {
        Preconditions.checkState(this.state == State.INITIALIZED || this.state == State.SPLITS_ADDED);
        this.lifespanAdded = true;
        this.scheduleGroups.put(lifespan, new ScheduleGroup(connectorPartitionHandle));
        this.whenFinishedOrNewLifespanAdded.set(null);
        this.whenFinishedOrNewLifespanAdded = SettableFuture.create();
    }

    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public synchronized void rewindLifespan(Lifespan lifespan, ConnectorPartitionHandle connectorPartitionHandle) {
        Preconditions.checkState(this.state == State.INITIALIZED || this.state == State.SPLITS_ADDED, "Current state %s is not rewindable", this.state);
        Preconditions.checkState(this.lifespanAdded, "Cannot rewind lifespan without any lifespan added before");
        this.scheduleGroups.remove(lifespan);
        this.splitSource.rewind(connectorPartitionHandle);
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:111:0x02d3. Please report as an issue. */
    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public synchronized ScheduleResult schedule() {
        dropListenersFromWhenFinishedOrNewLifespansAdded();
        int i = 0;
        ImmutableSet.Builder builder = ImmutableSet.builder();
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        for (Map.Entry<Lifespan, ScheduleGroup> entry : this.scheduleGroups.entrySet()) {
            Lifespan key = entry.getKey();
            ScheduleGroup value = entry.getValue();
            if (value.state == ScheduleGroupState.NO_MORE_SPLITS || value.state == ScheduleGroupState.DONE) {
                Verify.verify(value.nextSplitBatchFuture == null);
            } else if (value.pendingSplits.isEmpty()) {
                if (value.nextSplitBatchFuture == null) {
                    value.nextSplitBatchFuture = this.splitSource.getNextBatch(value.partitionHandle, key, this.splitBatchSize);
                    long nanoTime = System.nanoTime();
                    MoreFutures.addSuccessCallback(value.nextSplitBatchFuture, () -> {
                        this.stage.recordGetSplitTime(nanoTime);
                    });
                }
                if (value.nextSplitBatchFuture.isDone()) {
                    SplitSource.SplitBatch splitBatch = (SplitSource.SplitBatch) MoreFutures.getFutureValue(value.nextSplitBatchFuture);
                    value.nextSplitBatchFuture = null;
                    value.pendingSplits = new HashSet(splitBatch.getSplits());
                    if (splitBatch.isLastBatch()) {
                        if (value.state == ScheduleGroupState.INITIALIZED && value.pendingSplits.isEmpty()) {
                            value.pendingSplits.add(new Split(this.splitSource.getConnectorId(), this.splitSource.getTransactionHandle(), new EmptySplit(this.splitSource.getConnectorId()), key));
                        }
                        value.state = ScheduleGroupState.NO_MORE_SPLITS;
                    }
                } else {
                    arrayList.add(value.nextSplitBatchFuture);
                    z2 = true;
                }
            }
            Multimap<InternalNode, Split> of = ImmutableMultimap.of();
            if (!value.pendingSplits.isEmpty()) {
                if (value.placementFuture.isDone()) {
                    if (value.state == ScheduleGroupState.INITIALIZED) {
                        value.state = ScheduleGroupState.SPLITS_ADDED;
                    }
                    if (this.state == State.INITIALIZED) {
                        this.state = State.SPLITS_ADDED;
                    }
                    SplitPlacementResult computeAssignments = this.splitPlacementPolicy.computeAssignments(value.pendingSplits);
                    of = computeAssignments.getAssignments();
                    Collection<Split> values = of.values();
                    Set<Split> set = value.pendingSplits;
                    set.getClass();
                    values.forEach((v1) -> {
                        r1.remove(v1);
                    });
                    i += of.size();
                    if (!value.pendingSplits.isEmpty()) {
                        value.placementFuture = computeAssignments.getBlocked();
                        arrayList.add(value.placementFuture);
                        z = true;
                    }
                } else {
                    z = true;
                }
            }
            ImmutableMultimap of2 = ImmutableMultimap.of();
            if (value.pendingSplits.isEmpty() && value.state == ScheduleGroupState.NO_MORE_SPLITS) {
                value.state = ScheduleGroupState.DONE;
                if (!key.isTaskWide()) {
                    of2 = ImmutableMultimap.of(((FixedSourcePartitionedScheduler.BucketedSplitPlacementPolicy) this.splitPlacementPolicy).getNodeForBucket(key.getId()), key);
                }
            }
            builder.addAll((Iterable) assignSplits(of, of2));
            if (value.nextSplitBatchFuture == null && value.pendingSplits.isEmpty() && value.state != ScheduleGroupState.DONE) {
                z3 = true;
            }
        }
        if (this.state != State.NO_MORE_SPLITS && this.state != State.FINISHED && (this.groupedExecution || !this.lifespanAdded || !this.scheduleGroups.isEmpty() || !this.splitSource.isFinished())) {
            if (z3) {
                return ScheduleResult.nonBlocked(false, builder.build(), i);
            }
            if (z) {
                builder.addAll((Iterable) finalizeTaskCreationIfNecessary());
            }
            ScheduleResult.BlockedReason blockedReason = z2 ? z ? ScheduleResult.BlockedReason.MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : ScheduleResult.BlockedReason.WAITING_FOR_SOURCE : z ? ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL : ScheduleResult.BlockedReason.NO_ACTIVE_DRIVER_GROUP;
            arrayList.add(this.whenFinishedOrNewLifespanAdded);
            return ScheduleResult.blocked(false, builder.build(), Futures.nonCancellationPropagating(MoreFutures.whenAnyComplete(arrayList)), blockedReason, i);
        }
        switch (this.state) {
            case INITIALIZED:
                throw new IllegalStateException("At least 1 split should have been scheduled for this plan node");
            case SPLITS_ADDED:
                this.state = State.NO_MORE_SPLITS;
                this.splitSource.close();
            case NO_MORE_SPLITS:
                this.state = State.FINISHED;
                this.whenFinishedOrNewLifespanAdded.set(null);
            case FINISHED:
                return ScheduleResult.nonBlocked(true, builder.build(), i);
            default:
                throw new IllegalStateException("Unknown state");
        }
    }

    private synchronized void dropListenersFromWhenFinishedOrNewLifespansAdded() {
        if (this.whenFinishedOrNewLifespanAdded.isDone()) {
            return;
        }
        this.whenFinishedOrNewLifespanAdded.cancel(true);
        this.whenFinishedOrNewLifespanAdded = SettableFuture.create();
    }

    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public void close() {
        this.splitSource.close();
    }

    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public synchronized List<Lifespan> drainCompletelyScheduledLifespans() {
        if (this.scheduleGroups.isEmpty()) {
            return ImmutableList.of();
        }
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<Map.Entry<Lifespan, ScheduleGroup>> it2 = this.scheduleGroups.entrySet().iterator();
        while (it2.hasNext()) {
            Map.Entry<Lifespan, ScheduleGroup> next = it2.next();
            if (next.getValue().state == ScheduleGroupState.DONE) {
                builder.add((ImmutableList.Builder) next.getKey());
                it2.remove();
            }
        }
        if (this.scheduleGroups.isEmpty() && this.splitSource.isFinished()) {
            this.whenFinishedOrNewLifespanAdded.set(null);
            this.whenFinishedOrNewLifespanAdded = SettableFuture.create();
        }
        return builder.build();
    }

    @Override // com.facebook.presto.execution.scheduler.SourceScheduler
    public synchronized void notifyAllLifespansFinishedExecution() {
        Preconditions.checkState(this.groupedExecution);
        this.state = State.FINISHED;
        this.splitSource.close();
        this.whenFinishedOrNewLifespanAdded.set(null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> multimap, Multimap<InternalNode, Lifespan> multimap2) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        UnmodifiableIterator it2 = ImmutableSet.builder().addAll((Iterable) multimap.keySet()).addAll((Iterable) multimap2.keySet()).build().iterator();
        while (it2.hasNext()) {
            InternalNode internalNode = (InternalNode) it2.next();
            ImmutableMultimap build = ImmutableMultimap.builder().putAll((ImmutableMultimap.Builder) this.partitionedNode, (Iterable) multimap.get(internalNode)).build();
            ImmutableMultimap.Builder builder2 = ImmutableMultimap.builder();
            if (multimap2.containsKey(internalNode)) {
                builder2.putAll((ImmutableMultimap.Builder) this.partitionedNode, (Iterable) multimap2.get(internalNode));
            }
            builder.addAll((Iterable) this.stage.scheduleSplits(internalNode, build, builder2.build()));
        }
        return builder.build();
    }

    private Set<RemoteTask> finalizeTaskCreationIfNecessary() {
        if (this.stage.getFragment().isLeaf()) {
            return ImmutableSet.of();
        }
        this.splitPlacementPolicy.lockDownNodes();
        Set<InternalNode> scheduledNodes = this.stage.getScheduledNodes();
        Set<RemoteTask> set = (Set) this.splitPlacementPolicy.allNodes().stream().filter(internalNode -> {
            return !scheduledNodes.contains(internalNode);
        }).flatMap(internalNode2 -> {
            return this.stage.scheduleSplits(internalNode2, ImmutableMultimap.of(), ImmutableMultimap.of()).stream();
        }).collect(ImmutableSet.toImmutableSet());
        this.stage.transitionToFinishedTaskScheduling();
        return set;
    }
}
