/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import org.mule.runtime.core.processor.strategy.AbstractRingBufferProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.ReactorProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.StreamPerEventSink;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/**
 * Creates {@link ProactorProcessingStrategy} instances.
 *
 * <p>The produced strategy publishes the pipeline on a CPU-light scheduler and moves processors
 * whose processing type is {@code BLOCKING} or {@code CPU_INTENSIVE} onto dedicated schedulers,
 * switching back to the CPU-light scheduler once each such processor has run.
 *
 * <p>When the configured maximum concurrency is exactly {@code 1}, {@link #create} delegates to
 * {@link ReactorProcessingStrategyFactory} instead.
 */
public class ProactorProcessingStrategyFactory extends AbstractRingBufferProcessingStrategyFactory {
    /** Default for {@link #maxConcurrency}: effectively unbounded. */
    private static final int DEFAULT_MAX_CONCURRENCY = Integer.MAX_VALUE;

    private int maxConcurrency = DEFAULT_MAX_CONCURRENCY;

    /**
     * Configures the maximum concurrency used by strategies created by this factory.
     *
     * @param maxConcurrency the maximum concurrency; must be at least {@code 1}
     * @throws IllegalArgumentException if {@code maxConcurrency} is less than {@code 1}
     */
    public void setMaxConcurrency(int maxConcurrency) {
        // Reject non-positive values; the comparison was previously inverted (> 1), which
        // refused every valid value above 1 while letting 0 and negatives through.
        if (maxConcurrency < 1) {
            throw new IllegalArgumentException("maxConcurrency must be at least 1");
        }
        this.maxConcurrency = maxConcurrency;
    }

    /**
     * @return the currently configured maximum concurrency
     */
    protected int getMaxConcurrency() {
        return this.maxConcurrency;
    }

