/*
 * Decompiled with CFR 0.152.
 */
package org.mule.routing.response;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentMap;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.util.Map;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.routing.RoutingException;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.routing.inbound.EventGroup;
import org.mule.routing.response.AbstractResponseRouter;
import org.mule.util.MapUtils;
import org.mule.util.concurrent.Latch;

/**
 * Response router that collects reply events into {@link EventGroup}s keyed by
 * the reply aggregate identifier and hands the aggregated message to the thread
 * blocked in {@link #getResponse(MuleMessage)}.
 *
 * <p>{@link #process(MuleEvent)} adds each incoming event to its group and, once
 * {@link #shouldAggregateEvents(EventGroup)} says the group is ready, stores the
 * result of {@link #aggregateEvents(EventGroup)} in {@link #responseMessages}
 * and releases the latch registered for that identifier in {@link #locks}.
 * {@link #getResponse(MuleMessage)} waits on the same latch and then removes
 * and returns the stored message.
 */
public abstract class AbstractResponseAggregator
extends AbstractResponseRouter {
    /** Capacity of {@link #processedGroups}; the oldest id is evicted once it is full. */
    public static final int MAX_PROCESSED_GROUPS = 50000;

    /** Groups still collecting events, keyed by group id. */
    protected final ConcurrentMap eventGroups = new ConcurrentHashMap();

    /** One {@link Latch} per id, created by whichever side (reply or waiter) arrives first. */
    protected final ConcurrentMap locks = new ConcurrentHashMap();

    /** Aggregated response messages waiting to be picked up, keyed by group id. */
    protected final ConcurrentMap responseMessages = new ConcurrentHashMap();

    /**
     * Ids of groups that were already removed. The buffer itself is not
     * synchronized, so the accessors in this class lock on it.
     */
    protected final BoundedFifoBuffer processedGroups = new BoundedFifoBuffer(MAX_PROCESSED_GROUPS);

    /** Wait time in milliseconds; -1 means "not configured yet" (see {@link #initialise()}). */
    private int timeout = -1;

    private boolean failOnTimeout = true;

    /**
     * Falls back to the Mule configuration's default synchronous event timeout
     * when no timeout was set explicitly (value still -1), then delegates to
     * the superclass.
     *
     * @throws InitialisationException propagated from the superclass
     */
    public void initialise() throws InitialisationException {
        if (this.timeout == -1) {
            this.setTimeout(this.muleContext.getConfiguration().getDefaultSynchronousEventTimeout());
        }
        super.initialise();
    }

    /**
     * Adds a reply event to its event group and, when the group is complete,
     * publishes the aggregated message and releases the waiting latch.
     *
     * <p>Events for a group id that is already recorded as processed are
     * dropped after firing a {@link RoutingNotification} with action code 1301.
     *
     * @param event the incoming reply event
     * @throws RoutingException if the event carries no usable correlation id
     *         (identifier is {@code null} or equals {@code "-1"}), or if
     *         aggregation fails
     * @throws IllegalStateException if a response message is already stored
     *         for the same group id
     */
    public void process(MuleEvent event) throws RoutingException {
        final Object groupId = this.getReplyAggregateIdentifier(event.getMessage());
        if (groupId == null || groupId.equals("-1")) {
            throw new RoutingException(CoreMessages.noCorrelationId(), event.getMessage(), event.getEndpoint());
        }

        // Set when the group we locked had been removed by another thread in the meantime.
        boolean lookupMiss = false;
        while (true) {
            if (lookupMiss) {
                try {
                    // Back off briefly before retrying the lookup.
                    Thread.sleep(1L);
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }

            if (this.isGroupAlreadyProcessed(groupId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. Correlation Id is: " + groupId + ". Dropping event");
                }
                this.muleContext.fireNotification(new RoutingNotification(event.getMessage(), event.getEndpoint().getEndpointURI().toString(), 1301));
                return;
            }

            EventGroup group = this.getEventGroup(groupId);
            if (group == null) {
                // addEventGroup returns the winner if another thread registered one concurrently.
                group = this.addEventGroup(this.createEventGroup(event, groupId));
            }

            // Only one thread at a time may add to, evaluate and aggregate a given group.
            // Everything up to the return must stay inside this monitor.
            synchronized (group) {
                if (group != this.getEventGroup(groupId)) {
                    // The group was removed between lookup and lock acquisition: retry.
                    lookupMiss = true;
                    continue;
                }

                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Adding event to response aggregator group: " + groupId);
                }
                group.addEvent(event);

                if (this.shouldAggregateEvents(group)) {
                    MuleMessage returnMessage = this.aggregateEvents(group);
                    // No further events are accepted for this group once it is aggregated.
                    this.removeEventGroup(group);

                    MuleMessage previousResult = (MuleMessage) this.responseMessages.putIfAbsent(groupId, returnMessage);
                    if (previousResult != null) {
                        throw new IllegalStateException("Detected duplicate aggregation result message with id: " + groupId);
                    }

                    // Wake up (or pre-release for) the thread in getResponse().
                    this.getOrCreateLatch(groupId, "Creating latch for ").countDown();
                }
                return;
            }
        }
    }

    /**
     * Returns the latch registered for {@code id}, atomically registering a
     * new one if none exists yet.
     *
     * @param id          key into {@link #locks}
     * @param debugPrefix text logged at debug level, followed by the id and
     *                    this router, when no latch was found on first lookup
     * @return the latch shared by all callers using the same id
     */
    private Latch getOrCreateLatch(Object id, String debugPrefix) {
        Latch latch = (Latch) this.locks.get(id);
        if (latch == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(debugPrefix + id + " in " + this);
            }
            latch = new Latch();
            Latch previous = (Latch) this.locks.putIfAbsent(id, latch);
            if (previous != null) {
                // Another thread registered a latch first; share that one.
                latch = previous;
            }
        }
        return latch;
    }

    /**
     * Creates a new, empty group for the given id. The group is not registered
     * here; see {@link #addEventGroup(EventGroup)}.
     *
     * @param event   the event that triggered creation (unused by this implementation)
     * @param groupId id of the new group
     * @return the new group
     */
    protected EventGroup createEventGroup(MuleEvent event, Object groupId) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Creating new event group: " + groupId + " in " + this);
        }
        return new EventGroup(groupId);
    }

    /**
     * @param groupId id to look up
     * @return the registered group, or {@code null} if there is none
     */
    protected EventGroup getEventGroup(Object groupId) {
        return (EventGroup) this.eventGroups.get(groupId);
    }

    /**
     * Registers the group unless one with the same id is already registered.
     *
     * @param group candidate group
     * @return the group that is registered afterwards: the existing one if
     *         present, otherwise {@code group}
     */
    protected EventGroup addEventGroup(EventGroup group) {
        EventGroup previous = (EventGroup) this.eventGroups.putIfAbsent(group.getGroupId(), group);
        return previous != null ? previous : group;
    }

    /**
     * Unregisters the group and records its id as processed.
     *
     * @param group the group to remove
     */
    protected void removeEventGroup(EventGroup group) {
        this.eventGroups.remove(group.getGroupId());
        this.addProcessedGroup(group.getGroupId());
    }

    /**
     * Records an id as processed, evicting the oldest recorded id when the
     * buffer is full.
     *
     * @param id group id to record
     */
    protected void addProcessedGroup(Object id) {
        // BoundedFifoBuffer is not synchronized and is touched from several threads.
        synchronized (this.processedGroups) {
            if (this.processedGroups.isFull()) {
                this.processedGroups.remove();
            }
            this.processedGroups.add(id);
        }
    }

    /**
     * @param id group id to check
     * @return {@code true} if the id is currently held in {@link #processedGroups}
     */
    protected boolean isGroupAlreadyProcessed(Object id) {
        synchronized (this.processedGroups) {
            return this.processedGroups.contains(id);
        }
    }

    /**
     * Blocks until the aggregated response for the given message is available
     * or the timeout elapses. A timeout of zero or less waits without limit.
     *
     * <p>When no result became available (timeout, or the wait was
     * interrupted): with {@link #isFailOnTimeout()} set, a
     * {@link RoutingNotification} with action code 1302 is fired and a
     * {@link ResponseTimeoutException} is thrown; otherwise whatever has been
     * collected in the event group so far is aggregated and returned, or
     * {@code null} if there is no group.
     *
     * @param message the message whose response is awaited
     * @return the aggregated response, or {@code null} as described above
     * @throws RoutingException on timeout with fail-on-timeout enabled, or if
     *         aggregation fails
     * @throws IllegalStateException if the latch was released but no response
     *         message was stored
     */
    public MuleMessage getResponse(MuleMessage message) throws RoutingException {
        Object responseId = this.getCallResponseAggregateIdentifier(message);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Waiting for response for message id: " + responseId + " in " + this);
        }

        Latch latch = this.getOrCreateLatch(responseId, "Got response but no one is waiting for it yet. Creating latch for ");
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Got latch for message: " + responseId);
        }

        MuleMessage result;
        boolean resultAvailable = false;
        boolean interruptedWhileWaiting = false;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for response to message: " + responseId);
            }
            if (this.getTimeout() <= 0) {
                latch.await();
                resultAvailable = true;
            } else {
                resultAvailable = latch.await(this.getTimeout(), TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException e) {
            interruptedWhileWaiting = true;
        }
        finally {
            // Always clean up, whatever the outcome of the wait.
            this.locks.remove(responseId);
            result = (MuleMessage) this.responseMessages.remove(responseId);
            if (interruptedWhileWaiting) {
                // Restore the interrupt flag for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        }

        if (!resultAvailable) {
            if (this.isFailOnTimeout()) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Current responses are: \n" + MapUtils.toString((Map) this.responseMessages, true));
                }
                this.muleContext.fireNotification(new RoutingNotification(message, null, 1302));
                throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId(this.getTimeout(), responseId), message, null);
            }

            EventGroup group = this.getEventGroup(responseId);
            if (group == null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("There is no current event Group. Current responses are: \n" + MapUtils.toString((Map) this.responseMessages, true));
                }
                return null;
            }
            // NOTE(review): this removal/aggregation does not take the group's monitor,
            // unlike process() - confirm whether a late reply can race with it.
            this.removeEventGroup(group);
            return this.aggregateEvents(group);
        }

        if (result == null) {
            throw new IllegalStateException("Response Message is null");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("remaining locks  : " + this.locks.keySet());
            this.logger.debug("remaining results: " + this.responseMessages.keySet());
        }
        return result;
    }

    /** @return whether a timed-out wait raises an exception instead of returning a partial result */
    public boolean isFailOnTimeout() {
        return this.failOnTimeout;
    }

    /** @param failOnTimeout {@code true} to throw on timeout, {@code false} to aggregate what has arrived */
    public void setFailOnTimeout(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    /** @return the wait time in milliseconds; zero or less means wait without limit */
    public int getTimeout() {
        return this.timeout;
    }

    /** @param timeout the wait time in milliseconds; zero or less means wait without limit */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * Decides whether the group is ready to be aggregated. Called by
     * {@link #process(MuleEvent)} after each event is added.
     *
     * @param var1 the group to inspect
     * @return {@code true} to aggregate now
     */
    protected abstract boolean shouldAggregateEvents(EventGroup var1);

    /**
     * Builds the response message from the events collected in the group.
     *
     * @param var1 the group to aggregate
     * @return the aggregated response message
     * @throws RoutingException if aggregation fails
     */
    protected abstract MuleMessage aggregateEvents(EventGroup var1) throws RoutingException;
}

