/*
 * Decompiled with CFR 0.152.
 */
package org.mule.routing;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.WorkException;
import org.apache.commons.collections.CollectionUtils;
import org.mule.DefaultMuleEvent;
import org.mule.api.DefaultMuleException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.config.ThreadingProfile;
import org.mule.api.context.WorkManager;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageRouter;
import org.mule.api.routing.AggregationContext;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.routing.RoutePathNotFoundException;
import org.mule.api.routing.RoutingException;
import org.mule.api.transport.DispatchException;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
import org.mule.message.DefaultExceptionPayload;
import org.mule.processor.AbstractMessageProcessorOwner;
import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.routing.AbstractRoutingStrategy;
import org.mule.routing.AggregationStrategy;
import org.mule.routing.CollectAllAggregationStrategy;
import org.mule.util.Preconditions;
import org.mule.util.concurrent.ThreadNameHelper;
import org.mule.work.ProcessingMuleEventWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScatterGatherRouter
extends AbstractMessageProcessorOwner
implements MessageRouter {
    private static final Logger logger = LoggerFactory.getLogger(ScatterGatherRouter.class);
    private long timeout = 0L;
    private List<MessageProcessor> routes = new ArrayList<MessageProcessor>();
    private boolean initialised = false;
    private List<MessageProcessor> routeChains;
    private AggregationStrategy aggregationStrategy;
    private ThreadingProfile threadingProfile;
    private WorkManager workManager;

    @Override
    public MuleEvent process(MuleEvent event) throws MuleException {
        if (CollectionUtils.isEmpty(this.routes)) {
            throw new RoutePathNotFoundException(CoreMessages.noEndpointsForRouter(), event, null);
        }
        MuleMessage message = event.getMessage();
        AbstractRoutingStrategy.validateMessageIsNotConsumable(event, message);
        List<ProcessingMuleEventWork> works = this.executeWork(event);
        return this.processResponses(event, works);
    }

    private MuleEvent processResponses(MuleEvent event, List<ProcessingMuleEventWork> works) throws MuleException {
        ArrayList<MuleEvent> responses = new ArrayList<MuleEvent>(works.size());
        long remainingTimeout = this.timeout;
        for (int routeIndex = 0; routeIndex < works.size(); ++routeIndex) {
            MuleEvent response = null;
            RoutingException exception = null;
            ProcessingMuleEventWork work = works.get(routeIndex);
            MessageProcessor route = this.routes.get(routeIndex);
            long startedAt = System.currentTimeMillis();
            try {
                response = work.getResult(remainingTimeout, TimeUnit.MILLISECONDS);
            }
            catch (ResponseTimeoutException e) {
                exception = e;
            }
            catch (InterruptedException e) {
                throw new DefaultMuleException(MessageFactory.createStaticMessage(String.format("Was interrupted while waiting for route %d", routeIndex)), (Throwable)e);
            }
            catch (Exception e) {
                exception = new DispatchException(MessageFactory.createStaticMessage(String.format("route number %d failed to be executed", routeIndex)), event, route, exception);
            }
            remainingTimeout -= System.currentTimeMillis() - startedAt;
            if (exception != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("route %d generated exception for MuleEvent %s", routeIndex, event.getId()), (Throwable)exception);
                }
                response = DefaultMuleEvent.copy(event);
                response.getMessage().setExceptionPayload(new DefaultExceptionPayload(exception));
            } else if (logger.isDebugEnabled()) {
                logger.debug(String.format("route %d executed successfuly for event %s", routeIndex, event.getId()));
            }
            responses.add(response);
        }
        return this.aggregationStrategy.aggregate(new AggregationContext(event, responses));
    }

    private List<ProcessingMuleEventWork> executeWork(MuleEvent event) throws MuleException {
        ArrayList<ProcessingMuleEventWork> works = new ArrayList<ProcessingMuleEventWork>(this.routes.size());
        try {
            for (MessageProcessor route : this.routes) {
                ProcessingMuleEventWork work = new ProcessingMuleEventWork(route, DefaultMuleEvent.copy(event));
                this.workManager.scheduleWork(work);
                works.add(work);
            }
        }
        catch (WorkException e) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage("Could not schedule work for route"), (Throwable)e);
        }
        return works;
    }

    @Override
    public void initialise() throws InitialisationException {
        try {
            this.buildRouteChains();
            if (this.threadingProfile == null) {
                this.threadingProfile = this.muleContext.getDefaultThreadingProfile();
            }
            if (this.aggregationStrategy == null) {
                this.aggregationStrategy = new CollectAllAggregationStrategy();
            }
            if (this.timeout <= 0L) {
                this.timeout = Long.MAX_VALUE;
            }
            this.workManager = this.threadingProfile.createWorkManager(ThreadNameHelper.getPrefix(this.muleContext) + "ScatterGatherWorkManager", this.muleContext.getConfiguration().getShutdownTimeout());
        }
        catch (Exception e) {
            throw new InitialisationException((Throwable)e, (Initialisable)this);
        }
        super.initialise();
        this.initialised = true;
    }

    @Override
    public void start() throws MuleException {
        this.workManager.start();
        super.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void dispose() {
        try {
            this.workManager.dispose();
        }
        catch (Exception e) {
            logger.error("Exception found while tring to dispose work manager. Will continue with the disposal", (Throwable)e);
        }
        finally {
            super.dispose();
        }
    }

    @Override
    public void addRoute(MessageProcessor processor) throws MuleException {
        this.checkNotInitialised();
        this.routes.add(processor);
    }

    @Override
    public void removeRoute(MessageProcessor processor) throws MuleException {
        this.checkNotInitialised();
        this.routes.remove(processor);
    }

    private void buildRouteChains() throws MuleException {
        this.routeChains = new ArrayList<MessageProcessor>(this.routes.size());
        for (MessageProcessor route : this.routes) {
            this.routeChains.add(new DefaultMessageProcessorChainBuilder().chain(route).build());
        }
    }

    private void checkNotInitialised() {
        Preconditions.checkState(!this.initialised, "<scatter-gather> router is not dynamic. Cannot modify routes after initialisation");
    }

    @Override
    protected List<MessageProcessor> getOwnedMessageProcessors() {
        return this.routes;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public void setThreadingProfile(ThreadingProfile threadingProfile) {
        this.threadingProfile = threadingProfile;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public void setRoutes(List<MessageProcessor> routes) {
        this.routes = routes;
    }
}

