/*
 * Decompiled with CFR 0.152.
 */
package org.mule.routing;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.Utils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleSession;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.api.routing.MessageInfoMapping;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.routing.RoutingException;
import org.mule.api.service.Service;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.routing.CorrelationTimeoutException;
import org.mule.routing.EventCorrelatorCallback;
import org.mule.routing.inbound.EventGroup;
import org.mule.util.MapUtils;
import org.mule.util.concurrent.Latch;

/**
 * Collects incoming events into {@link EventGroup}s keyed by correlation id and asks an
 * {@link EventCorrelatorCallback} to aggregate a group once the callback reports it as ready.
 * <p>
 * Open groups live in {@link #eventGroups}. Aggregated results are parked in
 * {@link #responseMessages} until they are picked up through {@link #process(MuleEvent)} or
 * {@link #getResponse(MuleMessage)}; {@link #locks} holds one {@link Latch} per id that is released
 * when the result for that id is available. Ids of groups that have already been aggregated are
 * remembered in a bounded FIFO buffer so that late events for them can be dropped.
 */
public class EventCorrelator {
    protected final transient Log logger = LogFactory.getLog(EventCorrelator.class);
    public static final String NO_CORRELATION_ID = "no-id";
    /** Capacity of the buffer that remembers ids of already processed groups. */
    public static final int MAX_PROCESSED_GROUPS = 50000;
    /** Factor used to convert the millisecond timeout into nanoseconds. */
    protected static final long MILLI_TO_NANO_MULTIPLIER = 1000000L;
    /** Open event groups, keyed by group id. */
    protected final ConcurrentMap eventGroups = new ConcurrentHashMap();
    /** One latch per id; counted down when the aggregated result for that id has been stored. */
    protected final ConcurrentMap locks = new ConcurrentHashMap();
    /** Aggregated results waiting to be collected, keyed by group id. */
    protected final ConcurrentMap responseMessages = new ConcurrentHashMap();
    /** Guards every access to {@link #processedGroups}. */
    protected final Object groupsLock = new Object();
    protected final BoundedFifoBuffer processedGroups = new BoundedFifoBuffer(MAX_PROCESSED_GROUPS);
    /** Timeout in milliseconds; values &lt;= 0 make {@link #getResponse(MuleMessage)} wait indefinitely. */
    private int timeout = -1;
    private boolean failOnTimeout = true;
    private final MessageInfoMapping messageInfoMapping;
    private final MuleContext context;
    private final EventCorrelatorCallback callback;
    /** True while a timeout monitor scheduled by {@link #enableTimeoutMonitor()} is active. */
    private final AtomicBoolean timerStarted = new AtomicBoolean(false);

