/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.client.socket;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.client.socket.EndpointConnection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.EventReportUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EndpointConnectionPool
implements PeerStatusProvider {
    /** Shared SLF4J logger for this class. */
    private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
    /** Idle, reusable connections keyed by remote peer; a queue is created lazily the first time a peer is selected. */
    private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<PeerDescription, BlockingQueue<EndpointConnection>>();
    /** URL of the remote cluster; supplies the bootstrap host name, is passed to every Peer and appears in log output. */
    private final URI clusterUrl;
    /** Connections handed out by getEndpointConnection that have not yet been offered back or terminated. */
    private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet());
    /** Receives error/warning events raised while talking to remote peers. */
    private final EventReporter eventReporter;
    /** SSL context for secure site-to-site; when null, secure connections are rejected with an IOException. */
    private final SSLContext sslContext;
    /** Single-threaded scheduler that refreshes the peer list and evicts expired idle connections. */
    private final ScheduledExecutorService taskExecutor;
    /** Idle age, in milliseconds, after which the maintenance task closes a pooled connection. */
    private final int idleExpirationMillis;
    /** Remote port this pool talks to; supplies the port identifier/name and the penalization (yield) period. */
    private final RemoteDestination remoteDestination;
    /** Timeout in milliseconds used for socket connect/read, the peer-status protocol, and the maximum idle age of a reused connection. */
    private volatile int commsTimeout;
    /** Set by shutdown(); once true, connections offered back are terminated instead of pooled. */
    private volatile boolean shutdown = false;
    /** Peers from the last successful status fetch that reported isQueryForPeers(); tried as additional sources on the next fetch. */
    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
    /** Source of the remote site's port identifiers, site-to-site port and secure flag. */
    private final SiteInfoProvider siteInfoProvider;
    /** Chooses the next peer to use and tracks peer penalization. */
    private final PeerSelector peerSelector;

    /**
     * Builds the pool and immediately starts its background maintenance.
     *
     * <p>Two recurring jobs are scheduled on a single daemon thread named
     * "NiFi Site-to-Site Connection Pool Maintenance": a peer refresh that starts right away
     * and an expired-socket sweep that starts after five seconds; both repeat with a
     * five-second delay between runs.
     *
     * @param clusterUrl URL of the remote cluster; must not be null
     * @param remoteDestination remote port to communicate with; must not be null
     * @param commsTimeoutMillis communications timeout in milliseconds
     * @param idleExpirationMillis idle age in milliseconds after which pooled connections are closed
     * @param sslContext SSL context for secure communication; may be null
     * @param eventReporter sink for reported events; also handed to the peer selector
     * @param persistenceFile file handed to the {@link PeerSelector}
     * @param siteInfoProvider provider of remote site details
     * @throws NullPointerException if {@code clusterUrl} or {@code remoteDestination} is null
     */
    public EndpointConnectionPool(URI clusterUrl, RemoteDestination remoteDestination, int commsTimeoutMillis, int idleExpirationMillis, SSLContext sslContext, EventReporter eventReporter, File persistenceFile, SiteInfoProvider siteInfoProvider) {
        this.clusterUrl = Objects.requireNonNull(clusterUrl, "URL cannot be null");
        this.remoteDestination = Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
        this.sslContext = sslContext;
        this.eventReporter = eventReporter;
        this.commsTimeout = commsTimeoutMillis;
        this.idleExpirationMillis = idleExpirationMillis;
        this.siteInfoProvider = siteInfoProvider;

        this.peerSelector = new PeerSelector(this, persistenceFile);
        this.peerSelector.setEventReporter(eventReporter);

        // Wrap the default factory so the maintenance thread is a named daemon thread.
        final ThreadFactory delegateFactory = Executors.defaultThreadFactory();
        this.taskExecutor = Executors.newScheduledThreadPool(1, task -> {
            final Thread maintenanceThread = delegateFactory.newThread(task);
            maintenanceThread.setName("NiFi Site-to-Site Connection Pool Maintenance");
            maintenanceThread.setDaemon(true);
            return maintenanceThread;
        });

        this.taskExecutor.scheduleWithFixedDelay(() -> this.peerSelector.refreshPeers(), 0L, 5L, TimeUnit.SECONDS);
        this.taskExecutor.scheduleWithFixedDelay(this::cleanupExpiredSockets, 5L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Resolves the identifier of the remote port for the given direction.
     *
     * @param transferDirection direction used when the identifier has to be looked up by port name
     * @return the destination's own identifier when it has one, otherwise whatever the
     *         site info provider returns for the destination's name
     * @throws IOException if the lookup through the site info provider fails
     */
    private String getPortIdentifier(TransferDirection transferDirection) throws IOException {
        if (this.remoteDestination.getIdentifier() == null) {
            // No identifier configured: fall back to a lookup by port name.
            final String portName = this.remoteDestination.getName();
            return this.siteInfoProvider.getPortIdentifier(portName, transferDirection);
        }
        return this.remoteDestination.getIdentifier();
    }

    /**
     * Obtains a connection for the given direction without any client configuration overrides.
     *
     * @param direction direction of the transfer
     * @return a ready connection, or {@code null} when no peer is currently available
     * @throws IOException if a connection cannot be established
     */
    public EndpointConnection getEndpointConnection(TransferDirection direction) throws IOException {
        final SiteToSiteClientConfig noOverrides = null;
        return this.getEndpointConnection(direction, noOverrides);
    }

    /**
     * Returns a usable connection to a remote peer, reusing an idle pooled connection when
     * possible and otherwise establishing, negotiating and handshaking a new one.
     *
     * <p>Each pass asks the peer selector for the next peer and polls that peer's idle queue.
     * A pooled connection whose peer is penalized for the port is put back and another pass is
     * made; a pooled connection idle for at least the communications timeout is closed and
     * another pass is made. When a new connection reports that the destination is full, the
     * peer is penalized and another pass is made.
     *
     * @param direction direction of the transfer
     * @param config optional client settings (timeout, preferred batch count/size/duration)
     *        applied to newly created connections; may be {@code null}
     * @return a connection registered as active, or {@code null} when the peer selector has no
     *         peer to offer
     * @throws PortNotRunningException if the remote peer reports the port as not running
     * @throws UnknownPortException if the remote peer reports the port as unknown
     * @throws IOException if the connection cannot be established or negotiated
     */
    public EndpointConnection getEndpointConnection(TransferDirection direction, SiteToSiteClientConfig config) throws IOException {
        EndpointConnection connection = null;
        FlowFileCodec codec = null;
        CommunicationsSession commsSession = null;
        SocketClientProtocol protocol = null;
        Peer peer = null;
        do {
            final ArrayList<EndpointConnection> addBack = new ArrayList<EndpointConnection>();
            logger.debug("{} getting next peer status", this);
            final PeerStatus peerStatus = this.peerSelector.getNextPeerStatus(direction);
            logger.debug("{} next peer status = {}", this, peerStatus);
            if (peerStatus == null) {
                return null;
            }
            final PeerDescription peerDescription = peerStatus.getPeerDescription();

            // Find or lazily create the idle queue for this peer; putIfAbsent resolves creation races.
            BlockingQueue<EndpointConnection> connectionQueue = this.connectionQueueMap.get(peerDescription);
            if (connectionQueue == null) {
                connectionQueue = new LinkedBlockingQueue<EndpointConnection>();
                final BlockingQueue<EndpointConnection> existing = this.connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
                if (existing != null) {
                    connectionQueue = existing;
                }
            }
            try {
                // Resolve the port before polling so a failed lookup cannot strand a connection
                // that has already been taken out of the queue.
                final String portId = this.getPortIdentifier(direction);
                connection = connectionQueue.poll();
                logger.debug("{} Connection State for {} = {}", new Object[]{this, this.clusterUrl, connection});

                // NOTE(review): addBack is created empty on every pass and only filled below,
                // so this guard cannot fire as written; kept unchanged.
                if (connection == null && !addBack.isEmpty()) {
                    logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
                    return null;
                }
                if (connection != null && connection.getPeer().isPenalized(portId)) {
                    // Penalized: hand it back to the queue (in the finally block) and make another pass.
                    addBack.add(connection);
                    continue;
                }
                if (connection == null) {
                    logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
                    protocol = new SocketClientProtocol();
                    protocol.setDestination(new IdEnrichedRemoteDestination(this.remoteDestination, portId));
                    protocol.setEventReporter(this.eventReporter);
                    final long penalizationMillis = this.remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
                    try {
                        logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
                        commsSession = this.establishSiteToSiteConnection(peerStatus);
                    } catch (IOException ioe) {
                        this.peerSelector.penalize(peerStatus.getPeerDescription(), penalizationMillis);
                        throw ioe;
                    }
                    final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
                    final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());

                    // A negotiation failure must abort this attempt: it propagates to the outer
                    // handler, which closes the session, instead of being swallowed and followed
                    // by a handshake over a closed session.
                    logger.debug("{} Negotiating protocol", this);
                    RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);

                    final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort();
                    peer = new Peer(peerDescription, commsSession, peerUrl, this.clusterUrl.toString());
                    if (config != null) {
                        protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
                        protocol.setPreferredBatchCount(config.getPreferredBatchCount());
                        protocol.setPreferredBatchSize(config.getPreferredBatchSize());
                        protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
                    }
                    logger.debug("{} performing handshake", this);
                    protocol.handshake(peer);

                    if (protocol.isDestinationFull()) {
                        // config is null when called through the single-argument overload; fall
                        // back to the resolved port identifier for the log message.
                        final Object portLabel;
                        if (config == null) {
                            portLabel = portId;
                        } else if (config.getPortName() == null) {
                            portLabel = config.getPortIdentifier();
                        } else {
                            portLabel = config.getPortName();
                        }
                        logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", new Object[]{this, peer, portLabel});
                        this.peerSelector.penalize(peer, penalizationMillis);
                        try {
                            peer.close();
                        } catch (IOException ignored) {
                            // best effort: the peer is already penalized and will not be used
                        }
                        continue;
                    }

                    try {
                        if (protocol.isPortInvalid()) {
                            this.peerSelector.penalize(peer, penalizationMillis);
                            this.cleanup(protocol, peer);
                            throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running");
                        }
                        if (protocol.isPortUnknown()) {
                            this.peerSelector.penalize(peer, penalizationMillis);
                            this.cleanup(protocol, peer);
                            throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known");
                        }
                        logger.debug("{} negotiating codec", this);
                        codec = protocol.negotiateCodec(peer);
                        logger.debug("{} negotiated codec is {}", this, codec);
                    } catch (PortNotRunningException | UnknownPortException e) {
                        // Already penalized and cleaned up above; just propagate.
                        throw e;
                    } catch (Exception e) {
                        this.peerSelector.penalize(peer, penalizationMillis);
                        this.cleanup(protocol, peer);
                        final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? this.clusterUrl : peer, e.toString());
                        EventReportUtil.error(logger, this.eventReporter, message, new Object[0]);
                        if (logger.isDebugEnabled()) {
                            logger.error("", e);
                        }
                        throw e;
                    }
                    connection = new EndpointConnection(peer, protocol, codec);
                } else {
                    final long lastTimeUsed = connection.getLastTimeUsed();
                    final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
                    if ((long) this.commsTimeout > 0L && millisSinceLastUse >= (long) this.commsTimeout) {
                        // Idle for too long: close it and make another pass.
                        this.cleanup(connection.getSocketClientProtocol(), connection.getPeer());
                        connection = null;
                    } else {
                        codec = connection.getCodec();
                        peer = connection.getPeer();
                        commsSession = peer.getCommunicationsSession();
                        protocol = connection.getSocketClientProtocol();
                    }
                }
            } catch (Throwable t) {
                if (commsSession != null) {
                    try {
                        commsSession.close();
                    } catch (IOException closeFailure) {
                        t.addSuppressed(closeFailure);
                    }
                }
                throw t;
            } finally {
                // Return any penalized connections taken out of the queue during this pass.
                if (!addBack.isEmpty()) {
                    connectionQueue.addAll(addBack);
                    addBack.clear();
                }
            }
        } while (connection == null || codec == null || commsSession == null || protocol == null);
        this.activeConnections.add(connection);
        return connection;
    }

    /**
     * Hands a connection back to the pool once the caller is done with it.
     *
     * @param endpointConnection connection being returned
     * @return {@code true} if the connection was placed in its peer's idle queue; {@code false}
     *         if it has no peer, its peer has no queue, or the pool is shut down (in which case
     *         the connection is terminated)
     */
    public boolean offer(EndpointConnection endpointConnection) {
        final Peer owner = endpointConnection.getPeer();
        if (owner == null) {
            return false;
        }

        final BlockingQueue<EndpointConnection> idleQueue = this.connectionQueueMap.get(owner.getDescription());
        if (idleQueue == null) {
            return false;
        }

        this.activeConnections.remove(endpointConnection);
        if (!this.shutdown) {
            // Stamp the hand-back time so idle expiration is measured from now.
            endpointConnection.setLastTimeUsed();
            return idleQueue.offer(endpointConnection);
        }

        this.terminate(endpointConnection);
        return false;
    }

    /**
     * Best-effort teardown of a connection: shuts the protocol down (when one is given) and then
     * closes the peer. Failures never propagate; they are logged at debug level.
     *
     * @param protocol protocol to shut down; may be {@code null}, in which case only the peer is closed
     * @param peer peer to close; when {@code null} nothing is done
     */
    private void cleanup(SocketClientProtocol protocol, Peer peer) {
        if (peer == null) {
            return;
        }
        if (protocol != null) {
            try {
                protocol.shutdown(peer);
            } catch (TransmissionDisabledException e) {
                logger.debug(this + " Transmission Disabled by User");
            } catch (IOException e) {
                // Still best effort, but leave a trace instead of swallowing silently.
                logger.debug("{} failed to shut down protocol for {} due to {}", new Object[]{this, peer, e.toString()});
            }
        }
        try {
            peer.close();
        } catch (TransmissionDisabledException e) {
            logger.debug(this + " Transmission Disabled by User");
        } catch (IOException e) {
            logger.debug("{} failed to close {} due to {}", new Object[]{this, peer, e.toString()});
        }
    }

    /**
     * Connects to one remote node and asks it for the statuses of the peers in its cluster.
     *
     * <p>When the negotiated protocol version is below 5 a port identifier is required for the
     * handshake (RECEIVE is tried first, then SEND); otherwise the handshake is performed
     * without one. The connection is always released: after a successful fetch, failures while
     * shutting down or closing are only reported as warnings; after a failed fetch the
     * connection is closed and the original exception is rethrown.
     *
     * @param peerDescription node to query
     * @return the peer statuses reported by that node
     * @throws IOException if the connection, negotiation, handshake or status request fails, or
     *         if no port identifier can be determined when one is required
     */
    private Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
        final String hostname = peerDescription.getHostname();
        final int port = peerDescription.getPort();
        final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, this.clusterUrl.toString().startsWith("https://"));
        final CommunicationsSession commsSession = this.establishSiteToSiteConnection(hostname, port);

        Peer peer = null;
        final SocketClientProtocol clientProtocol;
        final Set<PeerStatus> peerStatuses;
        try {
            peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, this.clusterUrl.toString());
            clientProtocol = new SocketClientProtocol();
            final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
            final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
            RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
            clientProtocol.setTimeout(this.commsTimeout);
            if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
                String portId = this.getPortIdentifier(TransferDirection.RECEIVE);
                if (portId == null) {
                    portId = this.getPortIdentifier(TransferDirection.SEND);
                }
                if (portId == null) {
                    // The failure handler below closes the peer.
                    throw new IOException("Failed to determine the identifier of port " + this.remoteDestination.getName());
                }
                clientProtocol.handshake(peer, portId);
            } else {
                clientProtocol.handshake(peer, null);
            }
            peerStatuses = clientProtocol.getPeerStatuses(peer);
        } catch (IOException | RuntimeException e) {
            // Do not leak the socket when the exchange fails part-way through.
            try {
                if (peer != null) {
                    peer.close();
                } else {
                    commsSession.close();
                }
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        try {
            clientProtocol.shutdown(peer);
        } catch (IOException e) {
            final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
            EventReportUtil.warn(logger, this.eventReporter, message, new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", e);
            }
        }

        try {
            peer.close();
        } catch (IOException e) {
            final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
            EventReportUtil.warn(logger, this.eventReporter, message, new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", e);
            }
        }
        return peerStatuses;
    }

    /**
     * Fetches the peer statuses of the remote cluster.
     *
     * <p>Candidate nodes are the peers remembered from the last successful fetch that allowed
     * peer queries, plus the node named by the cluster URL. Candidates are tried one after
     * another and the first successful answer is returned; the set of remembered queryable
     * peers is refreshed from that answer.
     *
     * @return the peer statuses reported by the first node that answered
     * @throws IOException if the remote instance exposes no RAW site-to-site port, or if no
     *         candidate could be reached (the last failure is attached as a suppressed exception)
     */
    @Override
    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
        final HashSet<PeerDescription> candidates = new HashSet<PeerDescription>();

        final Set<PeerStatus> previouslyQueryable = this.lastFetchedQueryablePeers;
        if (previouslyQueryable != null && !previouslyQueryable.isEmpty()) {
            for (final PeerStatus status : previouslyQueryable) {
                candidates.add(status.getPeerDescription());
            }
        }

        final String hostname = this.clusterUrl.getHost();
        final Integer port = this.siteInfoProvider.getSiteToSitePort();
        if (port == null) {
            throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
        }
        final boolean secure = this.siteInfoProvider.isSecure();
        candidates.add(new PeerDescription(hostname, port, secure));

        Exception lastFailure = null;
        for (final PeerDescription candidate : candidates) {
            try {
                final Set<PeerStatus> statuses = this.fetchRemotePeerStatuses(candidate);
                this.lastFetchedQueryablePeers = statuses.stream()
                        .filter(PeerStatus::isQueryForPeers)
                        .collect(Collectors.toSet());
                return statuses;
            } catch (Exception e) {
                logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster", candidate.getHostname(), candidate.getPort());
                lastFailure = e;
            }
        }

        final IOException failure = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
        if (lastFailure != null) {
            failure.addSuppressed(lastFailure);
        }
        throw failure;
    }

    /**
     * Opens a site-to-site session to the host and port described by the given peer status.
     *
     * @param peerStatus status whose peer description names the target
     * @return the established session
     * @throws IOException if the connection cannot be established
     */
    private CommunicationsSession establishSiteToSiteConnection(PeerStatus peerStatus) throws IOException {
        final PeerDescription target = peerStatus.getPeerDescription();
        final String targetHost = target.getHostname();
        final int targetPort = target.getPort();
        return this.establishSiteToSiteConnection(targetHost, targetPort);
    }

    /**
     * Opens a session to the given host and port and writes the site-to-site magic bytes.
     *
     * <p>When the site info provider reports the remote site as secure, an SSL channel is used
     * and the session's user DN is taken from the channel; otherwise a plain socket channel is
     * opened with the communications timeout applied to both connect and read.
     *
     * @param hostname remote host
     * @param port remote site-to-site port
     * @return the established session
     * @throws IOException if the remote site requires secure communication but no SSL context is
     *         configured, or if connecting or writing fails; anything opened so far is closed
     *         and close failures are attached as suppressed exceptions
     */
    private CommunicationsSession establishSiteToSiteConnection(String hostname, int port) throws IOException {
        final boolean siteToSiteSecure = this.siteInfoProvider.isSecure();
        CommunicationsSession commsSession = null;
        try {
            if (siteToSiteSecure) {
                if (this.sslContext == null) {
                    throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
                }
                final SSLSocketChannel socketChannel = new SSLSocketChannel(this.sslContext, hostname, port, true);
                socketChannel.connect();
                commsSession = new SSLSocketChannelCommunicationsSession(socketChannel);
                try {
                    commsSession.setUserDn(socketChannel.getDn());
                } catch (CertificateException ex) {
                    throw new IOException(ex);
                }
            } else {
                final SocketChannel socketChannel = SocketChannel.open();
                try {
                    socketChannel.socket().connect(new InetSocketAddress(hostname, port), this.commsTimeout);
                    socketChannel.socket().setSoTimeout(this.commsTimeout);
                    commsSession = new SocketChannelCommunicationsSession(socketChannel);
                } catch (IOException | RuntimeException e) {
                    // Until the channel is wrapped in a session nothing else owns it, so it
                    // must be closed here or the socket leaks.
                    try {
                        socketChannel.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
            }
            commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
        } catch (IOException ioe) {
            if (commsSession != null) {
                try {
                    commsSession.close();
                } catch (IOException closeFailure) {
                    // Keep the original failure as the primary exception.
                    ioe.addSuppressed(closeFailure);
                }
            }
            throw ioe;
        }
        return commsSession;
    }

    /**
     * Maintenance sweep: drains every idle queue, closes connections last used longer ago than
     * the idle expiration, and puts the remaining ones back.
     */
    private void cleanupExpiredSockets() {
        for (final BlockingQueue<EndpointConnection> idleQueue : this.connectionQueueMap.values()) {
            final ArrayList<EndpointConnection> survivors = new ArrayList<EndpointConnection>();

            for (EndpointConnection candidate = idleQueue.poll(); candidate != null; candidate = idleQueue.poll()) {
                final long lastUsed = candidate.getLastTimeUsed();
                final boolean expired = lastUsed < System.currentTimeMillis() - (long) this.idleExpirationMillis;
                if (!expired) {
                    survivors.add(candidate);
                    continue;
                }

                // Shut the protocol down first; terminate() then closes the connection.
                try {
                    candidate.getSocketClientProtocol().shutdown(candidate.getPeer());
                } catch (Exception e) {
                    logger.debug("Failed to shut down {} using {} due to {}", new Object[]{candidate.getSocketClientProtocol(), candidate.getPeer(), e});
                }
                this.terminate(candidate);
            }

            idleQueue.addAll(survivors);
        }
    }

    /**
     * Shuts the pool down: stops the maintenance scheduler, clears the peer selector,
     * interrupts the sessions of all connections currently handed out, and terminates every
     * idle pooled connection. Connections offered back afterwards are terminated by
     * {@link #offer(EndpointConnection)}.
     */
    public void shutdown() {
        this.shutdown = true;
        this.taskExecutor.shutdown();
        this.peerSelector.clear();

        // activeConnections is a synchronized wrapper: iterating it directly without holding
        // its lock can fail with ConcurrentModificationException while other threads add or
        // remove connections. toArray copies the contents under the wrapper's lock, and the
        // interrupts are then issued without holding it.
        final EndpointConnection[] inFlight = this.activeConnections.toArray(new EndpointConnection[0]);
        for (final EndpointConnection conn : inFlight) {
            conn.getPeer().getCommunicationsSession().interrupt();
        }

        for (final BlockingQueue<EndpointConnection> connectionQueue : this.connectionQueueMap.values()) {
            EndpointConnection idle;
            while ((idle = connectionQueue.poll()) != null) {
                this.terminate(idle);
            }
        }
    }

    /**
     * Removes the connection from the set of active connections and closes it.
     *
     * @param connection connection to close
     */
    public void terminate(EndpointConnection connection) {
        this.activeConnections.remove(connection);
        final SocketClientProtocol protocol = connection.getSocketClientProtocol();
        final Peer peer = connection.getPeer();
        this.cleanup(protocol, peer);
    }

    /**
     * @return a short description of this pool that includes the cluster URL
     */
    @Override
    public String toString() {
        return new StringBuilder("EndpointConnectionPool[Cluster URL=")
                .append(this.clusterUrl)
                .append("]")
                .toString();
    }

    private class IdEnrichedRemoteDestination
    implements RemoteDestination {
        private final RemoteDestination original;
        private final String identifier;

        public IdEnrichedRemoteDestination(RemoteDestination original, String identifier) {
            this.original = original;
            this.identifier = identifier;
        }

        public String getIdentifier() {
            return this.identifier;
        }

        public String getName() {
            return this.original.getName();
        }

        public long getYieldPeriod(TimeUnit timeUnit) {
            return this.original.getYieldPeriod(timeUnit);
        }

        public boolean isUseCompression() {
            return this.original.isUseCompression();
        }
    }
}

