/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.aggregator.internal.privileged.executor;

import java.io.InputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import org.mule.extension.aggregator.api.AggregationAttributes;
import org.mule.extension.aggregator.internal.config.AggregatorManager;
import org.mule.extension.aggregator.internal.errors.AggregatorError;
import org.mule.extension.aggregator.internal.privileged.CompletionCallbackWrapper;
import org.mule.extension.aggregator.internal.source.AggregatorListener;
import org.mule.extension.aggregator.internal.storage.content.AggregatedContent;
import org.mule.extension.aggregator.internal.storage.info.AggregatorSharedInformation;
import org.mule.extension.aggregator.internal.task.AsyncTask;
import org.mule.runtime.api.cluster.ClusterService;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Lifecycle;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.message.ItemSequenceInfo;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.model.operation.OperationModel;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.store.ObjectAlreadyExistsException;
import org.mule.runtime.api.store.ObjectDoesNotExistException;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.time.TimeSupplier;
import org.mule.runtime.api.transformation.TransformationService;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.runtime.extension.api.error.ErrorTypeDefinition;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.ComponentExecutor;
import org.mule.runtime.extension.api.runtime.operation.ExecutionContext;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.route.Route;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.module.extension.api.runtime.privileged.ExecutionContextAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractAggregatorExecutor
implements ComponentExecutor<OperationModel>,
Lifecycle {
    public static final String QUORUM_EXCEPTION = "QuorumException";
    final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    private static final String AGGREGATORS_MODULE_KEY = "AGGREGATORS";
    public static final int MAX_RETRIES = 3;
    public static final long BASE_DELAY_MILLIS = 100L;
    private final boolean failOnStartIfNoQuorum = Boolean.parseBoolean(System.getProperty("mule.aggregator.executor.failOnStartIfNoQuorum", "true"));
    private static final int waitForQuorum = Integer.parseInt(System.getProperty("mule.aggregator.executor.waitForQuorum", "100"));
    @Inject
    @Named(value="_muleObjectStoreManager")
    private ObjectStoreManager objectStoreManager;
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private AggregatorManager aggregatorManager;
    @Inject
    private LockFactory lockFactory;
    @Inject
    private TimeSupplier timeSupplier;
    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;
    @Inject
    private ClusterService clusterService;
    @Inject
    private TransformationService transformationService;
    private ObjectStore<AggregatorSharedInformation> objectStore;
    private String name;
    private Scheduler scheduler;
    private Scheduler schedulerQuorum;
    private PrimaryNodeLifecycleNotificationListener notificationListener;
    private AggregatorSharedInformation sharedInfoLocalCopy;
    private LazyValue<ObjectStore<AggregatorSharedInformation>> storage;
    private boolean started = false;
    private final Object stoppingLock = new Object();
    private boolean shouldSynchronizeToOS = true;

    protected void injectParameters(Map<String, Object> parameters) {
        this.objectStore = (ObjectStore)parameters.get("objectStore");
        this.name = (String)parameters.get("name");
    }

    Optional<ItemSequenceInfo> getItemSequenceInfo(ExecutionContext context) {
        CoreEvent event = ((ExecutionContextAdapter)context).getEvent();
        return event.getItemSequenceInfo();
    }

    private Object consumingStream(Object element) {
        if (element instanceof InputStream) {
            return this.transformationService.transform(element, DataType.INPUT_STREAM, DataType.BYTE_ARRAY);
        }
        if (element instanceof CursorStreamProvider) {
            return this.transformationService.transform(element, DataType.CURSOR_STREAM_PROVIDER, DataType.BYTE_ARRAY);
        }
        return element;
    }

    private TypedValue consumingStreams(TypedValue element) {
        Object elementValue = element.getValue();
        MediaType elementMediaType = element.getDataType().getMediaType();
        Object consumedValue = elementValue instanceof Message ? Message.builder((Message)((Message)elementValue)).payload(this.consumingStreams(((Message)elementValue).getPayload())).attributes(this.consumingStreams(((Message)elementValue).getAttributes())).build() : this.consumingStream(elementValue);
        return new TypedValue(consumedValue, DataType.builder().fromObject(consumedValue).mediaType(elementMediaType).build());
    }

    void addToStorage(AggregatedContent aggregatedContent, TypedValue aggregatedElement, Optional<ItemSequenceInfo> itemSequenceInfo) {
        TypedValue aggregatedElementTypedValue = this.consumingStreams(aggregatedElement);
        if (itemSequenceInfo.isPresent()) {
            aggregatedContent.add(aggregatedElementTypedValue, this.getCurrentTime(), itemSequenceInfo.get().getPosition());
        } else {
            aggregatedContent.add(aggregatedElementTypedValue, this.getCurrentTime());
        }
    }

    public void initialise() throws InitialisationException {
        LifecycleUtils.initialiseIfNeeded((Object)this.aggregatorManager);
        this.aggregatorManager.registerAggregator(this.name, this::scheduleRegisteredAsyncAggregations);
        this.storage = new LazyValue(this::getConfiguredObjectStore);
        this.notificationListener = new PrimaryNodeLifecycleNotificationListener((Startable)this, this.notificationListenerRegistry);
        this.notificationListener.register();
    }

    ObjectStore getConfiguredObjectStore() {
        if (this.objectStore == null) {
            return this.objectStoreManager.getDefaultPartition();
        }
        return this.objectStore;
    }

    public void start() throws MuleException {
        if (this.clusterService.isPrimaryPollingInstance() && !this.started) {
            LifecycleUtils.startIfNeeded(this.objectStore);
            if (this.failOnStartIfNoQuorum) {
                this.deferredActions();
            } else {
                this.schedulerQuorum = this.schedulerService.cpuLightScheduler(SchedulerConfig.config());
                this.schedulerQuorum.scheduleAtFixedRate(() -> this.executeDeferActions(), (long)waitForQuorum, (long)waitForQuorum, TimeUnit.MILLISECONDS);
            }
            this.scheduler = this.getStorage().isPersistent() ? this.schedulerService.ioScheduler(SchedulerConfig.config().withShutdownTimeout(0L, TimeUnit.MILLISECONDS)) : this.schedulerService.cpuLightScheduler(SchedulerConfig.config().withShutdownTimeout(0L, TimeUnit.MILLISECONDS));
            this.started = true;
        }
    }

    private void executeDeferActions() {
        try {
            this.deferredActions();
            this.schedulerQuorum.stop();
            this.schedulerQuorum = null;
        }
        catch (Exception e) {
            if (e.getClass().getName().contains(QUORUM_EXCEPTION)) {
                this.LOGGER.warn("The required quorum was not reached. Waiting for quorum");
                if (this.LOGGER.isDebugEnabled()) {
                    this.LOGGER.debug(e.getMessage());
                }
            }
            this.LOGGER.error(e.getMessage(), (Throwable)e);
        }
    }

    private void deferredActions() {
        this.upgradeAggregatedContentIfNeeded();
        this.setRegisteredAsyncAggregationsAsNotScheduled();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() throws MuleException {
        Object object = this.stoppingLock;
        synchronized (object) {
            this.shouldSynchronizeToOS = false;
            this.started = false;
            if (this.scheduler != null) {
                this.scheduler.stop();
                this.scheduler = null;
            }
            if (this.schedulerQuorum != null) {
                this.schedulerQuorum.stop();
                this.schedulerQuorum = null;
            }
        }
    }

    public void dispose() {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
        if (this.schedulerQuorum != null) {
            this.schedulerQuorum.stop();
            this.schedulerQuorum = null;
        }
    }

    void executeRouteWithAggregatedElements(Route route, List<TypedValue> elements, AggregationAttributes attributes, CompletableFuture<Result<Object, Object>> future) {
        route.getChain().process(elements, (Object)attributes, future::complete, (e, r) -> future.completeExceptionally((Throwable)e));
    }

    void finishExecution(CompletableFuture<Result<Object, Object>> future, CompletionCallbackWrapper completionCallback) {
        try {
            completionCallback.success(future.get());
        }
        catch (ExecutionException e) {
            completionCallback.error(e.getCause());
        }
        catch (InterruptedException e) {
            completionCallback.error(e);
        }
    }

    private void scheduleRegisteredAsyncAggregations() {
        this.executeSynchronized(this::doScheduleRegisteredAsyncAggregations);
    }

    abstract boolean doScheduleRegisteredAsyncAggregations();

    private void setRegisteredAsyncAggregationsAsNotScheduled() {
        this.executeSynchronized(this::doSetRegisteredAsyncAggregationsAsNotScheduled);
    }

    abstract boolean doSetRegisteredAsyncAggregationsAsNotScheduled();

    void scheduleTask(AsyncTask task, Runnable runnable) {
        long now = this.getCurrentTime();
        long configuredDelay = task.getDelayTimeUnit().toMillis(task.getDelay());
        long delay = configuredDelay - (now - task.getRegisteringTimestamp());
        this.scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS);
    }

    void evaluateConfiguredDelay(String valueKey, int configuredDelay, TimeUnit timeUnit) throws ModuleException {
        long configuredDelayInMillis = timeUnit.toMillis(configuredDelay);
        if (configuredDelayInMillis < this.aggregatorManager.getTaskSchedulingPeriodInMillis()) {
            throw new ModuleException(String.format("The configured %s : %d %s, is too small for the configured scheduling time period: %d MILLISECONDS. %s should be equal or bigger than the scheduling time period in order to accurately schedule it.%s Use %s global-config or %s SystemProperty to change it", new Object[]{valueKey, configuredDelay, timeUnit, this.aggregatorManager.getTaskSchedulingPeriodInMillis(), valueKey, System.lineSeparator(), "aggregatorsSchedulingPeriod", "mule.aggregatorsSchedulingPeriod"}), (ErrorTypeDefinition)AggregatorError.AGGREGATOR_CONFIG);
        }
    }

    void notifyListenerOnComplete(List<TypedValue> elements, AggregationAttributes aggregationAttributes) {
        this.getListenerAndExecute(listener -> this.executeListener((AggregatorListener)((Object)listener), elements, aggregationAttributes));
    }

    void notifyListenerOnTimeout(List<TypedValue> elements, AggregationAttributes aggregationAttributes) {
        this.getListenerAndExecute(listener -> {
            if (listener.shouldIncludeTimedOutGroups()) {
                this.executeListener((AggregatorListener)((Object)listener), elements, aggregationAttributes);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void executeSynchronized(Supplier<Boolean> task) {
        Object object = this.stoppingLock;
        synchronized (object) {
            if (this.shouldSynchronizeToOS) {
                Lock lock = this.lockFactory.createLock(this.getAggregatorKey());
                lock.lock();
                try {
                    this.pullSharedInfo();
                    boolean hasChanges = task.get();
                    if (hasChanges) {
                        this.pushSharedInfo();
                    }
                }
                finally {
                    lock.unlock();
                }
            }
        }
    }

    private String getAggregatorKey() {
        return String.format("%s:%s:%s", AGGREGATORS_MODULE_KEY, this.doGetAggregatorKey(), this.name);
    }

    abstract String doGetAggregatorKey();

    abstract AggregatorSharedInformation createSharedInfo();

    Long getCurrentTime() {
        return this.timeSupplier.get();
    }

    AggregatorSharedInformation getSharedInfoLocalCopy() {
        return this.sharedInfoLocalCopy;
    }

    private ObjectStore<AggregatorSharedInformation> getStorage() {
        return (ObjectStore)this.storage.get();
    }

    private void pullSharedInfo() throws ModuleException {
        String aggregatorKey = this.getAggregatorKey();
        for (int attempt = 1; attempt <= 3; ++attempt) {
            try {
                this.sharedInfoLocalCopy = (AggregatorSharedInformation)this.getStorage().retrieve(aggregatorKey);
                return;
            }
            catch (ObjectDoesNotExistException e) {
                this.LOGGER.debug("Key [{}] does not exist. Creating new shared info.", (Object)aggregatorKey);
                this.sharedInfoLocalCopy = this.createSharedInfo();
                return;
            }
            catch (ObjectStoreException e) {
                this.handleRetryOrThrow(attempt, (Exception)((Object)e));
                continue;
            }
        }
    }

    private void pushSharedInfo() throws ModuleException {
        String aggregatorKey = this.getAggregatorKey();
        for (int attempt = 1; attempt <= 3; ++attempt) {
            try {
                this.getStorage().store(aggregatorKey, (Serializable)this.sharedInfoLocalCopy);
                return;
            }
            catch (ObjectAlreadyExistsException e) {
                try {
                    this.removeSharedInfo(aggregatorKey);
                    this.getStorage().store(aggregatorKey, (Serializable)this.sharedInfoLocalCopy);
                    return;
                }
                catch (ObjectStoreException ex) {
                    this.handleRetryOrThrow(attempt, (Exception)((Object)ex));
                    continue;
                }
            }
            catch (ObjectStoreException e) {
                this.handleRetryOrThrow(attempt, (Exception)((Object)e));
            }
        }
    }

    private void removeSharedInfo(String aggregatorKey) throws ObjectStoreException {
        try {
            this.getStorage().remove(aggregatorKey);
        }
        catch (ObjectDoesNotExistException e) {
            this.LOGGER.warn("Key [{}] does not exist in ObjectStore", (Object)aggregatorKey);
        }
    }

    private long getBaseDelayMillis(int attempt) {
        return 100L * (1L << attempt - 1);
    }

    private void handleRetryOrThrow(int attempt, Exception e) throws ModuleException {
        if (attempt == 3) {
            throw new ModuleException("Failed after retries", (ErrorTypeDefinition)AggregatorError.OBJECT_STORE_ACCESS, (Throwable)e);
        }
        try {
            long delay = this.getBaseDelayMillis(attempt);
            Thread.sleep(delay);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ModuleException("Interrupted during retry backoff", (ErrorTypeDefinition)AggregatorError.OBJECT_STORE_ACCESS, (Throwable)ie);
        }
    }

    private void getListenerAndExecute(Consumer<AggregatorListener> task) {
        this.aggregatorManager.getListener(this.name).ifPresent(task);
    }

    private void executeListener(AggregatorListener listener, List<TypedValue> elements, AggregationAttributes aggregationAttributes) {
        if (listener.isStarted()) {
            SourceCallback callback = listener.getCallback();
            SourceCallbackContext context = callback.createContext();
            context.setCorrelationId(aggregationAttributes.getAggregationId());
            callback.handle(Result.builder().output(elements).attributes((Object)aggregationAttributes).build(), context);
        }
    }

    @Deprecated
    private void upgradeAggregatedContentIfNeeded() {
        this.executeSynchronized(() -> this.sharedInfoLocalCopy.upgradeIfNeeded());
    }
}