    /**
     * Creates a correlator.
     *
     * @param callback decides when a group is complete and performs the aggregation
     * @param messageInfoMapping extracts correlation and message ids from messages
     * @param context used to fire notifications and to schedule the timeout monitor
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public EventCorrelator(EventCorrelatorCallback callback, MessageInfoMapping messageInfoMapping, MuleContext context) {
        if (callback == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("EventCorrelatorCallback").getMessage());
        }
        if (messageInfoMapping == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MessageInfoMapping").getMessage());
        }
        if (context == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MuleContext").getMessage());
        }
        this.callback = callback;
        this.messageInfoMapping = messageInfoMapping;
        this.context = context;
    }

    /**
     * Schedules a background work item that sweeps {@link #eventGroups} roughly every 100 ms and
     * expires groups that are older than the configured timeout. Calling this method while a
     * monitor is already active is a no-op.
     *
     * @throws WorkException if the work manager rejects the monitor
     */
    public void enableTimeoutMonitor() throws WorkException {
        // Claim the flag atomically so that repeated or concurrent calls schedule only one monitor.
        if (!this.timerStarted.compareAndSet(false, true)) {
            return;
        }
        boolean scheduled = false;
        try {
            this.context.getWorkManager().scheduleWork(new Work(){

                public void release() {
                    // nothing to release
                }

                public void run() {
                    try {
                        while (true) {
                            for (EventGroup group : EventCorrelator.this.findExpiredGroups()) {
                                EventCorrelator.this.handleExpiredGroup(group);
                            }
                            try {
                                Thread.sleep(100L);
                            }
                            catch (InterruptedException e) {
                                // Keep the interrupt visible to the executing thread and stop monitoring.
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    }
                    finally {
                        // The monitor is gone; allow a later enableTimeoutMonitor() call to start a new one.
                        EventCorrelator.this.timerStarted.set(false);
                    }
                }
            });
            scheduled = true;
        }
        finally {
            if (!scheduled) {
                // Scheduling failed, so no monitor is running; do not block future attempts.
                this.timerStarted.set(false);
            }
        }
    }

    /**
     * Collects the groups whose age exceeds the configured timeout.
     * <p>
     * NOTE(review): with a non-positive timeout the comparison holds for every group created in
     * the past, so all open groups are reported as expired; confirm callers only enable the
     * monitor after setting a positive timeout.
     */
    private ArrayList<EventGroup> findExpiredGroups() {
        long timeoutNanos = (long)this.getTimeout() * MILLI_TO_NANO_MULTIPLIER;
        ArrayList<EventGroup> expired = new ArrayList<EventGroup>(1);
        for (Object o : this.eventGroups.values()) {
            EventGroup group = (EventGroup)o;
            // Compare elapsed time rather than absolute instants: nanoTime values may overflow.
            if (Utils.nanoTime() - group.getCreated() > timeoutNanos) {
                expired.add(group);
            }
        }
        return expired;
    }

    /**
     * Removes an expired group and either reports a correlation timeout to the service's exception
     * listener (fail-on-timeout) or aggregates the events received so far and passes the result to
     * the service component.
     */
    private void handleExpiredGroup(EventGroup group) {
        this.eventGroups.remove(group.getGroupId());
        this.locks.remove(group.getGroupId());
        Service service = group.toArray()[0].getService();
        if (this.isFailOnTimeout()) {
            this.context.fireNotification(new RoutingNotification(group.toMessageCollection(), null, 1303));
            service.getExceptionListener().exceptionThrown(new CorrelationTimeoutException(CoreMessages.correlationTimedOut(group.getGroupId()), group.toMessageCollection()));
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(MessageFormat.format("Aggregator expired, but ''failOnTimeOut'' is false. Forwarding {0} events out of {1} total for group ID: {2}", group.size(), group.expectedSize(), group.getGroupId()));
        }
        try {
            MuleMessage msg = this.callback.aggregateEvents(group);
            DefaultMuleEvent newEvent = new DefaultMuleEvent(msg, group.toArray()[0].getEndpoint(), new DefaultMuleSession(service, this.context), false);
            service.getComponent().invoke(newEvent);
        }
        catch (Exception e) {
            service.getExceptionListener().exceptionThrown(e);
        }
    }

    /**
     * @return a read-only view of the aggregated results that have not been collected yet
     */
    public Map getResponseMessages() {
        return Collections.unmodifiableMap(this.responseMessages);
    }

    /**
     * Adds the event to its group and, if a latch exists for the event's correlation id afterwards,
     * removes that latch and returns the stored aggregated result for the id.
     *
     * @param event the event to correlate
     * @return the aggregated message, or {@code null} if no latch is registered for the id
     * @throws RoutingException if the event carries no usable correlation id or aggregation fails
     */
    public MuleMessage process(MuleEvent event) throws RoutingException {
        this.addEvent(event);
        String correlationId = this.messageInfoMapping.getCorrelationId(event.getMessage());
        if (this.locks.get(correlationId) != null) {
            this.locks.remove(correlationId);
            return (MuleMessage)this.responseMessages.remove(correlationId);
        }
        return null;
    }

    /**
     * Adds an event to the group matching its correlation id, creating the group if necessary.
     * When the callback reports the group as ready, the group is aggregated, removed, its result is
     * stored in {@link #responseMessages} and the latch for the id is released.
     * <p>
     * Events for ids that were already processed are dropped after firing a notification.
     *
     * @param event the event to add
     * @throws RoutingException if the correlation id is {@code null} or {@code "-1"}, or if
     *         aggregation fails
     * @throws IllegalStateException if a result for the same id is already stored
     */
    public void addEvent(MuleEvent event) throws RoutingException {
        String groupId = this.messageInfoMapping.getCorrelationId(event.getMessage());
        if (groupId == null || groupId.equals("-1")) {
            throw new RoutingException(CoreMessages.noCorrelationId(), event.getMessage(), event.getEndpoint());
        }
        boolean lookupMiss = false;
        while (true) {
            if (lookupMiss) {
                // Another thread swapped the group under us; back off briefly before retrying.
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (this.isGroupAlreadyProcessed(groupId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. Correlation Id is: " + groupId + ". Dropping event");
                }
                this.context.fireNotification(new RoutingNotification(event.getMessage(), event.getEndpoint().getEndpointURI().toString(), 1301));
                return;
            }
            EventGroup group = this.getEventGroup(groupId);
            if (group == null) {
                group = this.addEventGroup(this.callback.createEventGroup(event, groupId));
            }
            // Only one thread at a time may add to, evaluate and aggregate a given group.
            synchronized (group) {
                if (group != this.getEventGroup(groupId)) {
                    // The group was removed or replaced before we got the lock: discard it and retry.
                    lookupMiss = true;
                    continue;
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Adding event to aggregator group: " + groupId);
                }
                group.addEvent(event);
                if (this.callback.shouldAggregateEvents(group)) {
                    this.publishAggregatedResult(group, groupId);
                }
                return;
            }
        }
    }

    /**
     * Aggregates a complete group, retires it, stores the result and releases the latch for the id.
     * Must be called while holding the group's monitor.
     */
    private void publishAggregatedResult(EventGroup group, String groupId) throws RoutingException {
        MuleMessage returnMessage = this.callback.aggregateEvents(group);
        this.removeEventGroup(group);
        MuleMessage previousResult = (MuleMessage)this.responseMessages.putIfAbsent(groupId, returnMessage);
        if (previousResult != null) {
            throw new IllegalStateException("Detected duplicate aggregation result message with id: " + groupId);
        }
        this.obtainLatch(groupId, "Creating latch for ").countDown();
    }

    /**
     * Returns the latch registered for {@code id}, registering a new one if none exists yet.
     *
     * @param id the correlation/message id
     * @param debugPrefix text logged in front of the id when a new latch has to be created
     */
    private Latch obtainLatch(String id, String debugPrefix) {
        Latch latch = (Latch)this.locks.get(id);
        if (latch == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(debugPrefix + id + " in " + this);
            }
            latch = new Latch();
            Latch previous = (Latch)this.locks.putIfAbsent(id, latch);
            if (previous != null) {
                // Lost the race against another thread: share its latch.
                latch = previous;
            }
        }
        return latch;
    }

    /** Looks up the open group for the given id, or {@code null} if there is none. */
    protected EventGroup getEventGroup(Object groupId) {
        return (EventGroup)this.eventGroups.get(groupId);
    }

    /**
     * Registers the group unless one with the same id is already registered.
     *
     * @return the group that is registered for the id after the call
     */
    protected EventGroup addEventGroup(EventGroup group) {
        EventGroup previous = (EventGroup)this.eventGroups.putIfAbsent(group.getGroupId(), group);
        return previous != null ? previous : group;
    }

    /** Removes the group from the open groups and records its id as processed. */
    protected void removeEventGroup(EventGroup group) {
        Object groupId = group.getGroupId();
        this.eventGroups.remove(groupId);
        this.addProcessedGroup(groupId);
    }

    /** Remembers a processed group id, evicting the oldest remembered id when the buffer is full. */
    protected void addProcessedGroup(Object id) {
        synchronized (this.groupsLock) {
            if (this.processedGroups.isFull()) {
                this.processedGroups.remove();
            }
            this.processedGroups.add(id);
        }
    }

    /** @return {@code true} if the id is still present in the processed-groups buffer */
    protected boolean isGroupAlreadyProcessed(Object id) {
        synchronized (this.groupsLock) {
            return this.processedGroups.contains(id);
        }
    }

    /**
     * Waits for the aggregated response to the given message using the configured timeout.
     *
     * @see #getResponse(MuleMessage, int)
     */
    public MuleMessage getResponse(MuleMessage message) throws RoutingException {
        return this.getResponse(message, this.getTimeout());
    }

    /**
     * Waits until the aggregated result for the message's id is available and returns it.
     * <p>
     * If the wait times out (or the waiting thread is interrupted, in which case the interrupt flag
     * is restored) and fail-on-timeout is set, a notification is fired and a
     * {@link ResponseTimeoutException} is thrown. Otherwise the events collected so far, if any,
     * are aggregated and returned.
     *
     * @param message the message whose id identifies the awaited response
     * @param timeout maximum wait in milliseconds; a value &lt;= 0 waits indefinitely
     * @return the aggregated response; {@code null} if the wait ended without a result, fail-on-timeout
     *         is off and no group exists for the id
     * @throws ResponseTimeoutException if no result arrived in time and fail-on-timeout is set
     * @throws RoutingException if aggregating a partial group fails
     * @throws IllegalStateException if the latch was released but no result is stored
     */
    public MuleMessage getResponse(MuleMessage message, int timeout) throws RoutingException {
        String responseId = this.messageInfoMapping.getMessageId(message);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Waiting for response for message id: " + responseId + " in " + this);
        }
        Latch l = this.obtainLatch(responseId, "Got response but no one is waiting for it yet. Creating latch for ");
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Got latch for message: " + responseId);
        }
        MuleMessage result;
        boolean resultAvailable = false;
        boolean interruptedWhileWaiting = false;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for response to message: " + responseId);
            }
            // Honour the timeout requested by the caller, not the instance-wide default.
            if (timeout <= 0) {
                l.await();
                resultAvailable = true;
            } else {
                resultAvailable = l.await(timeout, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException e) {
            interruptedWhileWaiting = true;
        }
        finally {
            // Always clean up the bookkeeping for this id, whatever the outcome of the wait.
            this.locks.remove(responseId);
            result = (MuleMessage)this.responseMessages.remove(responseId);
            if (interruptedWhileWaiting) {
                Thread.currentThread().interrupt();
            }
        }
        if (!resultAvailable) {
            if (this.isFailOnTimeout()) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Current responses are: \n" + MapUtils.toString((Map)this.responseMessages, true));
                }
                this.context.fireNotification(new RoutingNotification(message, null, 1302));
                throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId(timeout, responseId), message, null);
            }
            EventGroup group = this.getEventGroup(responseId);
            if (group == null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("There is no current event Group. Current responses are: \n" + MapUtils.toString((Map)this.responseMessages, true));
                }
                return null;
            }
            // Not failing on timeout: return whatever has been collected for this id so far.
            this.removeEventGroup(group);
            return this.callback.aggregateEvents(group);
        }
        if (result == null) {
            throw new IllegalStateException("Response Message is null");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("remaining locks  : " + this.locks.keySet());
            this.logger.debug("remaining results: " + this.responseMessages.keySet());
        }
        return result;
    }

    /** @return whether a timeout is reported as a failure instead of forwarding a partial result */
    public boolean isFailOnTimeout() {
        return this.failOnTimeout;
    }

    public void setFailOnTimeout(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    /** @return the timeout in milliseconds */
    public int getTimeout() {
        return this.timeout;
    }

    /** @param timeout the timeout in milliseconds */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }
}

