/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl;

import com.hazelcast.instance.Node;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.impl.BasicOperationProcessor;
import com.hazelcast.util.executor.AbstractExecutorThreadFactory;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Schedules tasks either on a fixed set of partition threads or on a shared
 * global executor.
 *
 * <p>A task submitted with a partition id greater than {@code -1} is queued on
 * the partition thread at index {@code partitionId % operationThreadCount}.
 * A task with any other partition id is handed to the global executor that is
 * registered under the name {@code "hz:operation"}.
 *
 * <p>Every destination has two queues: a regular one and a priority one. The
 * priority queue is always drained before the next regular task is processed.
 * All tasks are ultimately passed to {@link BasicOperationProcessor#process}.
 */
public final class BasicOperationScheduler {
    /** Seconds {@link #shutdown()} waits for each partition thread to terminate. */
    private static final int TERMINATION_TIMEOUT_SECONDS = 3;
    /** Global executor pool size, expressed per available processor. */
    private static final int GLOBAL_EXECUTOR_THREADS_PER_CORE = 2;
    /** Global executor queue capacity, expressed per available processor. */
    private static final int GLOBAL_EXECUTOR_QUEUE_CAPACITY_PER_CORE = 100000;

    private final ILogger logger;
    private final Node node;
    private final Executor globalExecutor;
    private final Queue<Object> globalExecutorPriorityQueue;
    private final int operationThreadCount;
    private final BasicOperationProcessor processor;
    private final PartitionThread[] partitionThreads;

    /**
     * No-op task placed on a partition thread's regular queue after a priority
     * task was queued, so that a thread blocked in {@code take()} wakes up and
     * drains its priority queue.
     */
    private final Runnable triggerTask = new Runnable() {
        @Override
        public void run() {
        }
    };

    /**
     * Creates the scheduler, registers the global executor and starts one
     * partition thread per operation thread.
     *
     * @param node                 the node supplying the logger, thread group, class loader and thread names
     * @param executionService     the service the global executor is registered with
     * @param operationThreadCount the number of partition threads to create
     * @param processor            the processor every scheduled task is passed to
     */
    public BasicOperationScheduler(Node node, ExecutionService executionService, int operationThreadCount, BasicOperationProcessor processor) {
        this.logger = node.getLogger(BasicOperationScheduler.class);
        this.node = node;
        this.processor = processor;
        this.operationThreadCount = operationThreadCount;

        // Assign every field BEFORE any partition thread is started: a running thread must
        // never observe a half-constructed scheduler, and no thread may be left running if
        // registering the executor fails.
        this.globalExecutorPriorityQueue = new ConcurrentLinkedQueue<Object>();
        int coreSize = Runtime.getRuntime().availableProcessors();
        this.globalExecutor = executionService.register("hz:operation",
                coreSize * GLOBAL_EXECUTOR_THREADS_PER_CORE,
                coreSize * GLOBAL_EXECUTOR_QUEUE_CAPACITY_PER_CORE);

        this.partitionThreads = new PartitionThread[operationThreadCount];
        for (int operationThreadId = 0; operationThreadId < operationThreadCount; ++operationThreadId) {
            this.partitionThreads[operationThreadId] = createPartitionThread(operationThreadId);
        }
        for (PartitionThread partitionThread : this.partitionThreads) {
            partitionThread.start();
        }
    }

    /** Builds (but does not start) the partition thread with the given id. */
    private PartitionThread createPartitionThread(int operationThreadId) {
        PartitionThreadFactory threadFactory = new PartitionThreadFactory(operationThreadId);
        // The factory ignores its Runnable argument; the thread runs its own loop.
        return threadFactory.createThread(null);
    }

    /**
     * Tells whether a task for the given partition may run directly in the
     * calling thread.
     *
     * @param partitionId the partition id of the task
     * @return {@code true} if {@code partitionId} is not greater than {@code -1},
     *         or if the caller is the partition thread that owns the partition;
     *         {@code false} otherwise
     */
    boolean isAllowedToRunInCurrentThread(int partitionId) {
        if (partitionId <= -1) {
            return true;
        }
        Thread currentThread = Thread.currentThread();
        if (!(currentThread instanceof PartitionThread)) {
            return false;
        }
        return toPartitionThreadIndex(partitionId) == ((PartitionThread) currentThread).threadId;
    }

    /**
     * Tells whether an invocation for the given partition may be made from the
     * calling thread.
     *
     * @param partitionId the partition id of the invocation
     * @return {@code false} only when the caller is a partition thread, the
     *         partition id is greater than {@code -1} and the partition maps to
     *         a different partition thread; {@code true} in every other case
     */
    boolean isInvocationAllowedFromCurrentThread(int partitionId) {
        Thread currentThread = Thread.currentThread();
        if (currentThread instanceof PartitionThread && partitionId > -1) {
            return toPartitionThreadIndex(partitionId) == ((PartitionThread) currentThread).threadId;
        }
        return true;
    }

    /**
     * Returns the number of queued tasks, summed over the regular and priority
     * queues of all partition threads. The global executor is not included.
     *
     * @return the total number of tasks queued on partition threads
     */
    public int getOperationExecutorQueueSize() {
        int size = 0;
        for (PartitionThread t : this.partitionThreads) {
            size += t.workQueue.size();
            size += t.priorityQueue.size();
        }
        return size;
    }

    /**
     * Schedules a task for execution.
     *
     * @param task        the task to schedule; must not be {@code null}
     * @param partitionId a value greater than {@code -1} selects a partition
     *                    thread, any other value selects the global executor
     * @param priority    {@code true} to queue the task on the priority queue
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public void execute(Object task, int partitionId, boolean priority) {
        if (task == null) {
            throw new NullPointerException("task can't be null");
        }
        if (partitionId > -1) {
            PartitionThread partitionThread = this.partitionThreads[toPartitionThreadIndex(partitionId)];
            if (priority) {
                offerWork(partitionThread.priorityQueue, task);
                // Wake the thread in case it is blocked waiting on the regular queue.
                offerWork(partitionThread.workQueue, this.triggerTask);
            } else {
                offerWork(partitionThread.workQueue, task);
            }
        } else if (priority) {
            offerWork(this.globalExecutorPriorityQueue, task);
            // A ProcessTask without its own task only drains the priority queue.
            this.globalExecutor.execute(new ProcessTask(null));
        } else {
            this.globalExecutor.execute(new ProcessTask(task));
        }
    }

    /** Offers the task to the queue and logs a severe message if the queue rejects it. */
    private void offerWork(Queue<Object> queue, Object task) {
        if (!queue.offer(task)) {
            this.logger.severe("Failed to offer " + task + " to BasicOperationScheduler due to overload");
        }
    }

    /** Maps a partition id to the index of its partition thread. */
    private int toPartitionThreadIndex(int partitionId) {
        return partitionId % this.operationThreadCount;
    }

    /**
     * Signals every partition thread to stop and then waits up to
     * {@value #TERMINATION_TIMEOUT_SECONDS} seconds for each of them to
     * terminate.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns without waiting for the
     * remaining threads; all threads have been signalled by then.
     */
    public void shutdown() {
        for (PartitionThread thread : this.partitionThreads) {
            thread.shutdown();
        }
        for (PartitionThread thread : this.partitionThreads) {
            try {
                thread.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Thread that processes the tasks of the partitions mapped to its
     * {@code threadId}, taking them from a blocking regular queue and a
     * priority queue.
     */
    public final class PartitionThread extends Thread {
        final int threadId;
        private final BlockingQueue<Object> workQueue;
        private final Queue<Object> priorityQueue;
        private volatile boolean shutdown;

        /**
         * @param name     the thread name
         * @param threadId the index of this thread in the scheduler's partition thread array
         */
        public PartitionThread(String name, int threadId) {
            super(BasicOperationScheduler.this.node.threadGroup, name);
            this.workQueue = new LinkedBlockingQueue<Object>();
            this.priorityQueue = new ConcurrentLinkedQueue<Object>();
            this.threadId = threadId;
        }

        /**
         * Runs the processing loop; an {@link OutOfMemoryError} is handed to the
         * {@link OutOfMemoryErrorDispatcher}, any other throwable is logged.
         */
        @Override
        public void run() {
            try {
                doRun();
            } catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory(e);
            } catch (Throwable t) {
                BasicOperationScheduler.this.logger.severe(t);
            }
        }

        /**
         * Takes tasks from the regular queue until shutdown is requested. Before
         * each regular task the priority queue is drained.
         */
        private void doRun() {
            for (;;) {
                Object task;
                try {
                    task = this.workQueue.take();
                } catch (InterruptedException e) {
                    // An interrupt only ends the loop when shutdown was requested.
                    if (this.shutdown) {
                        return;
                    }
                    continue;
                }
                if (this.shutdown) {
                    // The task taken here (e.g. the poison pill) is intentionally not processed.
                    return;
                }
                processPriorityMessages();
                process(task);
            }
        }

        /** Passes the task to the processor; an exception is logged and does not end the loop. */
        private void process(Object task) {
            try {
                BasicOperationScheduler.this.processor.process(task);
            } catch (Exception e) {
                BasicOperationScheduler.this.logger.severe(
                        "Failed to process task: " + task + " on partitionThread:" + getName(), e);
            }
        }

        /** Processes priority tasks until the priority queue is empty. */
        private void processPriorityMessages() {
            Object task;
            while ((task = this.priorityQueue.poll()) != null) {
                process(task);
            }
        }

        /** Sets the shutdown flag and queues a poison pill so a blocked {@code take()} returns. */
        private void shutdown() {
            this.shutdown = true;
            this.workQueue.add(new PoisonPill());
        }

        /**
         * Waits for this thread to die, for at most the given time.
         *
         * @param timeout the maximum time to wait
         * @param unit    the unit of {@code timeout}
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void awaitTermination(int timeout, TimeUnit unit) throws InterruptedException {
            join(unit.toMillis(timeout));
        }

        /** Marker object queued on shutdown; it only serves to unblock the queue. */
        private class PoisonPill {
            private PoisonPill() {
            }
        }
    }

    /** Factory producing the {@link PartitionThread} for one thread id. */
    private class PartitionThreadFactory extends AbstractExecutorThreadFactory {
        private final String threadName;
        private final int threadId;

        /**
         * @param threadId the id of the thread this factory creates; it is appended
         *                 to the node's "operation" pool name prefix to form the thread name
         */
        public PartitionThreadFactory(int threadId) {
            super(BasicOperationScheduler.this.node.threadGroup, BasicOperationScheduler.this.node.getConfigClassLoader());
            String poolNamePrefix = BasicOperationScheduler.this.node.getThreadPoolNamePrefix("operation");
            this.threadName = poolNamePrefix + threadId;
            this.threadId = threadId;
        }

        /** Creates the partition thread; the {@code Runnable} argument is ignored. */
        @Override
        protected PartitionThread createThread(Runnable r) {
            return new PartitionThread(this.threadName, this.threadId);
        }
    }

    /**
     * Unit of work for the global executor: drains the global priority queue
     * first and then processes its own task, if it has one.
     */
    private class ProcessTask implements Runnable {
        private final Object task;

        /**
         * @param task the task to process after the priority queue was drained,
         *             or {@code null} to only drain the priority queue
         */
        public ProcessTask(Object task) {
            this.task = task;
        }

        @Override
        public void run() {
            try {
                Object priorityTask;
                while ((priorityTask = BasicOperationScheduler.this.globalExecutorPriorityQueue.poll()) != null) {
                    BasicOperationScheduler.this.processor.process(priorityTask);
                }
                if (this.task != null) {
                    BasicOperationScheduler.this.processor.process(this.task);
                }
            } catch (Throwable t) {
                OutOfMemoryErrorDispatcher.inspectOutputMemoryError(t);
                BasicOperationScheduler.this.logger.severe(t);
            }
        }
    }
}

