package com.facebook.presto.execution.scheduler;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.ForQueryExecution;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageExecutionState;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelector;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.operator.ForScheduler;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.NodePartitionMap;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.SplitSourceFactory;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.inject.Inject;

/* loaded from: input_file:com/facebook/presto/execution/scheduler/SectionExecutionFactory.class */
public class SectionExecutionFactory {
    private final Metadata metadata;
    private final NodePartitioningManager nodePartitioningManager;
    private final NodeTaskMap nodeTaskMap;
    private final ExecutorService executor;
    private final ScheduledExecutorService scheduledExecutor;
    private final FailureDetector failureDetector;
    private final SplitSchedulerStats schedulerStats;
    private final NodeScheduler nodeScheduler;
    private final int splitBatchSize;

    @Inject
    public SectionExecutionFactory(Metadata metadata, NodePartitioningManager nodePartitioningManager, NodeTaskMap nodeTaskMap, @ForQueryExecution ExecutorService executorService, @ForScheduler ScheduledExecutorService scheduledExecutorService, FailureDetector failureDetector, SplitSchedulerStats splitSchedulerStats, NodeScheduler nodeScheduler, QueryManagerConfig queryManagerConfig) {
        this(metadata, nodePartitioningManager, nodeTaskMap, executorService, scheduledExecutorService, failureDetector, splitSchedulerStats, nodeScheduler, ((QueryManagerConfig) Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null")).getScheduleSplitBatchSize());
    }

    public SectionExecutionFactory(Metadata metadata, NodePartitioningManager nodePartitioningManager, NodeTaskMap nodeTaskMap, ExecutorService executorService, ScheduledExecutorService scheduledExecutorService, FailureDetector failureDetector, SplitSchedulerStats splitSchedulerStats, NodeScheduler nodeScheduler, int i) {
        this.metadata = (Metadata) Objects.requireNonNull(metadata, "metadata is null");
        this.nodePartitioningManager = (NodePartitioningManager) Objects.requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
        this.nodeTaskMap = (NodeTaskMap) Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        this.executor = (ExecutorService) Objects.requireNonNull(executorService, "executor is null");
        this.scheduledExecutor = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "scheduledExecutor is null");
        this.failureDetector = (FailureDetector) Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.schedulerStats = (SplitSchedulerStats) Objects.requireNonNull(splitSchedulerStats, "schedulerStats is null");
        this.nodeScheduler = (NodeScheduler) Objects.requireNonNull(nodeScheduler, "nodeScheduler is null");
        this.splitBatchSize = i;
    }

    public SectionExecution createSectionExecutions(Session session, StreamingPlanSection streamingPlanSection, ExchangeLocationsConsumer exchangeLocationsConsumer, Optional<int[]> optional, OutputBuffers outputBuffers, boolean z, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, int i) {
        HashMap hashMap = new HashMap();
        List<StageExecutionAndScheduler> createStreamingLinkedStageExecutions = createStreamingLinkedStageExecutions(session, exchangeLocationsConsumer, streamingPlanSection.getPlan().withBucketToPartition(optional), partitioningHandle -> {
            return (NodePartitionMap) hashMap.computeIfAbsent(partitioningHandle, partitioningHandle -> {
                return this.nodePartitioningManager.getNodePartitioningMap(session, partitioningHandle);
            });
        }, TableWriteInfo.createTableWriteInfo(streamingPlanSection.getPlan(), this.metadata, session), Optional.empty(), z, remoteTaskFactory, splitSourceFactory, i);
        StageExecutionAndScheduler stageExecutionAndScheduler = (StageExecutionAndScheduler) Iterables.getLast(createStreamingLinkedStageExecutions);
        stageExecutionAndScheduler.getStageExecution().setOutputBuffers(outputBuffers);
        return new SectionExecution(stageExecutionAndScheduler, createStreamingLinkedStageExecutions);
    }

    private List<StageExecutionAndScheduler> createStreamingLinkedStageExecutions(Session session, ExchangeLocationsConsumer exchangeLocationsConsumer, StreamingSubPlan streamingSubPlan, Function<PartitioningHandle, NodePartitionMap> function, TableWriteInfo tableWriteInfo, Optional<SqlStageExecution> optional, boolean z, RemoteTaskFactory remoteTaskFactory, SplitSourceFactory splitSourceFactory, int i) {
        ImmutableList.Builder builder = ImmutableList.builder();
        PlanFragmentId id = streamingSubPlan.getFragment().getId();
        StageId stageId = new StageId(session.getQueryId(), id.getId());
        SqlStageExecution createSqlStageExecution = SqlStageExecution.createSqlStageExecution(new StageExecutionId(stageId, i), streamingSubPlan.getFragment(), remoteTaskFactory, session, z, this.nodeTaskMap, this.executor, this.failureDetector, this.schedulerStats, tableWriteInfo);
        PartitioningHandle partitioning = streamingSubPlan.getFragment().getPartitioning();
        Optional<int[]> bucketToPartition = getBucketToPartition(partitioning, function, streamingSubPlan.getFragment().getRoot(), streamingSubPlan.getFragment().getRemoteSourceNodes());
        ImmutableSet.Builder builder2 = ImmutableSet.builder();
        for (StreamingSubPlan streamingSubPlan2 : streamingSubPlan.getChildren()) {
            createSqlStageExecution.getClass();
            List<StageExecutionAndScheduler> createStreamingLinkedStageExecutions = createStreamingLinkedStageExecutions(session, createSqlStageExecution::addExchangeLocations, streamingSubPlan2.withBucketToPartition(bucketToPartition), function, tableWriteInfo, Optional.of(createSqlStageExecution), z, remoteTaskFactory, splitSourceFactory, i);
            builder.addAll((Iterable) createStreamingLinkedStageExecutions);
            builder2.add((ImmutableSet.Builder) ((StageExecutionAndScheduler) Iterables.getLast(createStreamingLinkedStageExecutions)).getStageExecution());
        }
        ImmutableSet build = builder2.build();
        createSqlStageExecution.addStateChangeListener(stageExecutionState -> {
            if (stageExecutionState.isDone()) {
                build.forEach((v0) -> {
                    v0.cancel();
                });
            }
        });
        builder.add((ImmutableList.Builder) new StageExecutionAndScheduler(createSqlStageExecution, new StageLinkage(id, exchangeLocationsConsumer, build), createStageScheduler(splitSourceFactory, session, streamingSubPlan, function, optional, stageId, createSqlStageExecution, partitioning, tableWriteInfo, build)));
        return builder.build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private StageScheduler createStageScheduler(SplitSourceFactory splitSourceFactory, Session session, StreamingSubPlan streamingSubPlan, Function<PartitioningHandle, NodePartitionMap> function, Optional<SqlStageExecution> optional, StageId stageId, SqlStageExecution sqlStageExecution, PartitioningHandle partitioningHandle, TableWriteInfo tableWriteInfo, Set<SqlStageExecution> set) {
        List of;
        List<InternalNode> partitionToNode;
        BucketNodeMap asBucketNodeMap;
        Map<PlanNodeId, SplitSource> createSplitSources = splitSourceFactory.createSplitSources(streamingSubPlan.getFragment(), session, tableWriteInfo);
        int maxTasksPerStage = SystemSessionProperties.getMaxTasksPerStage(session);
        if (partitioningHandle.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            Map.Entry entry = (Map.Entry) Iterables.getOnlyElement(createSplitSources.entrySet());
            PlanNodeId planNodeId = (PlanNodeId) entry.getKey();
            SplitSource splitSource = (SplitSource) entry.getValue();
            ConnectorId connectorId = splitSource.getConnectorId();
            if (ConnectorId.isInternalSystemConnector(connectorId)) {
                connectorId = null;
            }
            NodeSelector createNodeSelector = this.nodeScheduler.createNodeSelector(connectorId, maxTasksPerStage);
            sqlStageExecution.getClass();
            DynamicSplitPlacementPolicy dynamicSplitPlacementPolicy = new DynamicSplitPlacementPolicy(createNodeSelector, sqlStageExecution::getAllTasks);
            Preconditions.checkArgument(!streamingSubPlan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution());
            return SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler(sqlStageExecution, planNodeId, splitSource, dynamicSplitPlacementPolicy, this.splitBatchSize);
        }
        if (partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            ScaledWriterScheduler scaledWriterScheduler = new ScaledWriterScheduler(sqlStageExecution, () -> {
                return (List) set.stream().map((v0) -> {
                    return v0.getAllTasks();
                }).flatMap((v0) -> {
                    return v0.stream();
                }).map((v0) -> {
                    return v0.getTaskStatus();
                }).collect(Collectors.toList());
            }, () -> {
                return (List) sqlStageExecution.getAllTasks().stream().map((v0) -> {
                    return v0.getTaskStatus();
                }).collect(Collectors.toList());
            }, this.nodeScheduler.createNodeSelector(null), this.scheduledExecutor, SystemSessionProperties.getWriterMinSize(session), SystemSessionProperties.isOptimizedScaleWriterProducerBuffer(session));
            ListenableFuture<?> whenAllStages = whenAllStages(set, (v0) -> {
                return v0.isDone();
            });
            scaledWriterScheduler.getClass();
            whenAllStages.addListener(scaledWriterScheduler::finish, MoreExecutors.directExecutor());
            return scaledWriterScheduler;
        }
        if (createSplitSources.isEmpty()) {
            List<InternalNode> partitionToNode2 = function.apply(streamingSubPlan.getFragment().getPartitioning()).getPartitionToNode();
            Failures.checkCondition(!partitionToNode2.isEmpty(), StandardErrorCode.NO_NODES_AVAILABLE, "No worker nodes available", new Object[0]);
            return new FixedCountScheduler(sqlStageExecution, partitionToNode2);
        }
        List<PlanNodeId> tableScanSchedulingOrder = streamingSubPlan.getFragment().getTableScanSchedulingOrder();
        ConnectorId orElseThrow = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new);
        boolean isStageGroupedExecution = streamingSubPlan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution();
        if (isStageGroupedExecution) {
            of = this.nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
            Preconditions.checkState(!ImmutableList.of(NotPartitionedPartitionHandle.NOT_PARTITIONED).equals(of));
        } else {
            of = ImmutableList.of(NotPartitionedPartitionHandle.NOT_PARTITIONED);
        }
        if (streamingSubPlan.getFragment().getRemoteSourceNodes().stream().allMatch(remoteSourceNode -> {
            return remoteSourceNode.getExchangeType() == ExchangeNode.Type.REPLICATE;
        })) {
            boolean isDynamicLifespanSchedule = streamingSubPlan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule();
            asBucketNodeMap = this.nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, isDynamicLifespanSchedule);
            Verify.verify(asBucketNodeMap.isDynamic() == isDynamicLifespanSchedule);
            partitionToNode = asBucketNodeMap.hasInitialMap() ? (List) asBucketNodeMap.getBucketToNode().get().stream().distinct().collect(ImmutableList.toImmutableList()) : new ArrayList(this.nodeScheduler.createNodeSelector(orElseThrow).selectRandomNodes(maxTasksPerStage));
        } else {
            Verify.verify(!streamingSubPlan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule());
            NodePartitionMap apply = function.apply(streamingSubPlan.getFragment().getPartitioning());
            if (isStageGroupedExecution) {
                Preconditions.checkState(of.size() == apply.getBucketToPartition().length);
            }
            partitionToNode = apply.getPartitionToNode();
            asBucketNodeMap = apply.asBucketNodeMap();
        }
        FixedSourcePartitionedScheduler fixedSourcePartitionedScheduler = new FixedSourcePartitionedScheduler(sqlStageExecution, createSplitSources, streamingSubPlan.getFragment().getStageExecutionDescriptor(), tableScanSchedulingOrder, partitionToNode, asBucketNodeMap, this.splitBatchSize, SystemSessionProperties.getConcurrentLifespansPerNode(session), this.nodeScheduler.createNodeSelector(orElseThrow), of);
        if (streamingSubPlan.getFragment().getStageExecutionDescriptor().isRecoverableGroupedExecution()) {
            sqlStageExecution.registerStageTaskRecoveryCallback(taskId -> {
                Preconditions.checkArgument(taskId.getStageExecutionId().getStageId().equals(stageId), "The task did not execute this stage");
                Preconditions.checkArgument(optional.isPresent(), "Parent stage execution must exist");
                Preconditions.checkArgument(((SqlStageExecution) optional.get()).getAllTasks().size() == 1, "Parent stage should only have one task for recoverable grouped execution");
                ((SqlStageExecution) optional.get()).removeRemoteSourceIfSingleTaskStage(taskId);
                fixedSourcePartitionedScheduler.recover(taskId);
            });
        }
        return fixedSourcePartitionedScheduler;
    }

