/*
 * Decompiled with CFR 0.152.
 */
package com.basho.riak.client.core;

import com.basho.riak.client.core.DefaultNodeManager;
import com.basho.riak.client.core.FutureOperation;
import com.basho.riak.client.core.NoNodesAvailableException;
import com.basho.riak.client.core.NodeManager;
import com.basho.riak.client.core.NodeStateListener;
import com.basho.riak.client.core.OperationRetrier;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.RiakNode;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RiakCluster
implements OperationRetrier,
NodeStateListener {
    /** Logger used for cluster lifecycle and operation messages. */
    private final Logger logger = LoggerFactory.getLogger(RiakCluster.class);
    /** Attempt count handed to every operation through {@code setRetrier} in {@code execute}. */
    private final int executionAttempts;
    /** Upper bound on {@link #operationQueue} size; 0 when queuing is disabled. */
    private final int operationQueueMaxDepth;
    /** Picks the node an operation runs on; a {@code DefaultNodeManager} unless the builder supplied one. */
    private final NodeManager nodeManager;
    /** Incremented in {@code execute}; decremented on completion or on failure with no retries left. */
    private final AtomicInteger inFlightCount = new AtomicInteger();
    /** Runs the retry, queue-drain and shutdown tasks; also handed to every node. */
    private final ScheduledExecutorService executor;
    /** Netty bootstrap handed to every node. */
    private final Bootstrap bootstrap;
    /** Nodes belonging to this cluster; after construction it is accessed under {@link #nodeListLock}. */
    private final List<RiakNode> nodeList;
    /** Read/write lock protecting {@link #nodeList}. */
    private final ReentrantReadWriteLock nodeListLock = new ReentrantReadWriteLock();
    /** Failed operations awaiting another attempt; filled by {@code operationFailed}, drained by {@code retryOperation}. */
    private final LinkedBlockingQueue<FutureOperation> retryQueue = new LinkedBlockingQueue();
    /** True when the builder's operation queue max depth is greater than zero. */
    private final boolean queueOperations;
    /** Operations waiting for a connection; {@code null} when {@link #queueOperations} is false. */
    private final LinkedBlockingDeque<FutureOperation> operationQueue;
    /** Listeners added through {@code registerNodeStateListener}; also attached to nodes added later. */
    private final List<NodeStateListener> stateListeners = Collections.synchronizedList(new LinkedList());
    /** Handle of the periodic {@code ShutdownTask}; assigned in {@code shutdown()}. */
    private volatile ScheduledFuture<?> shutdownFuture;
    /** Handle of the {@code RetryTask}; assigned in {@code start()}. */
    private volatile ScheduledFuture<?> retrierFuture;
    /** Handle of the {@code QueueDrainTask}; assigned in {@code start()} only when queuing is enabled. */
    private volatile ScheduledFuture<?> queueDrainFuture;
    /** Current lifecycle state of the cluster. */
    private volatile State state;
    /** Counted down in {@code nodeStateChanged} once the last node has been removed. */
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    /**
     * Creates a cluster from the settings collected by the builder.
     *
     * <p>Falls back to a {@code DefaultNodeManager}, a fresh NIO bootstrap and a
     * new scheduled thread pool for whichever of those the builder left unset.
     * Every node receives the executor and bootstrap and gets the node manager
     * registered as a state listener. Operation queuing is enabled when the
     * configured max depth is positive; in that case every node is told not to
     * block on max connections.
     *
     * @param builder the populated builder
     */
    private RiakCluster(Builder builder) {
        executionAttempts = builder.executionAttempts;
        queueOperations = builder.operationQueueMaxDepth > 0;
        nodeManager = builder.nodeManager != null ? builder.nodeManager : new DefaultNodeManager();

        if (builder.bootstrap == null) {
            bootstrap = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class);
        } else {
            // Work on a copy so the caller's bootstrap is left untouched.
            bootstrap = builder.bootstrap.clone();
        }

        if (builder.executor == null) {
            // The queue drain task, when enabled, gets an extra thread.
            int threads = queueOperations ? 3 : 2;
            executor = new ScheduledThreadPoolExecutor(threads);
        } else {
            executor = builder.executor;
        }

        nodeList = new ArrayList<RiakNode>(builder.riakNodes.size());
        for (RiakNode configured : builder.riakNodes) {
            configured.setExecutor(executor);
            configured.setBootstrap(bootstrap);
            configured.addStateListener(nodeManager);
            nodeList.add(configured);
        }

        operationQueueMaxDepth = queueOperations ? builder.operationQueueMaxDepth : 0;
        operationQueue = queueOperations ? new LinkedBlockingDeque<FutureOperation>() : null;
        if (queueOperations) {
            for (RiakNode member : nodeList) {
                member.setBlockOnMaxConnections(false);
            }
        }

        // The node manager receives its own copy of the node list.
        nodeManager.init(new ArrayList<RiakNode>(nodeList));
        state = State.CREATED;
    }

    /**
     * Verifies that the cluster is currently in one of the allowed states.
     *
     * <p>The check is a linear scan. {@code Arrays.binarySearch} requires a
     * sorted array, and callers in this class pass states in arbitrary order
     * (for example {@code SHUTTING_DOWN} before {@code QUEUING}), which made a
     * binary search miss a state that was in fact allowed.
     *
     * @param allowedStates the states in which the calling operation is legal
     * @throws IllegalStateException if the current state is not among {@code allowedStates}
     */
    private void stateCheck(State ... allowedStates) {
        // Read the volatile field once so the check and the message agree.
        final State current = this.state;
        for (State allowed : allowedStates) {
            if (allowed == current) {
                return;
            }
        }
        final String required = Arrays.toString(allowedStates);
        this.logger.debug("IllegalStateException; required: {} current: {} ", required, current);
        throw new IllegalStateException("required: " + required + " current: " + current);
    }

    /**
     * Starts every node, schedules the retry task (and the queue drain task
     * when queuing is enabled) and moves the cluster to {@code RUNNING}.
     *
     * <p>A node that fails to start with an {@link UnknownHostException} is
     * logged and skipped; the remaining nodes are still started.
     *
     * @throws IllegalStateException if the cluster is not in the {@code CREATED} state
     */
    public synchronized void start() {
        this.stateCheck(State.CREATED);
        for (RiakNode node : this.getNodes()) {
            try {
                node.start();
            }
            catch (UnknownHostException e) {
                // Pass the exception itself so the stack trace is not lost.
                this.logger.error("RiakCluster::start - Failed starting node: " + e.getMessage(), e);
            }
        }
        this.retrierFuture = this.executor.schedule(new RetryTask(), 0L, TimeUnit.SECONDS);
        if (this.queueOperations) {
            this.queueDrainFuture = this.executor.schedule(new QueueDrainTask(), 0L, TimeUnit.SECONDS);
        }
        this.logger.info("RiakCluster is starting.");
        this.state = State.RUNNING;
    }

    /**
     * Begins an asynchronous shutdown of the cluster.
     *
     * <p>The state becomes {@code SHUTTING_DOWN} and a {@code ShutdownTask} is
     * scheduled to run every 500 ms. The returned future is backed by the
     * shutdown latch: it cannot be cancelled, {@code get()} blocks until the
     * latch is released, and the timed {@code get} returns {@code false} on
     * timeout rather than throwing.
     *
     * @return a future that completes once the cluster has shut down
     * @throws IllegalStateException if the cluster is neither {@code RUNNING} nor {@code QUEUING}
     */
    public synchronized Future<Boolean> shutdown() {
        stateCheck(State.RUNNING, State.QUEUING);
        logger.info("RiakCluster is shutting down.");
        this.state = State.SHUTTING_DOWN;
        this.shutdownFuture = executor.scheduleWithFixedDelay(new ShutdownTask(), 500L, 500L, TimeUnit.MILLISECONDS);

        final Future<Boolean> completion = new Future<Boolean>(){

            @Override
            public boolean isDone() {
                return shutdownLatch.getCount() <= 0L;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // Shutdown cannot be aborted once requested.
                return false;
            }

            @Override
            public Boolean get() throws InterruptedException {
                shutdownLatch.await();
                return true;
            }

            @Override
            public Boolean get(long timeout, TimeUnit unit) throws InterruptedException {
                // Reports a timeout as false instead of throwing.
                return shutdownLatch.await(timeout, unit);
            }
        };
        return completion;
    }

    /**
     * Submits an operation to the cluster.
     *
     * <p>The operation is tried immediately unless queuing is enabled and
     * operations are already waiting. If no connection was obtained, the
     * operation is queued when queuing is enabled, otherwise it is failed with
     * a {@code NoNodesAvailableException}.
     *
     * @param operation the operation to run
     * @return the same operation, usable as its own future
     * @throws IllegalStateException if the cluster is neither {@code RUNNING} nor {@code QUEUING}
     */
    public <V, S> RiakFuture<V, S> execute(FutureOperation<V, ?, S> operation) {
        stateCheck(State.RUNNING, State.QUEUING);
        operation.setRetrier(this, executionAttempts);
        inFlightCount.incrementAndGet();

        // Skip the direct attempt when earlier operations are still queued.
        final boolean dispatched = notQueuingOrQueueIsEmpty() && execute(operation, null);
        if (dispatched) {
            return operation;
        }

        if (!queueOperations) {
            operation.setException(new NoNodesAvailableException());
        } else {
            executeWithQueueStrategy(operation, null);
        }
        return operation;
    }

    /**
     * @return {@code true} when queuing is disabled or no operation is currently queued
     */
    private boolean notQueuingOrQueueIsEmpty() {
        if (!queueOperations) {
            return true;
        }
        return operationQueue.isEmpty();
    }

    /**
     * Queues an operation that could not get a connection, then tries to run
     * whatever is at the head of the queue.
     *
     * <p>When the queue is already at its maximum depth the operation is failed
     * with a {@code NoNodesAvailableException}, after its retrier is reset with
     * an attempt count of 1.
     *
     * @param operation    the operation to queue
     * @param previousNode not used by this method
     */
    private void executeWithQueueStrategy(FutureOperation operation, RiakNode previousNode) {
        if (operationQueue.size() < operationQueueMaxDepth) {
            operationQueue.offer(operation);
            verifyQueueStatus();

            // Run the head of the queue, which is not necessarily the operation just added.
            final FutureOperation head = operationQueue.poll();
            if (head != null) {
                executeWithRequeueOnNoConnection(head);
            }
            return;
        }

        final String reason = "No Nodes Available, and Operation Queue at Max Depth";
        logger.warn(reason);
        operation.setRetrier(this, 1);
        operation.setException(new NoNodesAvailableException(reason));
    }

    /**
     * Tries to run an operation and puts it back at the front of the queue when
     * no connection was obtained. The queue status is re-evaluated either way.
     *
     * @param operation the operation to run
     * @return {@code true} if the operation got a connection
     */
    private boolean executeWithRequeueOnNoConnection(FutureOperation operation) {
        final boolean sent = execute(operation, null);
        if (sent) {
            verifyQueueStatus();
            return true;
        }
        // Front of the deque, so the operation keeps its place in line.
        operationQueue.offerFirst(operation);
        verifyQueueStatus();
        return false;
    }

    /**
     * Aligns the cluster state with the operation queue: {@code RUNNING}
     * becomes {@code QUEUING} while operations are queued, and {@code QUEUING}
     * returns to {@code RUNNING} once the queue is empty. A
     * {@code SHUTTING_DOWN} state is never changed here.
     */
    private void verifyQueueStatus() {
        final int depth = operationQueue.size();
        if (depth > 0) {
            if (this.state == State.RUNNING) {
                this.state = State.QUEUING;
                logger.debug("RiakCluster queuing operations.");
            }
            return;
        }

        // Queue is empty from here on.
        if (this.state == State.QUEUING || this.state == State.SHUTTING_DOWN) {
            logger.debug("RiakCluster cleared operation queue.");
            if (this.state == State.QUEUING) {
                this.state = State.RUNNING;
            }
        }
    }

    /**
     * Delegates execution of an operation to the node manager.
     *
     * @param operation    the operation to run
     * @param previousNode the node passed on to the node manager; may be {@code null}
     * @return the node manager's result; callers treat {@code false} as "no connection obtained"
     */
    private boolean execute(FutureOperation operation, RiakNode previousNode) {
        final boolean accepted = nodeManager.executeOnNode(operation, previousNode);
        return accepted;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a node to the cluster, attaches all registered state listeners to
     * it, starts it and hands it to the node manager.
     *
     * @param node the node to add
     * @throws UnknownHostException if starting the node fails with that exception
     * @throws IllegalStateException if the cluster is not {@code CREATED}, {@code RUNNING} or {@code QUEUING}
     */
    public void addNode(RiakNode node) throws UnknownHostException {
        this.stateCheck(State.CREATED, State.RUNNING, State.QUEUING);
        node.setExecutor(this.executor);
        node.setBootstrap(this.bootstrap);

        final ReentrantReadWriteLock.WriteLock writeLock = this.nodeListLock.writeLock();
        // Acquire before the try so the finally never unlocks a lock that is not held.
        writeLock.lock();
        try {
            this.nodeList.add(node);
            // Iterate a snapshot: a synchronized list does not guard iteration, and
            // listeners can be registered concurrently without the write lock.
            for (NodeStateListener listener : new ArrayList<NodeStateListener>(this.stateListeners)) {
                node.addStateListener(listener);
            }
        }
        finally {
            writeLock.unlock();
        }
        node.start();
        this.nodeManager.addNode(node);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a node from the cluster, detaches all registered state listeners
     * from it and removes it from the node manager.
     *
     * @param node the node to remove
     * @return {@code true} if the node was present in the cluster's node list
     * @throws IllegalStateException if the cluster is not {@code CREATED}, {@code RUNNING} or {@code QUEUING}
     */
    public boolean removeNode(RiakNode node) {
        this.stateCheck(State.CREATED, State.RUNNING, State.QUEUING);
        boolean removed = false;

        final ReentrantReadWriteLock.WriteLock writeLock = this.nodeListLock.writeLock();
        // Acquire before the try so the finally never unlocks a lock that is not held.
        writeLock.lock();
        try {
            removed = this.nodeList.remove(node);
            // Iterate a snapshot: a synchronized list does not guard iteration, and
            // listeners can be registered concurrently without the write lock.
            for (NodeStateListener listener : new ArrayList<NodeStateListener>(this.stateListeners)) {
                node.removeStateListener(listener);
            }
        }
        finally {
            writeLock.unlock();
        }
        this.nodeManager.removeNode(node);
        return removed;
    }

    /**
     * Returns a copy of the cluster's current node list, taken under the read lock.
     *
     * @return a new list holding the current nodes; changes to it do not affect the cluster
     * @throws IllegalStateException if the cluster has already reached {@code SHUTDOWN}
     */
    public List<RiakNode> getNodes() {
        this.stateCheck(State.CREATED, State.RUNNING, State.SHUTTING_DOWN, State.QUEUING);
        final ReentrantReadWriteLock.ReadLock readLock = this.nodeListLock.readLock();
        // Acquire before the try so the finally never unlocks a lock that is not held.
        readLock.lock();
        try {
            return new ArrayList<RiakNode>(this.nodeList);
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * @return the number of operations submitted through {@code execute} that
     *         have not yet completed or exhausted their retries
     */
    int inFlightCount() {
        final int pending = inFlightCount.get();
        return pending;
    }

    /**
     * Reacts to a node reaching {@code SHUTDOWN}: the node is dropped from the
     * node list and, once the list is empty, the cluster itself is shut down
     * (executor, bootstrap event loop group) and the shutdown latch is released.
     * All other node states are ignored.
     *
     * @param node  the node whose state changed
     * @param state the node's new state
     */
    @Override
    public void nodeStateChanged(RiakNode node, RiakNode.State state) {
        if (state != RiakNode.State.SHUTDOWN) {
            return;
        }
        this.logger.debug("Node state changed to shutdown; {}:{}", (Object)node.getRemoteAddress(), (Object)node.getPort());

        final ReentrantReadWriteLock.WriteLock writeLock = this.nodeListLock.writeLock();
        // Acquire before the try so the finally never unlocks a lock that is not held.
        writeLock.lock();
        try {
            this.nodeList.remove(node);
            this.logger.debug("Active nodes remaining: {}", (Object)this.nodeList.size());
            if (this.nodeList.isEmpty()) {
                // Last node is gone: finish the cluster shutdown.
                this.state = State.SHUTDOWN;
                this.executor.shutdown();
                this.bootstrap.group().shutdownGracefully();
                this.logger.debug("RiakCluster shut down bootstrap");
                this.logger.info("RiakCluster has shut down");
                this.shutdownLatch.countDown();
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Called when an operation attempt failed. With retries left the operation
     * goes onto the retry queue; otherwise it no longer counts as in flight.
     *
     * @param operation        the failed operation
     * @param remainingRetries attempts still available to the operation
     */
    @Override
    public void operationFailed(FutureOperation operation, int remainingRetries) {
        logger.debug("operation failed; remaining retries: {}", remainingRetries);
        if (remainingRetries <= 0) {
            inFlightCount.decrementAndGet();
            return;
        }
        retryQueue.add(operation);
    }

    /**
     * Called when an operation completed; it no longer counts as in flight.
     *
     * @param operation        the completed operation
     * @param remainingRetries attempts the operation had left, logged for diagnostics
     */
    @Override
    public void operationComplete(FutureOperation operation, int remainingRetries) {
        inFlightCount.decrementAndGet();
        logger.debug("operation complete; remaining retries: {}", remainingRetries);
    }

    /**
     * Blocks until an operation is available on the retry queue and runs it
     * again, passing along the node it last ran on. If no connection is
     * obtained the operation is failed with a {@code NoNodesAvailableException}.
     *
     * @throws InterruptedException if interrupted while waiting on the retry queue
     */
    private void retryOperation() throws InterruptedException {
        final FutureOperation next = retryQueue.take();
        if (execute(next, next.getLastNode())) {
            return;
        }
        next.setException(new NoNodesAvailableException());
    }

    /**
     * Blocks until an operation is available on the operation queue and tries
     * to run it. If no connection was obtained the operation has been re-queued
     * and this method sleeps for 50 ms before returning.
     *
     * @throws InterruptedException if interrupted while waiting or sleeping
     */
    private void queueDrainOperation() throws InterruptedException {
        final FutureOperation head = operationQueue.take();
        if (executeWithRequeueOnNoConnection(head)) {
            return;
        }
        // Back off briefly before the next attempt.
        Thread.sleep(50L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a listener for node state changes. The listener is attached to
     * every current node and immediately notified of each node's present state.
     * Nodes added later through {@code addNode} receive it as well.
     *
     * @param listener the listener to register
     * @throws IllegalStateException if the cluster has already reached {@code SHUTDOWN}
     */
    public void registerNodeStateListener(NodeStateListener listener) {
        this.stateCheck(State.CREATED, State.RUNNING, State.SHUTTING_DOWN, State.QUEUING);
        this.stateListeners.add(listener);

        final ReentrantReadWriteLock.ReadLock readLock = this.nodeListLock.readLock();
        // Acquire before the try so the finally never unlocks a lock that is not held.
        readLock.lock();
        try {
            for (RiakNode node : this.nodeList) {
                node.addStateListener(listener);
                listener.nodeStateChanged(node, node.getNodeState());
            }
        }
        finally {
            readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unregisters a node state listener and detaches it from every current node.
     *
     * @param listener the listener to remove
     * @throws IllegalStateException if the cluster has already reached {@code SHUTDOWN}
     */
    public void removeNodeStateListener(NodeStateListener listener) {
        this.stateCheck(State.CREATED, State.RUNNING, State.SHUTTING_DOWN, State.QUEUING);
        this.stateListeners.remove(listener);

        final ReentrantReadWriteLock.ReadLock readLock = this.nodeListLock.readLock();
        // Acquire before the try so the finally never unlocks a lock that is not held.
        readLock.lock();
        try {
            for (RiakNode node : this.nodeList) {
                node.removeStateListener(listener);
            }
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Creates a builder for a cluster made up of the given nodes.
     *
     * @param nodes the nodes the cluster starts with
     * @return a new builder
     */
    public static Builder builder(List<RiakNode> nodes) {
        final Builder fresh = new Builder(nodes);
        return fresh;
    }

    /**
     * Creates a builder for a cluster made up of a single node.
     *
     * @param node the node the cluster starts with
     * @return a new builder
     */
    public static Builder builder(RiakNode node) {
        final Builder fresh = new Builder(node);
        return fresh;
    }

    /**
     * Collects the settings for a {@code RiakCluster}. Every {@code with...}
     * method returns this builder so calls can be chained.
     */
    public static class Builder {
        /** Attempt count used when none is configured. */
        public static final int DEFAULT_EXECUTION_ATTEMPTS = 3;
        /** Queue depth used when none is configured; 0 leaves operation queuing off. */
        public static final int DEFAULT_OPERATION_QUEUE_DEPTH = 0;

        private final List<RiakNode> riakNodes;
        private int executionAttempts = DEFAULT_EXECUTION_ATTEMPTS;
        private int operationQueueMaxDepth = DEFAULT_OPERATION_QUEUE_DEPTH;
        private NodeManager nodeManager;
        private ScheduledExecutorService executor;
        private Bootstrap bootstrap;

        /**
         * Starts a builder from a list of nodes; the list is copied.
         *
         * @param riakNodes the nodes the cluster starts with
         */
        public Builder(List<RiakNode> riakNodes) {
            this.riakNodes = new ArrayList<RiakNode>(riakNodes);
        }

        /**
         * Starts a builder with nodes built from a node builder, one per address.
         *
         * @param nodeBuilder     the node builder handed to {@code RiakNode.Builder.buildNodes}
         * @param remoteAddresses the addresses handed to {@code RiakNode.Builder.buildNodes}
         * @throws UnknownHostException if building the nodes fails with that exception
         */
        public Builder(RiakNode.Builder nodeBuilder, List<String> remoteAddresses) throws UnknownHostException {
            this.riakNodes = RiakNode.Builder.buildNodes(nodeBuilder, remoteAddresses);
        }

        /**
         * Varargs variant of {@link #Builder(RiakNode.Builder, List)}.
         *
         * @param nodeBuilder     the node builder handed to {@code RiakNode.Builder.buildNodes}
         * @param remoteAddresses the addresses handed to {@code RiakNode.Builder.buildNodes}
         * @throws UnknownHostException if building the nodes fails with that exception
         */
        public Builder(RiakNode.Builder nodeBuilder, String ... remoteAddresses) throws UnknownHostException {
            this.riakNodes = RiakNode.Builder.buildNodes(nodeBuilder, remoteAddresses);
        }

        /**
         * Starts a builder for a single-node cluster.
         *
         * @param node the only node of the cluster
         */
        public Builder(RiakNode node) {
            final List<RiakNode> single = new ArrayList<RiakNode>(1);
            single.add(node);
            this.riakNodes = single;
        }

        /**
         * Sets the attempt count handed to each operation.
         *
         * @param numberOfAttempts the attempt count
         * @return this builder
         */
        public Builder withExecutionAttempts(int numberOfAttempts) {
            executionAttempts = numberOfAttempts;
            return this;
        }

        /**
         * Sets the node manager; a {@code DefaultNodeManager} is used when unset.
         *
         * @param nodeManager the node manager to use
         * @return this builder
         */
        public Builder withNodeManager(NodeManager nodeManager) {
            this.nodeManager = nodeManager;
            return this;
        }

        /**
         * Sets the executor; the cluster creates its own when unset.
         *
         * @param executor the executor to use
         * @return this builder
         */
        public Builder withExecutor(ScheduledExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /**
         * Sets the Netty bootstrap; the cluster clones it, or creates its own when unset.
         *
         * @param bootstrap the bootstrap to clone
         * @return this builder
         */
        public Builder withBootstrap(Bootstrap bootstrap) {
            this.bootstrap = bootstrap;
            return this;
        }

        /**
         * Sets the maximum operation queue depth; a value above zero enables queuing.
         *
         * @param operationQueueMaxDepth the maximum number of queued operations
         * @return this builder
         */
        public Builder withOperationQueueMaxDepth(int operationQueueMaxDepth) {
            this.operationQueueMaxDepth = operationQueueMaxDepth;
            return this;
        }

        /**
         * @return a new cluster configured from this builder
         */
        public RiakCluster build() {
            final RiakCluster cluster = new RiakCluster(this);
            return cluster;
        }
    }

    /**
     * Periodic task scheduled by {@code shutdown()}. It does nothing until no
     * operations are in flight; then it cancels the worker tasks, shuts down
     * every node and cancels its own schedule.
     */
    private class ShutdownTask
    implements Runnable {
        private ShutdownTask() {
        }

        @Override
        public void run() {
            if (inFlightCount.get() != 0) {
                // Operations still pending; check again on the next run.
                return;
            }
            logger.info("All operations have completed");

            retrierFuture.cancel(true);
            if (queueOperations) {
                queueDrainFuture.cancel(true);
            }

            for (RiakNode member : getNodes()) {
                // The cluster listens so it can react when the node reports SHUTDOWN.
                member.addStateListener(RiakCluster.this);
                logger.debug("calling shutdown on node {}:{}", (Object)member.getRemoteAddress(), (Object)member.getPort());
                member.shutdown();
            }

            shutdownFuture.cancel(false);
        }
    }

    /**
     * Worker that keeps draining the operation queue until its thread is interrupted.
     */
    private class QueueDrainTask
    implements Runnable {
        private QueueDrainTask() {
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    queueDrainOperation();
                }
            }
            catch (InterruptedException ignored) {
                // Interruption ends the loop; nothing else to do.
            }
            logger.info("Queue Worker shutting down.");
        }
    }

    /**
     * Worker that keeps re-running operations from the retry queue until its
     * thread is interrupted.
     */
    private class RetryTask
    implements Runnable {
        private RetryTask() {
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    retryOperation();
                }
            }
            catch (InterruptedException ignored) {
                // Interruption ends the loop; nothing else to do.
            }
            logger.info("Retrier shutting down.");
        }
    }

    /** Lifecycle states of a {@code RiakCluster}. */
    static enum State {
        /** Set by the constructor; the cluster has not been started yet. */
        CREATED,
        /** Set by {@code start()}; also restored when the operation queue becomes empty. */
        RUNNING,
        /** Set by {@code verifyQueueStatus()} while operations are waiting in the operation queue. */
        QUEUING,
        /** Set by {@code shutdown()}; lasts until the last node has shut down. */
        SHUTTING_DOWN,
        /** Set by {@code nodeStateChanged} once the node list is empty. */
        SHUTDOWN;

    }
}

