/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.AbstractProcessingStrategy;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.helpers.NOPLogger;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.DirectProcessor;

/**
 * {@link ProcessingStrategyFactory} that always hands out the same shared
 * {@link #SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE}, a strategy that reports itself as synchronous.
 */
public class SynchronousProcessingStrategyFactory implements ProcessingStrategyFactory {

    /**
     * Shared strategy instance. Each sink it creates is a {@link PerThreadSink} that lazily builds
     * one {@link DirectSink} per calling thread.
     */
    public static final ProcessingStrategy SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE = new AbstractProcessingStrategy() {

        /**
         * @return always {@code true}
         */
        @Override
        public boolean isSynchronous() {
            return true;
        }

        /**
         * Creates a sink that routes each event through a {@link DirectSink} dedicated to the calling thread.
         *
         * @param flowConstruct not used by this implementation
         * @param function the pipeline applied to the stream of events pushed into the sink
         * @return a new {@link PerThreadSink}
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> function) {
            // The per-event consumer handed to DirectSink is intentionally a no-op.
            return new PerThreadSink(() -> new DirectSink(function, event -> {}));
        }
    };

    /**
     * @param muleContext not used by this implementation
     * @param schedulersNamePrefix not used by this implementation
     * @return {@link #SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE}
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        return SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE;
    }

    /**
     * {@link Sink} backed by a Reactor {@link DirectProcessor} to which the supplied function has been
     * applied. All work is delegated to an {@link AbstractProcessingStrategy.ReactorSink}.
     */
    static class DirectSink implements Sink, Disposable {

        private final AbstractProcessingStrategy.ReactorSink reactorSink;

        /**
         * @param function the pipeline applied to the processor's stream of events
         * @param eventConsumer passed through unchanged to the underlying {@code ReactorSink}
         */
        public DirectSink(Function<Publisher<Event>, Publisher<Event>> function, Consumer<Event> eventConsumer) {
            DirectProcessor<Event> directProcessor = DirectProcessor.create();
            BlockingSink<Event> blockingSink = directProcessor.serialize().connectSink();
            // retry() keeps the pipeline subscribed after an error signal.
            this.reactorSink = new AbstractProcessingStrategy.ReactorSink(blockingSink,
                    directProcessor.transform(function).retry().subscribe(), eventConsumer);
        }

        /**
         * Forwards the event to the underlying {@code ReactorSink}.
         *
         * @param event the event to push
         */
        @Override
        public void accept(Event event) {
            this.reactorSink.accept(event);
        }

        /**
         * Disposes the underlying {@code ReactorSink}.
         */
        @Override
        public void dispose() {
            this.reactorSink.dispose();
        }
    }

    /**
     * {@link Sink} that delegates to a separate sink for every calling thread, creating it on first
     * use through the supplied factory.
     */
    static class PerThreadSink implements Sink, Disposable {

        private final Supplier<Sink> sinkSupplier;

        // Weak *keys*, not weak values: the cache is the only holder of each sink, so a weakly held
        // value could be collected while its thread is still running, and the removal listener would
        // then receive null and have nothing to dispose. With weak keys an entry lives as long as
        // its thread does, and the listener receives the live sink when the entry is removed.
        private final Cache<Thread, Sink> sinkCache = CacheBuilder.newBuilder()
                .weakKeys()
                .removalListener(notification -> LifecycleUtils.disposeIfNeeded(notification.getValue(), NOPLogger.NOP_LOGGER))
                .build();

        /**
         * @param sinkSupplier factory invoked once per thread to build that thread's sink
         */
        public PerThreadSink(Supplier<Sink> sinkSupplier) {
            this.sinkSupplier = sinkSupplier;
        }

        /**
         * Pushes the event into the current thread's sink, creating that sink if it does not exist yet.
         *
         * @param event the event to push
         * @throws IllegalStateException if the sink for the current thread cannot be created
         */
        @Override
        public void accept(Event event) {
            try {
                this.sinkCache.get(Thread.currentThread(), this.sinkSupplier::get).accept(event);
            }
            catch (ExecutionException e) {
                throw new IllegalStateException("Unable to create Sink for Thread " + Thread.currentThread(), e);
            }
        }

        /**
         * Disposes every cached sink and empties the cache.
         */
        @Override
        public void dispose() {
            // Invalidation fires the removal listener for each entry, and the listener disposes the
            // sink; no separate pass over the entries is needed (it would dispose each sink twice).
            this.sinkCache.invalidateAll();
        }
    }
}

