/*
 * Decompiled with CFR 0.152.
 */
package com.tc.async.impl;

import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.MultiThreadedEventContext;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.SpecializedEventContext;
import com.tc.async.api.StageQueueStats;
import com.tc.async.impl.AbstractStageQueueImpl;
import com.tc.async.impl.ContextWrapper;
import com.tc.async.impl.StageQueue;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLoggerProvider;
import com.tc.stats.Stats;
import com.tc.util.Assert;
import com.tc.util.concurrent.QueueFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiStageQueueImpl<EC>
implements StageQueue<EC> {
    static final String FINDSTRATEGY_PROPNAME = "tc.stagequeueimpl.findstrategy";
    private static final ShortestFindStrategy SHORTEST_FIND_STRATEGY;
    private final boolean moduloAnd;
    private final int moduleMask;
    private final int PARTITION_SHIFT;
    private final String stageName;
    private final ShortestFindStrategy myShortestFindStrategy;
    private final TCLogger logger;
    private final MultiSourceQueueImpl<ContextWrapper<EC>>[] sourceQueues;
    private volatile boolean closed = false;
    private volatile int fcheck = 0;
    private AtomicInteger partitionHand = new AtomicInteger(0);

    MultiStageQueueImpl(int queueCount, QueueFactory<ContextWrapper<EC>> queueFactory, TCLoggerProvider loggerProvider, String stageName, int queueSize) {
        Assert.eval(queueCount > 0);
        this.myShortestFindStrategy = SHORTEST_FIND_STRATEGY;
        this.PARTITION_SHIFT = queueCount >= 8 ? 2 : 1;
        this.logger = loggerProvider.getLogger(Sink.class.getName() + ": " + stageName);
        this.stageName = stageName;
        this.sourceQueues = new MultiSourceQueueImpl[queueCount];
        this.createWorkerQueues(queueCount, queueFactory, queueSize, stageName);
        int sz = this.sourceQueues.length;
        if (Integer.bitCount(sz) == 1) {
            this.moduloAnd = true;
            this.moduleMask = sz - 1;
        } else {
            this.moduloAnd = false;
            this.moduleMask = 0;
        }
    }

    private static ShortestFindStrategy chooseStrategy(ShortestFindStrategy defaultVal) {
        String stratName = System.getProperty(FINDSTRATEGY_PROPNAME, defaultVal.name());
        for (ShortestFindStrategy s : ShortestFindStrategy.values()) {
            if (!s.name().toUpperCase().equals(stratName.toUpperCase())) continue;
            return s;
        }
        System.err.println("Unrecognized 'tc.stagequeueimpl.findstrategy' value: " + stratName + "; using: " + (Object)((Object)defaultVal));
        return defaultVal;
    }

    private int moduloQueueCount(int i) {
        if (this.moduloAnd) {
            return i & this.moduleMask;
        }
        return i % this.sourceQueues.length;
    }

    private void createWorkerQueues(int queueCount, QueueFactory<ContextWrapper<EC>> queueFactory, int queueSize, String stage) {
        AbstractStageQueueImpl.NullStageQueueStatsCollector statsCollector = new AbstractStageQueueImpl.NullStageQueueStatsCollector(stage);
        BlockingQueue<ContextWrapper<EC>> q = null;
        if (queueSize != Integer.MAX_VALUE) {
            queueSize = (int)Math.ceil((double)queueSize / (double)queueCount);
        }
        Assert.eval(queueSize > 0);
        for (int i = 0; i < queueCount; ++i) {
            q = queueFactory.createInstance(queueSize);
            this.sourceQueues[i] = new MultiSourceQueueImpl<ContextWrapper<EC>>(q, i, statsCollector);
        }
    }

    @Override
    public Source<ContextWrapper<EC>> getSource(int index) {
        return index < 0 || index >= this.sourceQueues.length ? null : this.sourceQueues[index];
    }

    @Override
    public void setClosed(boolean closed) {
        this.closed = closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSingleThreaded(EC context) {
        Assert.assertNotNull(context);
        Assert.assertFalse(context instanceof MultiThreadedEventContext);
        if (this.closed) {
            throw new IllegalStateException("closed");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added:" + context + " to:" + this.stageName);
        }
        boolean interrupted = Thread.interrupted();
        AbstractStageQueueImpl.HandledContext<EC> wrapper = new AbstractStageQueueImpl.HandledContext<EC>(context);
        try {
            while (true) {
                try {
                    this.sourceQueues[0].put(wrapper);
                }
                catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addMultiThreaded(EC context) {
        Assert.assertNotNull(context);
        Assert.assertTrue(context instanceof MultiThreadedEventContext);
        if (this.closed) {
            throw new IllegalStateException("closed");
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added:" + context + " to:" + this.stageName);
        }
        boolean interrupted = Thread.interrupted();
        MultiThreadedEventContext cxt = (MultiThreadedEventContext)context;
        int index = this.getSourceQueueFor(cxt);
        ContextWrapper<Object> wrapper = cxt.flush() ? new FlushingHandledContext(context, index) : new AbstractStageQueueImpl.HandledContext<EC>(context);
        try {
            while (true) {
                try {
                    this.sourceQueues[index].put(wrapper);
                }
                catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSpecialized(SpecializedEventContext specialized) {
        if (this.closed) {
            throw new IllegalStateException("closed");
        }
        AbstractStageQueueImpl.DirectExecuteContext wrapper = new AbstractStageQueueImpl.DirectExecuteContext(specialized);
        boolean interrupted = Thread.interrupted();
        int index = this.getSourceQueueFor(specialized);
        try {
            while (true) {
                try {
                    this.sourceQueues[index].put(wrapper);
                }
                catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                    continue;
                }
                break;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private int findShortestQueueIndex() {
        switch (SHORTEST_FIND_STRATEGY) {
            case PARTITION: {
                int offset = this.moduloQueueCount(this.partitionHand.getAndIncrement() << this.PARTITION_SHIFT);
                int min = Integer.MAX_VALUE;
                int can = -1;
                for (int i = 0; i < 1 << this.PARTITION_SHIFT; ++i) {
                    int checkMin = this.sourceQueues[offset].size();
                    if (checkMin < min) {
                        can = offset;
                        min = checkMin;
                    }
                    offset = this.moduloQueueCount(offset + 1);
                }
                return can;
            }
            case BRUTE: {
                int pointer = this.fcheck;
                int min = Integer.MAX_VALUE;
                int can = -1;
                for (int x = 0; x < this.sourceQueues.length; ++x) {
                    int index = this.moduloQueueCount(pointer + x);
                    MultiSourceQueueImpl<ContextWrapper<EC>> impl = this.sourceQueues[index];
                    if (impl.isEmpty()) {
                        return index;
                    }
                    int checkMin = impl.size();
                    if (Math.min(min, checkMin) == min) continue;
                    can = index;
                    min = checkMin;
                }
                Assert.assertTrue(can >= 0 && can < this.sourceQueues.length);
                return can;
            }
        }
        throw new IllegalStateException();
    }

    private int getSourceQueueFor(MultiThreadedEventContext context) {
        Object schedulingKey = context.getSchedulingKey();
        if (null == schedulingKey) {
            return this.findShortestQueueIndex();
        }
        int index = this.hashCodeToArrayIndex(schedulingKey.hashCode(), this.sourceQueues.length);
        return index;
    }

    private int hashCodeToArrayIndex(int hashcode, int arrayLength) {
        return Math.abs(hashcode % arrayLength);
    }

    @Override
    public int size() {
        int totalQueueSize = 0;
        for (MultiSourceQueueImpl<ContextWrapper<EC>> sourceQueue : this.sourceQueues) {
            totalQueueSize += sourceQueue.size();
        }
        return totalQueueSize;
    }

    @Override
    public String toString() {
        return "StageQueue(" + this.stageName + ")";
    }

    @Override
    public void clear() {
        int clearCount = 0;
        for (MultiSourceQueueImpl<ContextWrapper<EC>> sourceQueue : this.sourceQueues) {
            clearCount += sourceQueue.clear();
        }
        this.logger.info("Cleared " + clearCount);
    }

    @Override
    public void enableStatsCollection(boolean enable) {
        StageQueueStats collector = null;
        for (MultiSourceQueueImpl<ContextWrapper<EC>> src : this.sourceQueues) {
            String name = this.stageName + "[" + src.getSourceName() + "]";
            if (collector == null || !collector.getName().equals(name)) {
                collector = enable ? new AbstractStageQueueImpl.StageQueueStatsCollectorImpl(name) : new AbstractStageQueueImpl.NullStageQueueStatsCollector(name);
            }
            src.setStatsCollector((AbstractStageQueueImpl.StageQueueStatsCollector)collector);
        }
    }

    @Override
    public Stats getStats(long frequency) {
        if (this.sourceQueues.length == 1) {
            return this.sourceQueues[0].getStatsCollector();
        }
        return new Stats(){

            @Override
            public String getDetails() {
                StringBuilder build = new StringBuilder();
                AbstractStageQueueImpl.StageQueueStatsCollector stats = null;
                for (MultiSourceQueueImpl impl : MultiStageQueueImpl.this.sourceQueues) {
                    AbstractStageQueueImpl.StageQueueStatsCollector current = impl.getStatsCollector();
                    if (stats != current) {
                        if (stats != null) {
                            build.append('\n');
                        }
                        build.append(current.getDetails());
                    }
                    stats = current;
                }
                return build.toString();
            }

            @Override
            public void logDetails(TCLogger statsLogger) {
                statsLogger.info(this.getDetails());
            }
        };
    }

    @Override
    public Stats getStatsAndReset(long frequency) {
        return this.getStats(frequency);
    }

    @Override
    public boolean isStatsCollectionEnabled() {
        return this.sourceQueues[0].getStatsCollector() instanceof AbstractStageQueueImpl.StageQueueStatsCollectorImpl;
    }

    @Override
    public void resetStats() {
        this.sourceQueues[0].getStatsCollector().reset();
    }

    static {
        ShortestFindStrategy strat = ShortestFindStrategy.PARTITION;
        try {
            strat = MultiStageQueueImpl.chooseStrategy(ShortestFindStrategy.PARTITION);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        SHORTEST_FIND_STRATEGY = strat;
    }

    private class FlushingHandledContext<T extends EC>
    implements ContextWrapper<EC> {
        private final EC context;
        private final int offset;
        private int executionCount = 0;

        public FlushingHandledContext(EC context, int offset) {
            this.context = context;
            this.offset = offset;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void runWithHandler(EventHandler<EC> handler) throws EventHandlerException {
            if (++this.executionCount == MultiStageQueueImpl.this.sourceQueues.length) {
                handler.handleEvent(this.context);
            } else {
                boolean interrupted = false;
                try {
                    while (true) {
                        try {
                            MultiStageQueueImpl.this.sourceQueues[MultiStageQueueImpl.this.moduloQueueCount(this.executionCount + this.offset)].put(this);
                        }
                        catch (InterruptedException e) {
                            MultiStageQueueImpl.this.logger.debug("FlushingHandledContext move to next queue: " + e + " : " + (this.executionCount + this.offset) % MultiStageQueueImpl.this.sourceQueues.length);
                            interrupted = true;
                            continue;
                        }
                        break;
                    }
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        public boolean equals(Object obj) {
            if (this.context.getClass().isInstance(obj)) {
                return this.context.equals(obj);
            }
            return super.equals(obj);
        }
    }

    private final class MultiSourceQueueImpl<W>
    implements AbstractStageQueueImpl.SourceQueue<W> {
        private final BlockingQueue<W> queue;
        private final int sourceIndex;
        private volatile AbstractStageQueueImpl.StageQueueStatsCollector statsCollector;

        public MultiSourceQueueImpl(BlockingQueue<W> queue, int sourceIndex, AbstractStageQueueImpl.StageQueueStatsCollector statsCollector) {
            this.queue = queue;
            this.sourceIndex = sourceIndex;
            this.statsCollector = statsCollector;
        }

        public String toString() {
            return "SourceQueueImpl{" + this.sourceIndex + "size=" + this.queue.size() + '}';
        }

        @Override
        public AbstractStageQueueImpl.StageQueueStatsCollector getStatsCollector() {
            return this.statsCollector;
        }

        @Override
        public void setStatsCollector(AbstractStageQueueImpl.StageQueueStatsCollector collector) {
            this.statsCollector = collector;
        }

        @Override
        public int clear() {
            int cleared = 0;
            try {
                while (this.poll(0L) != null) {
                    ++cleared;
                }
                return cleared;
            }
            catch (InterruptedException e) {
                throw new TCRuntimeException(e);
            }
        }

        @Override
        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        @Override
        public W poll(long timeout) throws InterruptedException {
            W rv = this.queue.poll(timeout, TimeUnit.MILLISECONDS);
            if (rv != null) {
                this.statsCollector.contextRemoved();
                if (this.queue.isEmpty()) {
                    MultiStageQueueImpl.this.fcheck = this.sourceIndex;
                }
            } else {
                MultiStageQueueImpl.this.fcheck = this.sourceIndex;
            }
            return rv;
        }

        @Override
        public void put(W context) throws InterruptedException {
            this.queue.put(context);
            this.statsCollector.contextAdded();
        }

        @Override
        public int size() {
            return this.queue.size();
        }

        @Override
        public String getSourceName() {
            return Integer.toString(this.sourceIndex);
        }
    }

    static enum ShortestFindStrategy {
        BRUTE,
        PARTITION;

    }
}