    /**
     * Builds the processing strategy.
     *
     * <p>The schedulers are not obtained here: suppliers are handed to the strategy, which resolves
     * the CPU-light, blocking and CPU-intensive ones in {@link ProactorProcessingStrategy#start()}.
     *
     * @param muleContext the context providing the scheduler service and the shutdown timeout
     * @param schedulersNamePrefix prefix used to build the name of every scheduler requested
     * @return a reactor strategy when the maximum concurrency is {@code 1}, otherwise a
     *         {@link ProactorProcessingStrategy}
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        if (this.maxConcurrency == 1) {
            return new ReactorProcessingStrategyFactory().create(muleContext, schedulersNamePrefix);
        }

        Supplier<Scheduler> cpuLightSupplier = () -> muleContext.getSchedulerService()
            .cpuLightScheduler(SchedulerConfig.config()
                .withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_LITE.name()));
        Supplier<Scheduler> blockingSupplier = () -> muleContext.getSchedulerService()
            .ioScheduler(SchedulerConfig.config()
                .withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));
        Supplier<Scheduler> cpuIntensiveSupplier = () -> muleContext.getSchedulerService()
            .cpuIntensiveScheduler(SchedulerConfig.config()
                .withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name()));
        // The shutdown timeout is read when a scheduler is actually stopped, not at creation time.
        Consumer<Scheduler> stopper = scheduler -> scheduler
            .stop((long) muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS);
        // The subscriber count is read when the supplier is invoked, so it stays inside the lambda.
        Supplier<Scheduler> ringBufferSupplier = () -> muleContext.getSchedulerService()
            .customScheduler(SchedulerConfig.config()
                .withName(schedulersNamePrefix + RING_BUFFER_SCHEDULER_NAME_SUFFIX)
                .withMaxConcurrentTasks(this.getSubscriberCount() + 1));

        return new ProactorProcessingStrategy(cpuLightSupplier, blockingSupplier, cpuIntensiveSupplier, stopper,
            this.maxConcurrency, ringBufferSupplier, this.getBufferSize(), this.getSubscriberCount(),
            this.getWaitStrategy(), muleContext);
    }

    /**
     * Processing strategy that runs the pipeline on a CPU-light scheduler and hands
     * {@code BLOCKING} and {@code CPU_INTENSIVE} processors over to their own schedulers.
     *
     * <p>The three schedulers are only available between {@link #start()} and {@link #stop()};
     * before {@code start()} the corresponding fields are {@code null}.
     */
    static class ProactorProcessingStrategy
        extends AbstractRingBufferProcessingStrategyFactory.RingBufferProcessingStrategy
        implements Startable, Stoppable {
        private final Supplier<Scheduler> cpuLightSchedulerSupplier;
        private final Supplier<Scheduler> blockingSchedulerSupplier;
        private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
        private final Consumer<Scheduler> schedulerStopper;
        private Scheduler cpuLightScheduler;
        private Scheduler blockingScheduler;
        private Scheduler cpuIntensiveScheduler;
        // Stored for the strategy but not read anywhere in this class.
        private final int maxConcurrency;

        /**
         * @param cpuLightSchedulerSupplier supplies the scheduler the pipeline is published on
         * @param blockingSchedulerSupplier supplies the scheduler for {@code BLOCKING} processors
         * @param cpuIntensiveSchedulerSupplier supplies the scheduler for {@code CPU_INTENSIVE} processors
         * @param schedulerStopper invoked once per started scheduler from {@link #stop()}
         * @param maxConcurrency the configured maximum concurrency
         * @param ringBufferSchedulerSupplier forwarded to the ring-buffer superclass
         * @param bufferSize forwarded to the ring-buffer superclass
         * @param subscriberCount forwarded to the ring-buffer superclass
         * @param waitStrategy forwarded to the ring-buffer superclass
         * @param muleContext forwarded to the ring-buffer superclass
         */
        public ProactorProcessingStrategy(Supplier<Scheduler> cpuLightSchedulerSupplier,
                                          Supplier<Scheduler> blockingSchedulerSupplier,
                                          Supplier<Scheduler> cpuIntensiveSchedulerSupplier,
                                          Consumer<Scheduler> schedulerStopper,
                                          int maxConcurrency,
                                          Supplier<Scheduler> ringBufferSchedulerSupplier,
                                          int bufferSize,
                                          int subscriberCount,
                                          String waitStrategy,
                                          MuleContext muleContext) {
            super(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, muleContext);
            this.cpuLightSchedulerSupplier = cpuLightSchedulerSupplier;
            this.blockingSchedulerSupplier = blockingSchedulerSupplier;
            this.cpuIntensiveSchedulerSupplier = cpuIntensiveSchedulerSupplier;
            this.schedulerStopper = schedulerStopper;
            this.maxConcurrency = maxConcurrency;
        }

        /**
         * Obtains the CPU-light, blocking and CPU-intensive schedulers from their suppliers.
         *
         * @throws MuleException declared by the lifecycle contract
         */
        public void start() throws MuleException {
            this.cpuLightScheduler = this.cpuLightSchedulerSupplier.get();
            this.blockingScheduler = this.blockingSchedulerSupplier.get();
            this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
        }

        /**
         * Passes every scheduler that was obtained in {@link #start()} to the scheduler stopper.
         * Schedulers that were never obtained (still {@code null}) are skipped.
         *
         * @throws MuleException declared by the lifecycle contract
         */
        public void stop() throws MuleException {
            if (this.cpuLightScheduler != null) {
                this.schedulerStopper.accept(this.cpuLightScheduler);
            }
            if (this.blockingScheduler != null) {
                this.schedulerStopper.accept(this.blockingScheduler);
            }
            if (this.cpuIntensiveScheduler != null) {
                this.schedulerStopper.accept(this.cpuIntensiveScheduler);
            }
        }

        /**
         * Decorates the pipeline so that events are published on the CPU-light scheduler before
         * the pipeline function is applied.
         *
         * @param flowConstruct not used by this implementation
         * @param pipelineFunction the pipeline to apply after the thread switch
         * @return the decorated pipeline function
         */
        @Override
        public Function<Publisher<Event>, Publisher<Event>> onPipeline(FlowConstruct flowConstruct,
                                                                         Function<Publisher<Event>, Publisher<Event>> pipelineFunction) {
            // The scheduler field is read when the returned function is applied, not here.
            return publisher -> Flux.from(publisher)
                .publishOn(Schedulers.fromExecutorService(this.getExecutorService(this.cpuLightScheduler)))
                .transform(pipelineFunction);
        }

        /**
         * Decorates a single processor according to its processing type: {@code BLOCKING} and
         * {@code CPU_INTENSIVE} processors are run on their dedicated scheduler (see
         * {@link #proactor}); any other processor is applied without a thread switch.
         *
         * @param messageProcessor the processor whose processing type selects the scheduler
         * @param processorFunction the processor's reactive function
         * @return the decorated processor function
         */
        @Override
        public Function<Publisher<Event>, Publisher<Event>> onProcessor(Processor messageProcessor,
                                                                          Function<Publisher<Event>, Publisher<Event>> processorFunction) {
            if (messageProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.BLOCKING) {
                return this.proactor(processorFunction, this.blockingScheduler);
            }
            if (messageProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_INTENSIVE) {
                return this.proactor(processorFunction, this.cpuIntensiveScheduler);
            }
            return publisher -> Flux.from(publisher).transform(processorFunction);
        }

        /**
         * Wraps a processor function so events are published on {@code scheduler} before the
         * function runs and published back on the CPU-light scheduler afterwards.
         *
         * @param processorFunction the processor's reactive function
         * @param scheduler the scheduler the processor should run on
         * @return the wrapped function
         */
        private Function<Publisher<Event>, Publisher<Event>> proactor(Function<Publisher<Event>, Publisher<Event>> processorFunction,
                                                                       Scheduler scheduler) {
            return publisher -> Flux.from(publisher)
                .publishOn(Schedulers.fromExecutorService(this.getExecutorService(scheduler)))
                .transform(processorFunction)
                .publishOn(Schedulers.fromExecutorService(this.getExecutorService(this.cpuLightScheduler)));
        }

        /**
         * Wraps the scheduler in a {@link ConditionalExecutorServiceDecorator} configured with
         * {@link #scheduleOverridePredicate()}.
         *
         * @param scheduler the scheduler to wrap
         * @return the executor service handed to Reactor
         */
        protected ExecutorService getExecutorService(Scheduler scheduler) {
            return new ConditionalExecutorServiceDecorator(scheduler, this.scheduleOverridePredicate());
        }

        /**
         * Predicate passed to the {@link ConditionalExecutorServiceDecorator}. This default
         * implementation returns {@code false} for every scheduler; subclasses may override it.
         * NOTE(review): how the decorator reacts to the predicate is defined outside this file.
         *
         * @return a predicate that is never satisfied
         */
        protected Predicate<Scheduler> scheduleOverridePredicate() {
            return scheduler -> false;
        }

        /**
         * Creates a {@link StreamPerEventSink} from the given function and the consumer returned
         * by {@code createOnEventConsumer()}.
         *
         * @param flowConstruct not used by this implementation
         * @param function the function the sink applies to each event stream
         * @return the new sink
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> function) {
            return new StreamPerEventSink(function, this.createOnEventConsumer());
        }
    }
}

