package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.class */
public class DefaultStateTransitionManager implements StateTransitionManager {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultStateTransitionManager.class);
    private final Supplier<Temporal> clock;
    private final StateTransitionManager.Context transitionContext;
    private Phase phase;
    private final List<ScheduledFuture<?>> scheduledFutures;
    private final Duration resourceStabilizationTimeout;
    private final Duration maxTriggerDelay;

    /**
     * Phase the manager starts in. While it is active, change events are only recorded; when the
     * cooldown elapses the manager moves on to {@link Idling} (no relevant change was seen) or to
     * {@link Stabilizing} (a change with sufficient resources was seen).
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager$Cooldown.class */
    static final class Cooldown extends Phase {

        /**
         * Clock reading of the first change event that arrived while sufficient resources were
         * available, or {@code null} if no such event has been observed yet.
         */
        @Nullable
        private Temporal firstChangeEventTimestamp;

        /**
         * Schedules the end of the cooldown {@code cooldownTimeout} after {@code cooldownStart}.
         */
        private Cooldown(
                Temporal cooldownStart,
                Supplier<Temporal> clock,
                DefaultStateTransitionManager manager,
                Duration cooldownTimeout) {
            super(clock, manager);
            scheduleRelativelyTo(this::finalizeCooldown, cooldownStart, cooldownTimeout);
        }

        /** Remembers the time of the first change event seen with sufficient resources. */
        @Override
        void onChange() {
            if (!hasSufficientResources()) {
                return;
            }
            if (firstChangeEventTimestamp == null) {
                firstChangeEventTimestamp = now();
            }
        }

        /** Leaves the cooldown, picking the follow-up phase based on whether a change was recorded. */
        private void finalizeCooldown() {
            if (firstChangeEventTimestamp != null) {
                context().progressToStabilizing(firstChangeEventTimestamp);
                return;
            }
            context().progressToIdling();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Phase in which nothing is scheduled. A change event that finds sufficient resources moves the
     * manager to {@link Stabilizing}, using the current clock reading as the change timestamp.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager$Idling.class */
    public static final class Idling extends Phase {
        private Idling(Supplier<Temporal> clock, DefaultStateTransitionManager manager) {
            super(clock, manager);
        }

        /** Starts stabilizing as soon as a change event reports sufficient resources. */
        @Override
        void onChange() {
            if (!hasSufficientResources()) {
                return;
            }
            context().progressToStabilizing(now());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Base class of all phases of the {@link DefaultStateTransitionManager}. It gives subclasses
     * access to the clock, the owning manager, the resource checks of the transition context and
     * helpers for scheduling phase-bound callbacks. Both event handlers default to no-ops.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager$Phase.class */
    public abstract static class Phase {
        private final Supplier<Temporal> clock;
        private final DefaultStateTransitionManager context;

        /**
         * @param supplier clock used by {@link #now()}
         * @param defaultStateTransitionManager manager this phase belongs to
         */
        @VisibleForTesting
        Phase(Supplier<Temporal> supplier, DefaultStateTransitionManager defaultStateTransitionManager) {
            this.clock = supplier;
            this.context = defaultStateTransitionManager;
        }

        /** Returns the current reading of the clock. */
        Temporal now() {
            return this.clock.get();
        }

        /** Returns the manager that owns this phase. */
        DefaultStateTransitionManager context() {
            return this.context;
        }

        /**
         * Schedules {@code runnable} so that it runs {@code duration} after {@code temporal}. If
         * that point in time has already passed, the callback is scheduled with zero delay.
         *
         * @param runnable callback to run
         * @param temporal start of the timeout; must not lie after the current clock reading
         * @param duration length of the timeout, measured from {@code temporal}
         * @throws IllegalArgumentException if {@code temporal} is after the current time
         */
        void scheduleRelativelyTo(Runnable runnable, Temporal temporal, Duration duration) {
            Duration elapsed = Duration.between(temporal, now());
            // Flink's Preconditions substitutes "%s" placeholders (not SLF4J-style "{}").
            Preconditions.checkArgument(
                    !elapsed.isNegative(),
                    "The startOfTimeout (%s) should be in the past but is after the current time.",
                    temporal);
            Duration remaining = duration.minus(elapsed);
            scheduleFromNow(runnable, remaining.isNegative() ? Duration.ZERO : remaining);
        }

        /**
         * Schedules {@code runnable} to run after {@code duration}, bound to this phase via the
         * owning manager.
         */
        void scheduleFromNow(Runnable runnable, Duration duration) {
            this.context.scheduleFromNow(runnable, duration, this);
        }

        /** Delegates to the transition context's desired-resources check. */
        boolean hasDesiredResources() {
            return this.context.transitionContext.hasDesiredResources();
        }

        /** Delegates to the transition context's sufficient-resources check. */
        boolean hasSufficientResources() {
            return this.context.transitionContext.hasSufficientResources();
        }

        /** Handles a change event; does nothing unless overridden. */
        void onChange() {
        }

        /** Handles a trigger event; does nothing unless overridden. */
        void onTrigger() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Phase entered from {@link Stabilizing} once its stabilization callback fires. A trigger is
     * scheduled on construction; when it runs (or when a trigger event arrives) the manager either
     * performs the transition or falls back to {@link Idling}.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager$Stabilized.class */
    public static final class Stabilized extends Phase {

        /**
         * Schedules {@link #onTrigger()} to run {@code triggerDelay} after {@code referenceTime}.
         */
        private Stabilized(
                Supplier<Temporal> clock,
                DefaultStateTransitionManager manager,
                Temporal referenceTime,
                Duration triggerDelay) {
            super(clock, manager);
            scheduleRelativelyTo(this::onTrigger, referenceTime, triggerDelay);
        }

        /** Transitions if sufficient resources are available, otherwise returns to idling. */
        @Override
        void onTrigger() {
            if (!hasSufficientResources()) {
                LOG.debug("Sufficient resources are not met, progressing to idling.");
                context().progressToIdling();
                return;
            }
            context().triggerTransitionToSubsequentState();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Phase in which the manager waits for the desired resources. The transition is attempted on
     * trigger events and on delayed evaluations scheduled after change events. A separate callback
     * scheduled on construction moves the manager to {@link Stabilized}.
     */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager$Stabilizing.class */
    public static final class Stabilizing extends Phase {
        /** Timestamp the next delayed evaluation is scheduled relative to. */
        private Temporal onChangeEventTimestamp;
        private final Duration maxTriggerDelay;
        /** {@code true} while a delayed evaluation is pending, so that at most one is queued. */
        private boolean evaluationScheduled = false;

        /**
         * Schedules the progression to {@link Stabilized} {@code stabilizationTimeout} after {@code
         * firstChangeTime}, and the first delayed evaluation of the desired resources.
         */
        private Stabilizing(
                Supplier<Temporal> clock,
                DefaultStateTransitionManager manager,
                Duration stabilizationTimeout,
                Temporal firstChangeTime,
                Duration maxTriggerDelay) {
            super(clock, manager);
            this.onChangeEventTimestamp = firstChangeTime;
            this.maxTriggerDelay = maxTriggerDelay;
            scheduleRelativelyTo(
                    () -> context().progressToStabilized(firstChangeTime),
                    firstChangeTime,
                    stabilizationTimeout);
            scheduleTransitionEvaluation();
        }

        /** Records the time of the change and makes sure a delayed evaluation is pending. */
        @Override
        void onChange() {
            onChangeEventTimestamp = now();
            scheduleTransitionEvaluation();
        }

        /** Attempts the transition immediately. */
        @Override
        void onTrigger() {
            transitionToSubSequentStateForDesiredResources();
        }

        /**
         * Schedules a delayed evaluation {@code maxTriggerDelay} after the last recorded change
         * timestamp, unless one is already pending.
         */
        private void scheduleTransitionEvaluation() {
            if (!evaluationScheduled) {
                evaluationScheduled = true;
                scheduleRelativelyTo(
                        this::runScheduledEvaluation, onChangeEventTimestamp, maxTriggerDelay);
            }
        }

        /** Body of the delayed evaluation: clears the pending flag, then attempts the transition. */
        private void runScheduledEvaluation() {
            evaluationScheduled = false;
            transitionToSubSequentStateForDesiredResources();
        }

        /** Triggers the transition if the desired resources are available; otherwise only logs. */
        private void transitionToSubSequentStateForDesiredResources() {
            if (!hasDesiredResources()) {
                LOG.debug("Desired resources are not met, skipping the transition to the subsequent state.");
                return;
            }
            context().triggerTransitionToSubsequentState();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Phase set right before the transition to the subsequent state is requested. It schedules
     * nothing and keeps the inherited no-op event handlers; once it is active, {@code
     * progressToPhase} rejects any further phase change.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager$Transitioning.class */
    public static final class Transitioning extends Phase {
        private Transitioning(Supplier<Temporal> clock, DefaultStateTransitionManager manager) {
            super(clock, manager);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a manager that starts in the {@link Cooldown} phase. The cooldown is measured from
     * the clock reading taken during construction.
     *
     * @param context callbacks used to check resources, schedule operations and perform the
     *     transition
     * @param supplier clock providing the current time; must not return {@code null}
     * @param duration length of the initial cooldown
     * @param duration2 resource stabilization timeout; must not be negative
     * @param duration3 maximum trigger delay; must not be negative
     * @throws NullPointerException if any argument or the initial clock reading is {@code null}
     * @throws IllegalArgumentException if {@code duration2} or {@code duration3} is negative
     */
    public DefaultStateTransitionManager(StateTransitionManager.Context context, Supplier<Temporal> supplier, Duration duration, Duration duration2, Duration duration3) {
        // checkNotNull is generic, so no (raw) casts are needed on its result.
        this.clock = Preconditions.checkNotNull(supplier);
        this.maxTriggerDelay = Preconditions.checkNotNull(duration3);
        Preconditions.checkArgument(!duration3.isNegative(), "Max trigger delay must not be negative");
        this.resourceStabilizationTimeout = Preconditions.checkNotNull(duration2);
        Preconditions.checkArgument(!duration2.isNegative(), "Resource stabilization timeout must not be negative");
        this.transitionContext = Preconditions.checkNotNull(context);
        this.scheduledFutures = new ArrayList<>();
        // Must come last: the Cooldown constructor schedules through the fields initialized above.
        this.phase = new Cooldown(Preconditions.checkNotNull(supplier.get()), supplier, this, Preconditions.checkNotNull(duration));
    }

    /** Logs the change event and forwards it to the currently active phase. */
    @Override
    public void onChange() {
        LOG.debug("OnChange event received in phase {}.", getPhase());
        phase.onChange();
    }

    /** Logs the trigger event and forwards it to the currently active phase. */
    @Override
    public void onTrigger() {
        LOG.debug("OnTrigger event received in phase {}.", getPhase());
        phase.onTrigger();
    }

    /** Cancels every tracked scheduled operation (interrupting if running) and forgets them. */
    @Override
    public void close() {
        for (ScheduledFuture<?> pending : scheduledFutures) {
            pending.cancel(true);
        }
        scheduledFutures.clear();
    }

    /** Returns the phase the manager is currently in. */
    @VisibleForTesting
    Phase getPhase() {
        return phase;
    }

    /** Switches to a fresh {@link Idling} phase. */
    private void progressToIdling() {
        Phase idling = new Idling(clock, this);
        progressToPhase(idling);
    }

    /**
     * Switches to a fresh {@link Stabilizing} phase whose timeouts are measured from {@code
     * temporal}.
     */
    private void progressToStabilizing(Temporal temporal) {
        Phase stabilizing =
                new Stabilizing(clock, this, resourceStabilizationTimeout, temporal, maxTriggerDelay);
        progressToPhase(stabilizing);
    }

    /**
     * Switches to a fresh {@link Stabilized} phase whose trigger is scheduled {@code
     * maxTriggerDelay} after {@code temporal}.
     */
    private void progressToStabilized(Temporal temporal) {
        Phase stabilized = new Stabilized(clock, this, temporal, maxTriggerDelay);
        progressToPhase(stabilized);
    }

    /**
     * Enters the {@link Transitioning} phase and then asks the transition context to perform the
     * transition. The phase is switched first, so a repeated trigger fails in {@code
     * progressToPhase} before the context is called again.
     */
    private void triggerTransitionToSubsequentState() {
        Phase transitioning = new Transitioning(clock, this);
        progressToPhase(transitioning);
        transitionContext.transitionToSubsequentState();
    }

    /**
     * Replaces the active phase with {@code nextPhase}.
     *
     * <p>Fails the state check if the manager is already in the {@link Transitioning} phase.
     */
    private void progressToPhase(Phase nextPhase) {
        boolean alreadyTransitioning = this.phase instanceof Transitioning;
        Preconditions.checkState(!alreadyTransitioning, "The state transition operation has already been triggered.");
        LOG.debug("Transitioning from {} to {}.", this.phase, nextPhase);
        this.phase = nextPhase;
    }

    /**
     * Schedules {@code runnable} through the transition context after {@code duration} and tracks
     * the returned future so that {@link #close()} can cancel it. The callback only runs if {@code
     * phase} is still the active phase when it fires.
     */
    @VisibleForTesting
    void scheduleFromNow(Runnable runnable, Duration duration, Phase phase) {
        Runnable phaseGuarded = () -> runIfPhase(phase, runnable);
        ScheduledFuture<?> future = transitionContext.scheduleOperation(phaseGuarded, duration);
        scheduledFutures.add(future);
    }

    /**
     * Runs {@code runnable} only if {@code phase} is (by identity) the currently active phase;
     * otherwise the action is dropped with a debug log entry.
     */
    private void runIfPhase(Phase phase, Runnable runnable) {
        if (getPhase() != phase) {
            LOG.debug("Ignoring scheduled action because expected phase {} is not the actual phase {}.", phase, getPhase());
            return;
        }
        runnable.run();
    }
}
