/*
 * Decompiled with CFR 0.152.
 */
package quickfix.mina;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.mina.core.session.IoSession;
import quickfix.Responder;
import quickfix.Session;
import quickfix.mina.IoSessionResponder;
import quickfix.mina.QueueTracker;
import quickfix.mina.WatermarkTracker;

final class QueueTrackers {
    private static final String LOWER_WATERMARK_FMT = "inbound queue size < lower watermark (%d), socket reads resumed";
    private static final String UPPER_WATERMARK_FMT = "inbound queue size > upper watermark (%d), socket reads suspended";

    QueueTrackers() {
    }

    static <E> WatermarkTracker<E, Session> newMultiSessionWatermarkTracker(BlockingQueue<E> queue, long lowerWatermark, long upperWatermark, Function<E, Session> classifier) {
        return WatermarkTracker.newMulti(queue, lowerWatermark, upperWatermark, classifier, qfSession -> QueueTrackers.resumeReads(qfSession, (int)lowerWatermark), qfSession -> QueueTrackers.suspendReads(qfSession, (int)upperWatermark));
    }

    static <E> QueueTracker<E> newDefaultQueueTracker(final BlockingQueue<E> queue) {
        return new QueueTracker<E>(){

            @Override
            public void put(E e) throws InterruptedException {
                queue.put(e);
            }

            @Override
            public E poll(long timeout, TimeUnit unit) throws InterruptedException {
                return queue.poll(timeout, unit);
            }

            @Override
            public int drainTo(Collection<E> collection) {
                return queue.drainTo(collection);
            }
        };
    }

    private static IoSession lookupIoSession(Session qfSession) {
        Responder responder = qfSession.getResponder();
        if (responder instanceof IoSessionResponder) {
            return ((IoSessionResponder)responder).getIoSession();
        }
        return null;
    }

    private static void resumeReads(Session qfSession, int queueLowerWatermark) {
        IoSession ioSession = QueueTrackers.lookupIoSession(qfSession);
        if (ioSession != null && ioSession.isReadSuspended()) {
            ioSession.resumeRead();
            qfSession.getLog().onEvent(String.format(LOWER_WATERMARK_FMT, queueLowerWatermark));
        }
    }

    private static void suspendReads(Session qfSession, int queueUpperWatermark) {
        IoSession ioSession = QueueTrackers.lookupIoSession(qfSession);
        if (ioSession != null && !ioSession.isReadSuspended()) {
            ioSession.suspendRead();
            qfSession.getLog().onEvent(String.format(UPPER_WATERMARK_FMT, queueUpperWatermark));
        }
    }

    static <E, Void> WatermarkTracker<E, Void> newSingleSessionWatermarkTracker(BlockingQueue<E> queue, long lowerWatermark, long upperWatermark, Session qfSession) {
        return WatermarkTracker.newMono(queue, lowerWatermark, upperWatermark, () -> QueueTrackers.resumeReads(qfSession, (int)lowerWatermark), () -> QueueTrackers.suspendReads(qfSession, (int)upperWatermark));
    }
}

