/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mule.runtime.cache.internal;

import com.mulesoft.mule.runtime.cache.api.CachingStrategy;
import com.mulesoft.mule.runtime.cache.api.key.MuleEventKeyGenerator;
import com.mulesoft.mule.runtime.cache.api.response.MuleEventCopier;
import com.mulesoft.mule.runtime.cache.api.response.ResponseGenerator;
import com.mulesoft.mule.runtime.cache.internal.eventcopier.DefaultMuleEventCopier;
import com.mulesoft.mule.runtime.cache.internal.keygenerator.SHA256MuleEventKeyGenerator;
import com.mulesoft.mule.runtime.cache.internal.responsegenerator.DefaultResponseGenerator;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Lifecycle;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.api.util.StreamingUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public abstract class AbstractCachingStrategy
extends AbstractComponent
implements CachingStrategy,
Lifecycle {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final Lock NO_OP_LOCK = new Lock(){

        @Override
        public void unlock() {
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            return true;
        }

        @Override
        public boolean tryLock() {
            return true;
        }

        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException("newCondition() not supported");
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
        }

        @Override
        public void lock() {
        }
    };
    private MuleEventKeyGenerator keyGenerator = new SHA256MuleEventKeyGenerator();
    private ResponseGenerator responseGenerator = new DefaultResponseGenerator();
    private String name;
    private MuleEventCopier muleEventCopier = new DefaultMuleEventCopier();
    private boolean synchronizedAccess = true;
    private LockFactory lockFactory;
    @Inject
    private SchedulerService schedulerService;
    protected MuleContext muleContext;
    private Scheduler lockScheduler;

    public void initialise() throws InitialisationException {
        if (this.keyGenerator instanceof MuleContextAware) {
            ((MuleContextAware)this.keyGenerator).setMuleContext(this.muleContext);
        }
    }

    public void start() throws MuleException {
        this.lockScheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName("cachingStrategy_" + this.name));
    }

    public void stop() throws MuleException {
        if (this.lockScheduler != null) {
            this.lockScheduler.stop();
        }
    }

    @Override
    public CoreEvent process(CoreEvent event, Processor messageProcessor) throws MuleException {
        try {
            return (CoreEvent)Mono.just((Object)event).transform((Function)this.transformProcessor(messageProcessor)).block();
        }
        catch (Throwable e) {
            throw Exceptions.rxExceptionToMuleException((Throwable)e);
        }
    }

    @Override
    public ReactiveProcessor transformProcessor(Processor processor) {
        return publisher -> Flux.from((Publisher)publisher).flatMap(request -> {
            String key;
            if (request.getMessage().getPayload().getDataType().isStreamType()) {
                this.logger.warn("Message will be processed without cache: payload is consumable");
                return Mono.just((Object)request).transform((Function)processor);
            }
            try {
                key = this.keyGenerator.generateKey((CoreEvent)request);
            }
            catch (Exception e) {
                this.logger.warn("Message will be processed without cache: key generation error", (Throwable)e);
                return Mono.just((Object)request).transform((Function)processor);
            }
            return Mono.just((Object)request).transform(this.processMessageWithCache(key, processor));
        });
    }

    private Function<Publisher<CoreEvent>, Publisher<CoreEvent>> processMessageWithCache(String key, Processor messageProcessor) {
        return stream -> Flux.from((Publisher)stream).flatMap(request -> {
            CoreEvent cachedResponse = this.lookupEventInCache(key);
            if (cachedResponse != null) {
                return Mono.just((Object)this.createResponseEvent((CoreEvent)request, cachedResponse));
            }
            Lock lock = this.getLock((Serializable)((Object)key));
            if (lock.tryLock()) {
                try {
                    Mono<? extends CoreEvent> mono = this.cacheAwareProcess(key, messageProcessor, (CoreEvent)request);
                    return mono;
                }
                finally {
                    lock.unlock();
                }
            }
            if (!this.schedulerService.isCurrentThreadForCpuWork()) {
                lock.lock();
                try {
                    Mono<? extends CoreEvent> mono = this.cacheAwareProcess(key, messageProcessor, (CoreEvent)request);
                    return mono;
                }
                finally {
                    lock.unlock();
                }
            }
            Mono publisher = Mono.just((Object)request);
            if (!TransactionCoordination.isTransactionActive()) {
                Objects.requireNonNull(this.getLockScheduler());
                publisher = publisher.publishOn(Schedulers.fromExecutorService((ExecutorService)this.getLockScheduler()));
            }
            return publisher.flatMap(req -> {
                lock.lock();
                try {
                    Mono<? extends CoreEvent> mono = this.cacheAwareProcess(key, messageProcessor, (CoreEvent)req);
                    return mono;
                }
                finally {
                    lock.unlock();
                }
            });
        });
    }

    private Mono<? extends CoreEvent> cacheAwareProcess(String key, Processor messageProcessor, CoreEvent request) {
        CoreEvent cachedResponse = this.lookupEventInCache(key);
        if (cachedResponse != null) {
            return Mono.just((Object)this.createResponseEvent(request, cachedResponse));
        }
        if (!this.isSynchronizedAccess()) {
            return Mono.just((Object)request).transform((Function)messageProcessor).doOnNext(Exceptions.checkedConsumer(r -> this.storeResponse((CoreEvent)r, key)));
        }
        Mono publisher = Mono.just((Object)request);
        if (!TransactionCoordination.isTransactionActive()) {
            publisher = publisher.publishOn(Schedulers.fromExecutorService((ExecutorService)this.getLockScheduler()));
        }
        return publisher.handle((req, sink) -> {
            Lock lock = this.getLock((Serializable)((Object)key));
            lock.lock();
            try {
                CoreEvent response = (CoreEvent)Mono.just((Object)req).transform((Function)messageProcessor).doOnNext(Exceptions.checkedConsumer(r -> this.storeResponse((CoreEvent)r, key))).block();
                if (response != null) {
                    sink.next((Object)response);
                } else {
                    sink.complete();
                }
            }
            finally {
                lock.unlock();
            }
        });
    }

    private CoreEvent createResponseEvent(CoreEvent event, CoreEvent cachedResponse) {
        return this.responseGenerator.create(event, this.muleEventCopier.createEventCopy(cachedResponse, event.getContext()));
    }

    private void storeResponse(CoreEvent response, String key) {
        boolean copy = response != null ? !response.getMessage().getPayload().getDataType().isStreamType() : true;
        if (copy) {
            CoreEvent responseCopy = this.muleEventCopier.createEventCopy(StreamingUtils.consumeRepeatablePayload((CoreEvent)response), response.getContext());
            this.store(key, responseCopy);
        }
    }

    private CoreEvent lookupEventInCache(String key) {
        CoreEvent event = this.retrieve(key);
        if (this.logger.isDebugEnabled()) {
            if (event != null) {
                this.logger.debug("Cache hit for key: " + key + " Event: " + event);
            } else {
                this.logger.debug("Cache miss for key: " + key);
            }
        }
        return event;
    }

    protected Lock getLock(Serializable key) {
        Lock lock = null;
        if (this.isSynchronizedAccess()) {
            if (key != null && !(key instanceof String)) {
                throw new IllegalArgumentException(String.format("Cannot synchronize cache key. Key  must be '%s' but was '%s'", String.class.getName(), key.getClass().getName()));
            }
            lock = this.getLockFactory().createLock((String)((Object)key));
        } else {
            lock = NO_OP_LOCK;
        }
        return lock;
    }

    protected abstract void store(String var1, CoreEvent var2);

    protected abstract CoreEvent retrieve(String var1);

    public MuleEventKeyGenerator getKeyGenerator() {
        return this.keyGenerator;
    }

    public void setKeyGenerator(MuleEventKeyGenerator keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    public ResponseGenerator getResponseGenerator() {
        return this.responseGenerator;
    }

    public void setResponseGenerator(ResponseGenerator responseGenerator) {
        this.responseGenerator = responseGenerator;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public MuleEventCopier getMuleEventCopier() {
        return this.muleEventCopier;
    }

    public void setMuleEventCopier(MuleEventCopier muleEventCopier) {
        this.muleEventCopier = muleEventCopier;
    }

    public void setSynchronizedAccess(boolean synchronizedAccess) {
        this.synchronizedAccess = synchronizedAccess;
    }

    public boolean isSynchronizedAccess() {
        return this.synchronizedAccess;
    }

    public LockFactory getLockFactory() {
        return this.lockFactory;
    }

    @Inject
    public void setLockFactory(LockFactory lockFactory) {
        this.lockFactory = lockFactory;
    }

    public void setSchedulerService(SchedulerService schedulerService) {
        this.schedulerService = schedulerService;
    }

    @Inject
    public void setMuleContext(MuleContext muleContext) {
        this.muleContext = muleContext;
    }

    protected Scheduler getLockScheduler() {
        return this.lockScheduler;
    }
}