    private static Optional<int[]> getBucketToPartition(PartitioningHandle partitioningHandle, Function<PartitioningHandle, NodePartitionMap> function, PlanNode planNode, List<RemoteSourceNode> list) {
        if (partitioningHandle.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION) || partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)) {
            return Optional.of(new int[1]);
        }
        if (PlanNodeSearcher.searchFrom(planNode).where(planNode2 -> {
            return planNode2 instanceof TableScanNode;
        }).findFirst().isPresent()) {
            return list.stream().allMatch(remoteSourceNode -> {
                return remoteSourceNode.getExchangeType() == ExchangeNode.Type.REPLICATE;
            }) ? Optional.empty() : Optional.of(function.apply(partitioningHandle).getBucketToPartition());
        }
        NodePartitionMap apply = function.apply(partitioningHandle);
        Failures.checkCondition(!apply.getPartitionToNode().isEmpty(), StandardErrorCode.NO_NODES_AVAILABLE, "No worker nodes available", new Object[0]);
        return Optional.of(apply.getBucketToPartition());
    }

    private static ListenableFuture<?> whenAllStages(Collection<SqlStageExecution> collection, Predicate<StageExecutionState> predicate) {
        Preconditions.checkArgument(!collection.isEmpty(), "stageExecutions is empty");
        Set newConcurrentHashSet = Sets.newConcurrentHashSet((Iterable) collection.stream().map((v0) -> {
            return v0.getStageExecutionId();
        }).collect(Collectors.toSet()));
        SettableFuture create = SettableFuture.create();
        for (SqlStageExecution sqlStageExecution : collection) {
            sqlStageExecution.addStateChangeListener(stageExecutionState -> {
                if (predicate.test(stageExecutionState) && newConcurrentHashSet.remove(sqlStageExecution.getStageExecutionId()) && newConcurrentHashSet.isEmpty()) {
                    create.set(null);
                }
            });
        }
        return create;
    }
}
