/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.protocol.nio.impl;

import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.XMLMessageListener;
import com.solacesystems.jcsmp.protocol.nio.Notification;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Dispatches consumer {@link Notification}s on a dedicated daemon thread.
 *
 * <p>Producers hand notifications over through a bounded queue; the service
 * thread takes them one at a time and calls
 * {@code handleNotification()} on each. The queue capacity is the consumer
 * dispatcher queue size from the JCSMP global properties plus a fixed
 * "overhead" that can be tuned with the
 * {@code Solace_ConsumerDispatcherQueueSizeOverhead} system property.
 *
 * <p>Instances are obtained through {@link #create(String)}, which also starts
 * the service thread.
 */
public class ConsumerNotificationDispatcher
implements Runnable {
    private static final Log log = LogFactory.getLog(ConsumerNotificationDispatcher.class);

    /** Overhead used when the system property is absent, malformed or negative. */
    private static final int CONSUMER_DISPATCHER_QUEUE_SIZE_OVERHEAD_DEFAULT = 100;

    /** Name of the system property that overrides the queue size overhead. */
    private static final String Solace_ConsumerDispatcherQueueSizeOverhead = "Solace_ConsumerDispatcherQueueSizeOverhead";

    /** Extra queue slots allocated on top of the configured dispatcher queue size. */
    private static final int consumerDispatcherQueueSizeOverhead = resolveQueueSizeOverhead();

    private final ArrayBlockingQueue<Notification> _dispatcherQ;
    private final Thread serviceThread;

    /**
     * Builds the queue and the (not yet started) daemon service thread.
     *
     * @param ctxId identifier embedded in the service thread name
     */
    private ConsumerNotificationDispatcher(String ctxId) {
        int configuredSize = JCSMPFactory.onlyInstance().getGlobalProperties().getConsumerDispatcherQueueSize();
        this._dispatcherQ = new ArrayBlockingQueue<Notification>(configuredSize + consumerDispatcherQueueSizeOverhead);
        this.serviceThread = new Thread(this);
        this.serviceThread.setName(String.format("Context_%s_ConsumerDispatcher", ctxId));
        this.serviceThread.setDaemon(true);
    }

    /**
     * Reads the queue size overhead from the system property.
     *
     * <p>Falls back to the default when the property is missing, is not a
     * decimal integer, cannot be read, or is negative. A negative overhead
     * would shrink the queue below its configured size and could make the
     * queue capacity non-positive, which {@link ArrayBlockingQueue} rejects.
     *
     * @return a non-negative overhead
     */
    private static int resolveQueueSizeOverhead() {
        try {
            String raw = System.getProperty(
                    Solace_ConsumerDispatcherQueueSizeOverhead,
                    String.valueOf(CONSUMER_DISPATCHER_QUEUE_SIZE_OVERHEAD_DEFAULT));
            int configured = Integer.parseInt(raw);
            if (configured >= 0) {
                return configured;
            }
        }
        catch (RuntimeException ignored) {
            // Malformed number or property access denied: deliberately fall
            // back to the default rather than fail class initialization.
        }
        return CONSUMER_DISPATCHER_QUEUE_SIZE_OVERHEAD_DEFAULT;
    }

    /**
     * Creates a dispatcher and starts its service thread.
     *
     * @param name identifier embedded in the service thread name
     * @return a dispatcher whose service thread has been started
     */
    public static ConsumerNotificationDispatcher create(String name) {
        ConsumerNotificationDispatcher disp = new ConsumerNotificationDispatcher(name);
        // Started here rather than in the constructor so the thread never
        // observes a partially constructed dispatcher.
        disp.serviceThread.start();
        return disp;
    }

    /**
     * Tries to enqueue a notification without waiting.
     *
     * @param notif notification to enqueue
     * @return {@code true} if it was added, {@code false} if the queue is full
     */
    public boolean enqueueNonBlockingNotification(Notification notif) {
        return this._dispatcherQ.offer(notif);
    }

    /**
     * Enqueues a notification, waiting for space if the queue is full.
     *
     * <p>Logs a warning first when only the overhead portion of the queue is
     * left (see {@link #hasReachedConfigurableCapacity()}).
     *
     * @param notif notification to enqueue
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void enqueueBlockingNotification(Notification notif) throws InterruptedException {
        if (this.hasReachedConfigurableCapacity() && log.isWarnEnabled()) {
            log.warn(String.format("ConsumerNotificationDispatcher queue (size=%s) low space warning.", this._dispatcherQ.size()));
        }
        this._dispatcherQ.put(notif);
    }

    /**
     * @return {@code true} if the queue has no free slot at all
     */
    public boolean isFull() {
        return this._dispatcherQ.remainingCapacity() == 0;
    }

    /**
     * @return {@code true} if the free space left is no larger than the
     *         overhead, i.e. the configured part of the queue is used up
     */
    public boolean hasReachedConfigurableCapacity() {
        return this._dispatcherQ.remainingCapacity() <= consumerDispatcherQueueSizeOverhead;
    }

    /**
     * @return number of notifications currently queued
     */
    public int getSize() {
        return this._dispatcherQ.size();
    }

    /**
     * @return number of free slots, overhead included
     */
    public int getRemainingCapacity() {
        return this._dispatcherQ.remainingCapacity();
    }

    /**
     * @return number of free slots with the overhead excluded, never negative
     */
    public int getRemainingConfigurableCapacity() {
        return Math.max(this._dispatcherQ.remainingCapacity() - consumerDispatcherQueueSizeOverhead, 0);
    }

    /**
     * Removes every queued notification that uses the given listener and
     * appends the removed notifications to {@code removedNotifs}.
     *
     * <p>The method is {@code synchronized}, which serializes concurrent
     * purges on this dispatcher; the enqueue methods and the service thread
     * do not take this lock and keep operating on the queue concurrently.
     *
     * @param listener      listener whose notifications are purged; when
     *                      {@code null} nothing is done
     * @param removedNotifs list receiving the removed notifications
     */
    public synchronized void purgeNotifications(XMLMessageListener listener, List<Notification> removedNotifs) {
        if (listener == null) {
            return;
        }
        for (Iterator<Notification> it = this._dispatcherQ.iterator(); it.hasNext(); ) {
            Notification queued = it.next();
            if (queued.usesListener(listener)) {
                it.remove();
                removedNotifs.add(queued);
            }
        }
    }

    /**
     * Takes notifications off the queue and handles them until a handler
     * result has its lowest bit set, which ends the loop.
     *
     * <p>Anything thrown by a handler is logged and the loop carries on with
     * the next notification. An interrupt received while waiting on the queue
     * is not a handler failure: it is propagated so that {@link #run()} can
     * log it and let the thread exit.
     *
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    private void eventLoop() throws InterruptedException {
        try {
            while (true) {
                // Kept outside the handler try-block on purpose: an interrupt
                // here must reach the caller instead of being logged and
                // retried as if a handler had failed.
                Notification notif = this._dispatcherQ.take();
                try {
                    if ((notif.handleNotification() & 1) != 0) {
                        return;
                    }
                }
                catch (Throwable t) {
                    log.warn("Exception occurred in message consumer notification handler", t);
                }
            }
        }
        catch (InterruptedException e) {
            throw e;
        }
        catch (Throwable t) {
            // Only reachable when the failure handling above itself fails.
            log.error("Unexpected exception occurred in message consumer notification handler", t);
        }
    }

    /**
     * Service thread body: runs the event loop and logs start, interruption
     * and exit at debug level.
     */
    @Override
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug("Consumer dispatcher thread starts");
        }
        try {
            this.eventLoop();
        }
        catch (InterruptedException e) {
            // The interrupt flag is not restored: this is the thread's
            // top-level method and the thread terminates right after.
            if (log.isDebugEnabled()) {
                log.debug(Thread.currentThread().getName() + " is interrupted");
            }
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format("Consumer dispatcher thread [%s] exits", this.serviceThread.getName()));
        }
    }
}

