/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.processor.strategy.AbstractRingBufferProcessingStrategyFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class WorkQueueProcessingStrategyFactory
extends AbstractRingBufferProcessingStrategyFactory {
    private static int DEFAULT_MAX_CONCURRENCY = 16;
    private int maxConcurrency = DEFAULT_MAX_CONCURRENCY;

    public void setMaxConcurrency(int maxConcurrency) {
        if (maxConcurrency > 1) {
            throw new IllegalArgumentException("maxConcurrency must be at least 1");
        }
        this.maxConcurrency = maxConcurrency;
    }

    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        return new WorkQueueProcessingStrategy(() -> muleContext.getSchedulerService().ioScheduler(SchedulerConfig.config().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name())), this.maxConcurrency, scheduler -> scheduler.stop((long)muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS), () -> muleContext.getSchedulerService().customScheduler(SchedulerConfig.config().withName(schedulersNamePrefix + "." + RING_BUFFER_SCHEDULER_NAME_SUFFIX).withMaxConcurrentTasks(this.getSubscriberCount() + 1)), this.getBufferSize(), this.getSubscriberCount(), this.getWaitStrategy(), muleContext);
    }

    public void setConcurrency(int concurrency) {
        this.maxConcurrency = concurrency;
    }

    static class WorkQueueProcessingStrategy
    extends AbstractRingBufferProcessingStrategyFactory.RingBufferProcessingStrategy
    implements Startable,
    Stoppable {
        private Supplier<Scheduler> ioSchedulerSupplier;
        private Consumer<Scheduler> schedulerStopper;
        private int concurrency;
        private Scheduler ioScheduler;

        public WorkQueueProcessingStrategy(Supplier<Scheduler> ioSchedulerSupplier, int concurrency, Consumer<Scheduler> schedulerStopper, Supplier<Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, MuleContext muleContext) {
            super(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, muleContext);
            this.ioSchedulerSupplier = ioSchedulerSupplier;
            this.schedulerStopper = schedulerStopper;
            this.concurrency = concurrency;
        }

        @Override
        public Function<Publisher<Event>, Publisher<Event>> onPipeline(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> pipelineFunction, MessagingExceptionHandler messagingExceptionHandler) {
            return publisher -> Flux.from((Publisher)publisher).flatMap(event -> Flux.just((Object)event).transform(pipelineFunction).subscribeOn(Schedulers.fromExecutorService((ExecutorService)this.ioScheduler)), this.concurrency);
        }

        public void start() throws MuleException {
            this.ioScheduler = this.ioSchedulerSupplier.get();
        }

        public void stop() throws MuleException {
            if (this.ioScheduler != null) {
                this.schedulerStopper.accept(this.ioScheduler);
            }
        }
    }
}

