/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.Future;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.AcceptVersions;
import org.apache.cassandra.net.ConnectionCategory;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.FutureCombiner;
import org.apache.cassandra.net.InboundConnectionSettings;
import org.apache.cassandra.net.InboundMessageHandlers;
import org.apache.cassandra.net.InboundSink;
import org.apache.cassandra.net.InboundSockets;
import org.apache.cassandra.net.LatencySubscribers;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingServiceMBeanImpl;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.net.OutboundConnections;
import org.apache.cassandra.net.OutboundSink;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.RequestCallbacks;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.SocketFactory;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessagingService
extends MessagingServiceMBeanImpl {
    private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
    public static final int VERSION_30 = 10;
    public static final int VERSION_3014 = 11;
    public static final int VERSION_40 = 12;
    public static final int minimum_version = 10;
    public static final int current_version = 12;
    static AcceptVersions accept_messaging = new AcceptVersions(10, 12);
    static AcceptVersions accept_streaming = new AcceptVersions(12, 12);
    public final SocketFactory socketFactory = new SocketFactory();
    public final LatencySubscribers latencySubscribers = new LatencySubscribers();
    public final RequestCallbacks callbacks = new RequestCallbacks(this);
    public final InboundSink inboundSink = new InboundSink(this);
    private final InboundMessageHandlers.GlobalResourceLimits inboundGlobalReserveLimits = new InboundMessageHandlers.GlobalResourceLimits(new ResourceLimits.Concurrent(DatabaseDescriptor.getInternodeApplicationReceiveQueueReserveGlobalCapacityInBytes()));
    private final InboundSockets inboundSockets = new InboundSockets(new InboundConnectionSettings().withHandlers(this::getInbound).withSocketFactory(this.socketFactory));
    public final OutboundSink outboundSink = new OutboundSink(this::doSend);
    final ResourceLimits.Limit outboundGlobalReserveLimit = new ResourceLimits.Concurrent(DatabaseDescriptor.getInternodeApplicationSendQueueReserveGlobalCapacityInBytes());
    private volatile boolean isShuttingDown;

    public static MessagingService instance() {
        return MSHandle.instance;
    }

    @VisibleForTesting
    MessagingService(boolean testOnly) {
        super(testOnly);
        OutboundConnections.scheduleUnusedConnectionMonitoring(this, ScheduledExecutors.scheduledTasks, 1L, TimeUnit.HOURS);
    }

    public void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb) {
        this.sendWithCallback(message, to, cb, null);
    }

    public void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb, ConnectionType specifyConnection) {
        this.callbacks.addWithExpiration(cb, message, to);
        if (cb.invokeOnFailure() && !message.callBackOnFailure()) {
            message = message.withCallBackOnFailure();
        }
        this.send(message, to, specifyConnection);
    }

    public void sendWriteWithCallback(Message message, Replica to, AbstractWriteResponseHandler<?> handler, boolean allowHints) {
        assert (message.callBackOnFailure());
        this.callbacks.addWithExpiration(handler, message, to, handler.consistencyLevel(), allowHints);
        this.send(message, to.endpoint(), null);
    }

    public void send(Message message, InetAddressAndPort to) {
        this.send(message, to, null);
    }

    public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection) {
        if (logger.isTraceEnabled()) {
            logger.trace("{} sending {} to {}@{}", new Object[]{FBUtilities.getBroadcastAddressAndPort(), message.verb(), message.id(), to});
            if (to.equals(FBUtilities.getBroadcastAddressAndPort())) {
                logger.trace("Message-to-self {} going over MessagingService", (Object)message);
            }
        }
        this.outboundSink.accept(message, to, specifyConnection);
    }

    private void doSend(Message message, InetAddressAndPort to, ConnectionType specifyConnection) {
        while (true) {
            OutboundConnections connections = this.getOutbound(to);
            try {
                connections.enqueue(message, specifyConnection);
                return;
            }
            catch (ClosedChannelException e) {
                if (this.isShuttingDown) {
                    return;
                }
                this.channelManagers.remove(to, connections);
                continue;
            }
            break;
        }
    }

    void markExpiredCallback(InetAddressAndPort addr) {
        OutboundConnections conn = (OutboundConnections)this.channelManagers.get(addr);
        if (conn != null) {
            conn.incrementExpiredCallbackCount();
        }
    }

    public void closeOutbound(InetAddressAndPort to) {
        OutboundConnections pool = (OutboundConnections)this.channelManagers.get(to);
        if (pool != null) {
            pool.scheduleClose(5L, TimeUnit.MINUTES, true).addListener(future -> this.channelManagers.remove(to, pool));
        }
    }

    void closeOutboundNow(OutboundConnections connections) {
        connections.close(true).addListener(future -> this.channelManagers.remove(connections.template().to, connections));
    }

    public void removeInbound(InetAddressAndPort from) {
        InboundMessageHandlers handlers = (InboundMessageHandlers)this.messageHandlers.remove(from);
        if (null != handlers) {
            handlers.releaseMetrics();
        }
    }

    public void interruptOutbound(InetAddressAndPort to) {
        OutboundConnections pool = (OutboundConnections)this.channelManagers.get(to);
        if (pool != null) {
            pool.interrupt();
        }
    }

    public Future<Void> maybeReconnectWithNewIp(InetAddressAndPort address, InetAddressAndPort preferredAddress) {
        if (!SystemKeyspace.updatePreferredIP(address, preferredAddress)) {
            return null;
        }
        OutboundConnections messagingPool = (OutboundConnections)this.channelManagers.get(address);
        if (messagingPool != null) {
            return messagingPool.reconnectWithNewIp(preferredAddress);
        }
        return null;
    }

    public void shutdown() {
        this.shutdown(1L, TimeUnit.MINUTES, true, true);
    }

    public void shutdown(long timeout, TimeUnit units, boolean shutdownGracefully, boolean shutdownExecutors) {
        this.isShuttingDown = true;
        logger.info("Waiting for messaging service to quiesce");
        assert (!Stage.MUTATION.executor().isShutdown());
        if (shutdownGracefully) {
            this.callbacks.shutdownGracefully();
            ArrayList<Future<Void>> closing = new ArrayList<Future<Void>>();
            for (OutboundConnections pool : this.channelManagers.values()) {
                closing.add(pool.close(true));
            }
            long deadline = System.nanoTime() + units.toNanos(timeout);
            Throwables.DiscreteAction[] discreteActionArray = new Throwables.DiscreteAction[6];
            discreteActionArray[0] = () -> {
                Void cfr_ignored_0 = (Void)new FutureCombiner(closing).get(timeout, units);
            };
            discreteActionArray[1] = () -> {
                ArrayList inboundExecutors = new ArrayList();
                this.inboundSockets.close(Collections.synchronizedList(inboundExecutors)::add).get();
                ExecutorUtils.awaitTermination(1L, TimeUnit.MINUTES, inboundExecutors);
            };
            discreteActionArray[2] = () -> {
                if (shutdownExecutors) {
                    this.shutdownExecutors(deadline);
                }
            };
            discreteActionArray[3] = () -> this.callbacks.awaitTerminationUntil(deadline);
            discreteActionArray[4] = this.inboundSink::clear;
            discreteActionArray[5] = this.outboundSink::clear;
            Throwables.maybeFail(discreteActionArray);
        } else {
            this.callbacks.shutdownNow(false);
            ArrayList<Future<Void>> closing = new ArrayList<Future<Void>>();
            List inboundExecutors = Collections.synchronizedList(new ArrayList());
            closing.add(this.inboundSockets.close(inboundExecutors::add));
            for (OutboundConnections pool : this.channelManagers.values()) {
                closing.add(pool.close(false));
            }
            long deadline = System.nanoTime() + units.toNanos(timeout);
            Throwables.DiscreteAction[] discreteActionArray = new Throwables.DiscreteAction[6];
            discreteActionArray[0] = () -> {
                Void cfr_ignored_0 = (Void)new FutureCombiner(closing).get(timeout, units);
            };
            discreteActionArray[1] = () -> {
                if (shutdownExecutors) {
                    this.shutdownExecutors(deadline);
                }
            };
            discreteActionArray[2] = () -> ExecutorUtils.awaitTermination(timeout, units, inboundExecutors);
            discreteActionArray[3] = () -> this.callbacks.awaitTerminationUntil(deadline);
            discreteActionArray[4] = this.inboundSink::clear;
            discreteActionArray[5] = this.outboundSink::clear;
            Throwables.maybeFail(discreteActionArray);
        }
    }

    private void shutdownExecutors(long deadlineNanos) throws TimeoutException, InterruptedException {
        this.socketFactory.shutdownNow();
        this.socketFactory.awaitTerminationUntil(deadlineNanos);
    }

    private OutboundConnections getOutbound(InetAddressAndPort to) {
        OutboundConnections connections = (OutboundConnections)this.channelManagers.get(to);
        if (connections == null) {
            connections = OutboundConnections.tryRegister(this.channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING));
        }
        return connections;
    }

    InboundMessageHandlers getInbound(InetAddressAndPort from) {
        InboundMessageHandlers handlers = (InboundMessageHandlers)this.messageHandlers.get(from);
        if (null != handlers) {
            return handlers;
        }
        return this.messageHandlers.computeIfAbsent(from, addr -> new InboundMessageHandlers(FBUtilities.getLocalAddressAndPort(), (InetAddressAndPort)addr, DatabaseDescriptor.getInternodeApplicationReceiveQueueCapacityInBytes(), DatabaseDescriptor.getInternodeApplicationReceiveQueueReserveEndpointCapacityInBytes(), this.inboundGlobalReserveLimits, this.metrics, this.inboundSink));
    }

    @VisibleForTesting
    boolean isConnected(InetAddressAndPort address, Message<?> messageOut) {
        OutboundConnections pool = (OutboundConnections)this.channelManagers.get(address);
        if (pool == null) {
            return false;
        }
        return pool.connectionFor(messageOut).isConnected();
    }

    public void listen() {
        this.inboundSockets.open();
    }

    public void waitUntilListening() throws InterruptedException {
        this.inboundSockets.open().await();
    }

    private static class MSHandle {
        public static final MessagingService instance = new MessagingService(false);

        private MSHandle() {
        }
    }

    public static enum Version {
        VERSION_30(10),
        VERSION_3014(11),
        VERSION_40(12);

        public final int value;

        private Version(int value) {
            this.value = value;
        }
    }
}

