/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.client.socket;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.client.socket.EndpointConnection;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.NiFiRestApiUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

public class EndpointConnectionPool {
    // 60,000 ms; isPeerRefreshNeeded compares the last refresh time against this same interval.
    public static final long PEER_REFRESH_PERIOD = 60000L;
    // Category string passed to EventReporter.reportEvent by the warn/error helpers.
    public static final String CATEGORY = "Site-to-Site";
    // Ten minutes expressed in milliseconds.
    public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES);
    // One minute in milliseconds; getPeerStatuses treats a cache older than this as stale.
    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES);
    private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
    // Per-peer queue of pooled connections: polled by getEndpointConnection, refilled by offer.
    private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<PeerDescription, BlockingQueue<EndpointConnection>>();
    // Per-peer timestamp (System.currentTimeMillis scale) until which the peer counts as penalized.
    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<PeerDescription, Long>();
    private final URI clusterUrl;
    // Built in the constructor as scheme://host:port + path (trailing '/' removed) + "-api".
    private final String apiUri;
    // Monotonic counter used by getNextPeerStatus to rotate through the peer list.
    private final AtomicLong peerIndex = new AtomicLong(0L);
    // Held by getNextPeerStatus while peerStatuses is being rebuilt.
    private final ReentrantLock peerRefreshLock = new ReentrantLock();
    // Selection list produced by createPeerStatusList; replaced as a whole on refresh.
    private volatile List<PeerStatus> peerStatuses;
    // Time of the last rebuild attempt of peerStatuses (updated even when the rebuild failed).
    private volatile long peerRefreshTime = 0L;
    // Set in the constructor from the persistence file when it exists; read by getPeerStatuses.
    private volatile PeerStatusCache peerStatusCache;
    // Connections handed out by getEndpointConnection and not yet returned through offer.
    private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet());
    // File used to persist and recover the peer list; may be null, in which case nothing is persisted.
    private final File peersFile;
    // Optional sink for warnings/errors; every use is null-checked.
    private final EventReporter eventReporter;
    // May be null; required only when the remote instance demands secure site-to-site.
    private final SSLContext sslContext;
    // Single daemon thread that runs refreshPeers and cleanupExpiredSockets every 5 seconds.
    private final ScheduledExecutorService taskExecutor;
    private final int idleExpirationMillis;
    private final RemoteDestination remoteDestination;
    private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
    private final Lock remoteInfoReadLock = this.listeningPortRWLock.readLock();
    private final Lock remoteInfoWriteLock = this.listeningPortRWLock.writeLock();
    // NOTE(review): the fields below are presumably guarded by the remoteInfo read/write locks;
    // their accessors are outside this section - confirm.
    private Integer siteToSitePort;
    private Boolean siteToSiteSecure;
    private long remoteRefreshTime;
    private final Map<String, String> inputPortMap = new HashMap<String, String>();
    private final Map<String, String> outputPortMap = new HashMap<String, String>();
    // Used as socket connect/read timeout and as the maximum idle age of a pooled connection.
    private volatile int commsTimeout;
    // When true, offer() terminates returned connections instead of pooling them.
    private volatile boolean shutdown = false;

    /**
     * Creates a pool without an SSL context by delegating to the full constructor
     * with {@code null} for the {@code sslContext} argument.
     *
     * @param clusterUrl URL of the remote instance; must not be null
     * @param remoteDestination the remote port to communicate with; must not be null
     * @param commsTimeoutMillis communications timeout in milliseconds
     * @param idleExpirationMillis idle expiration in milliseconds
     * @param eventReporter optional reporter for warnings and errors; may be null
     * @param persistenceFile optional file holding the persisted peer list; may be null
     */
    public EndpointConnectionPool(String clusterUrl, RemoteDestination remoteDestination, int commsTimeoutMillis, int idleExpirationMillis, EventReporter eventReporter, File persistenceFile) {
        this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
    }

    /**
     * Creates a pool for the given remote instance and starts its maintenance tasks.
     * <p>
     * If {@code persistenceFile} exists, previously persisted peers are loaded from it; a failure
     * to read the file is logged and otherwise ignored. A single daemon thread is then scheduled
     * to call {@code refreshPeers()} immediately and every 5 seconds thereafter, and
     * {@code cleanupExpiredSockets()} every 5 seconds after an initial 5 second delay.
     *
     * @param clusterUrl URL of the remote instance; must not be null
     * @param remoteDestination the remote port to communicate with; must not be null
     * @param commsTimeoutMillis communications timeout in milliseconds
     * @param idleExpirationMillis idle expiration in milliseconds
     * @param sslContext SSL context for secure communication; may be null
     * @param eventReporter optional reporter for warnings and errors; may be null
     * @param persistenceFile optional file holding the persisted peer list; may be null
     * @throws NullPointerException if {@code clusterUrl} or {@code remoteDestination} is null
     * @throws IllegalArgumentException if {@code clusterUrl} is not a valid hierarchical URI
     */
    public EndpointConnectionPool(String clusterUrl, RemoteDestination remoteDestination, int commsTimeoutMillis, int idleExpirationMillis, SSLContext sslContext, EventReporter eventReporter, File persistenceFile) {
        Objects.requireNonNull(clusterUrl, "URL cannot be null");
        Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
        try {
            this.clusterUrl = new URI(clusterUrl);
        }
        catch (URISyntaxException e) {
            // Keep the parser's explanation attached as the cause.
            throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl, e);
        }
        String uriPath = this.clusterUrl.getPath();
        if (uriPath == null) {
            // Opaque URIs (for example a URL given without a scheme) have no path component.
            throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
        }
        if (uriPath.endsWith("/")) {
            uriPath = uriPath.substring(0, uriPath.length() - 1);
        }
        this.apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
        this.remoteDestination = remoteDestination;
        this.sslContext = sslContext;
        this.peersFile = persistenceFile;
        this.eventReporter = eventReporter;
        this.commsTimeout = commsTimeoutMillis;
        this.idleExpirationMillis = idleExpirationMillis;
        if (persistenceFile != null && persistenceFile.exists()) {
            try {
                Set<PeerStatus> recoveredStatuses = this.recoverPersistedPeerStatuses(this.peersFile);
                this.peerStatusCache = new PeerStatusCache(recoveredStatuses, this.peersFile.lastModified());
            }
            catch (IOException ioe) {
                // Best effort: the cache simply stays unset and peers are fetched remotely later.
                logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", (Object)persistenceFile, (Object)ioe);
            }
        } else {
            this.peerStatusCache = null;
        }
        this.taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory(){
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = this.defaultFactory.newThread(r);
                thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
                // Daemon so the maintenance thread never keeps the JVM alive.
                thread.setDaemon(true);
                return thread;
            }
        });
        this.taskExecutor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                EndpointConnectionPool.this.refreshPeers();
            }
        }, 0L, 5L, TimeUnit.SECONDS);
        this.taskExecutor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                EndpointConnectionPool.this.cleanupExpiredSockets();
            }
        }, 5L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Logs a warning and, if an event reporter is configured, reports the same message
     * (with SLF4J-style placeholders resolved) as a WARNING event.
     *
     * @param msg message, optionally containing {@code {}} placeholders
     * @param args values for the placeholders
     */
    void warn(String msg, Object ... args) {
        logger.warn(msg, args);
        if (this.eventReporter == null) {
            return;
        }
        String rendered = MessageFormatter.arrayFormat(msg, args).getMessage();
        this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, rendered);
    }

    /**
     * Logs a warning together with its cause and, if an event reporter is configured,
     * reports {@code msg + ": " + t.toString()} as a WARNING event.
     *
     * @param msg description of what went wrong
     * @param t the cause
     */
    void warn(String msg, Throwable t) {
        logger.warn(msg, t);
        if (this.eventReporter == null) {
            return;
        }
        String eventText = msg + ": " + t.toString();
        this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, eventText);
    }

    /**
     * Logs an error and, if an event reporter is configured, reports the same message
     * (with SLF4J-style placeholders resolved) as an ERROR event.
     *
     * @param msg message, optionally containing {@code {}} placeholders
     * @param args values for the placeholders
     */
    void error(String msg, Object ... args) {
        logger.error(msg, args);
        if (this.eventReporter == null) {
            return;
        }
        String rendered = MessageFormatter.arrayFormat(msg, args).getMessage();
        this.eventReporter.reportEvent(Severity.ERROR, CATEGORY, rendered);
    }

    /**
     * Resolves the identifier of the remote port. An identifier configured on the remote
     * destination wins; otherwise the destination's name is looked up among the output
     * ports (when receiving) or the input ports (any other direction).
     *
     * @param transferDirection direction of the transfer from this client's point of view
     * @return the port identifier as returned by the configured destination or the lookup
     * @throws IOException if the lookup by name fails
     */
    private String getPortIdentifier(TransferDirection transferDirection) throws IOException {
        String configuredId = this.remoteDestination.getIdentifier();
        if (configuredId != null) {
            return configuredId;
        }
        boolean receiving = transferDirection == TransferDirection.RECEIVE;
        String portName = this.remoteDestination.getName();
        return receiving ? this.getOutputPortIdentifier(portName) : this.getInputPortIdentifier(portName);
    }

    /**
     * Obtains a connection for the given direction without any client configuration.
     * Equivalent to calling the two-argument overload with a {@code null} config.
     *
     * @param direction direction of the transfer
     * @return a connection, or {@code null} if the two-argument overload returns none
     * @throws IOException if a connection cannot be established
     */
    public EndpointConnection getEndpointConnection(TransferDirection direction) throws IOException {
        SiteToSiteClientConfig noConfig = null;
        return this.getEndpointConnection(direction, noConfig);
    }

    /**
     * Returns a usable connection to one of the remote peers, reusing a pooled connection when
     * one is available and otherwise establishing, negotiating and handshaking a new one.
     * <p>
     * Each loop iteration picks the next non-penalized peer. A pooled connection that has been
     * idle for at least {@code commsTimeout} milliseconds is cleaned up and skipped; a pooled
     * connection whose peer is penalized for the port is put back and skipped. A freshly created
     * connection whose destination reports itself full causes the peer to be penalized and the
     * search to continue with another peer.
     *
     * @param direction direction of the transfer
     * @param config optional client settings (timeout, batch preferences); may be {@code null}
     * @return the connection, registered as active; or {@code null} if no peer is available
     * @throws PortNotRunningException if the peer reports that the port is not running
     * @throws UnknownPortException if the peer reports that the port is unknown
     * @throws IOException if connecting to, negotiating with or handshaking with the peer fails
     */
    public EndpointConnection getEndpointConnection(TransferDirection direction, SiteToSiteClientConfig config) throws IOException {
        EndpointConnection connection;
        FlowFileCodec codec = null;
        CommunicationsSession commsSession = null;
        SocketClientProtocol protocol = null;
        Peer peer = null;
        do {
            List<EndpointConnection> addBack = new ArrayList<EndpointConnection>();
            logger.debug("{} getting next peer status", this);
            PeerStatus peerStatus = this.getNextPeerStatus(direction);
            logger.debug("{} next peer status = {}", this, peerStatus);
            if (peerStatus == null) {
                return null;
            }
            PeerDescription peerDescription = peerStatus.getPeerDescription();
            BlockingQueue<EndpointConnection> connectionQueue = this.connectionQueueMap.get(peerDescription);
            if (connectionQueue == null) {
                connectionQueue = new LinkedBlockingQueue<EndpointConnection>();
                // Another thread may have registered a queue first; if so, use theirs.
                BlockingQueue<EndpointConnection> existing = this.connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
                if (existing != null) {
                    connectionQueue = existing;
                }
            }
            try {
                connection = connectionQueue.poll();
                logger.debug("{} Connection State for {} = {}", new Object[]{this, this.clusterUrl, connection});
                String portId = this.getPortIdentifier(direction);
                // addBack is created empty at the top of every iteration, so this guard is not
                // expected to trigger; it is retained to keep the original control flow.
                if (connection == null && !addBack.isEmpty()) {
                    logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
                    return null;
                }
                if (connection != null && connection.getPeer().isPenalized(portId)) {
                    // Returned to the queue by the finally block below.
                    addBack.add(connection);
                    continue;
                }
                if (connection == null) {
                    logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
                    protocol = new SocketClientProtocol();
                    protocol.setDestination(new IdEnrichedRemoteDestination(this.remoteDestination, portId));
                    protocol.setEventReporter(this.eventReporter);
                    long penalizationMillis = this.remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
                    try {
                        logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
                        commsSession = this.establishSiteToSiteConnection(peerStatus);
                    }
                    catch (IOException ioe) {
                        this.penalize(peerStatus.getPeerDescription(), penalizationMillis);
                        throw ioe;
                    }
                    DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
                    DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
                    logger.debug("{} Negotiating protocol", this);
                    // A negotiation failure propagates to the catch (Throwable) below, which closes
                    // the session. Previously the failure was swallowed whenever the close
                    // succeeded, and the code carried on with an already-closed session.
                    RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
                    String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort();
                    peer = new Peer(peerDescription, commsSession, peerUrl, this.clusterUrl.toString());
                    if (config != null) {
                        protocol.setTimeout((int)config.getTimeout(TimeUnit.MILLISECONDS));
                        protocol.setPreferredBatchCount(config.getPreferredBatchCount());
                        protocol.setPreferredBatchSize(config.getPreferredBatchSize());
                        protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
                    }
                    logger.debug("{} performing handshake", this);
                    protocol.handshake(peer);
                    if (protocol.isDestinationFull()) {
                        // config is optional; fall back to the resolved port id when it is absent.
                        String portLabel = portId;
                        if (config != null) {
                            portLabel = config.getPortName() == null ? config.getPortIdentifier() : config.getPortName();
                        }
                        logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", new Object[]{this, peer, portLabel});
                        this.penalize(peer, penalizationMillis);
                        try {
                            peer.close();
                        }
                        catch (IOException ignored) {
                            // Best effort: the peer is being abandoned anyway.
                        }
                        // connection is still null, so the loop moves on to another peer.
                        continue;
                    }
                    try {
                        if (protocol.isPortInvalid()) {
                            this.penalize(peer, penalizationMillis);
                            this.cleanup(protocol, peer);
                            throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running");
                        }
                        if (protocol.isPortUnknown()) {
                            this.penalize(peer, penalizationMillis);
                            this.cleanup(protocol, peer);
                            throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known");
                        }
                        logger.debug("{} negotiating codec", this);
                        codec = protocol.negotiateCodec(peer);
                        logger.debug("{} negotiated codec is {}", this, codec);
                    }
                    catch (PortNotRunningException | UnknownPortException e) {
                        // Already penalized and cleaned up above.
                        throw e;
                    }
                    catch (Exception e) {
                        this.penalize(peer, penalizationMillis);
                        this.cleanup(protocol, peer);
                        String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? this.clusterUrl : peer, e.toString());
                        this.error(message, new Object[0]);
                        if (logger.isDebugEnabled()) {
                            logger.error("", (Throwable)e);
                        }
                        throw e;
                    }
                    connection = new EndpointConnection(peer, protocol, codec);
                    continue;
                }
                long lastTimeUsed = connection.getLastTimeUsed();
                long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
                if ((long)this.commsTimeout > 0L && millisSinceLastUse >= (long)this.commsTimeout) {
                    // Pooled connection has been idle too long; discard it and try again.
                    this.cleanup(connection.getSocketClientProtocol(), connection.getPeer());
                    connection = null;
                    continue;
                }
                codec = connection.getCodec();
                peer = connection.getPeer();
                commsSession = peer.getCommunicationsSession();
                protocol = connection.getSocketClientProtocol();
            }
            catch (Throwable t) {
                if (commsSession != null) {
                    try {
                        commsSession.close();
                    }
                    catch (IOException ignored) {
                        // Best effort: the original failure is the one worth reporting.
                    }
                }
                throw t;
            }
            finally {
                if (!addBack.isEmpty()) {
                    connectionQueue.addAll(addBack);
                    addBack.clear();
                }
            }
        } while (connection == null || codec == null || commsSession == null || protocol == null);
        this.activeConnections.add(connection);
        return connection;
    }

    /**
     * Hands a connection back to the pool so that it can be reused.
     * <p>
     * The connection is rejected (without touching the active set) when it has no peer or when
     * no queue exists for its peer. Otherwise it is removed from the active set and either
     * terminated (pool is shut down) or time-stamped and queued for reuse.
     *
     * @param endpointConnection the connection being returned
     * @return {@code true} if the connection was queued for reuse, {@code false} otherwise
     */
    public boolean offer(EndpointConnection endpointConnection) {
        Peer owner = endpointConnection.getPeer();
        if (owner == null) {
            return false;
        }
        BlockingQueue<EndpointConnection> idleQueue = this.connectionQueueMap.get(owner.getDescription());
        if (idleQueue == null) {
            return false;
        }
        this.activeConnections.remove(endpointConnection);
        if (!this.shutdown) {
            endpointConnection.setLastTimeUsed();
            return idleQueue.offer(endpointConnection);
        }
        this.terminate(endpointConnection);
        return false;
    }

    /**
     * Marks a peer as penalized until at least {@code penalizationMillis} from now.
     * An existing penalty that ends later than that is left in place.
     *
     * @param peerDescription the peer to penalize
     * @param penalizationMillis length of the penalty in milliseconds, counted from now
     */
    private void penalize(PeerDescription peerDescription, long penalizationMillis) {
        Long recorded = this.peerTimeoutExpirations.get(peerDescription);
        long currentExpiration = recorded == null ? 0L : recorded.longValue();
        long requestedExpiration = System.currentTimeMillis() + penalizationMillis;
        long effectiveExpiration = Math.max(currentExpiration, requestedExpiration);
        this.peerTimeoutExpirations.put(peerDescription, effectiveExpiration);
    }

    /**
     * Penalizes the given peer, identified by its description, for the given duration.
     *
     * @param peer the peer to penalize
     * @param penalizationMillis length of the penalty in milliseconds, counted from now
     */
    public void penalize(Peer peer, long penalizationMillis) {
        PeerDescription target = peer.getDescription();
        this.penalize(target, penalizationMillis);
    }

    /**
     * Best-effort teardown of a peer: shuts the protocol down against the peer (when a protocol
     * is given) and then closes the peer. I/O failures are ignored; a disabled transmission is
     * logged at debug level. Does nothing when {@code peer} is null.
     *
     * @param protocol protocol to shut down; may be null
     * @param peer peer to close; may be null
     */
    private void cleanup(SocketClientProtocol protocol, Peer peer) {
        if (peer == null) {
            return;
        }
        if (protocol != null) {
            try {
                protocol.shutdown(peer);
            }
            catch (TransmissionDisabledException disabled) {
                logger.debug(this + " Transmission Disabled by User");
            }
            catch (IOException ignored) {
                // Intentionally ignored: teardown is best effort.
            }
        }
        try {
            peer.close();
        }
        catch (TransmissionDisabledException disabled) {
            logger.debug(this + " Transmission Disabled by User");
        }
        catch (IOException ignored) {
            // Intentionally ignored: teardown is best effort.
        }
    }

    /**
     * Decides whether the peer selection list has to be rebuilt: it is missing, empty, or the
     * last refresh happened more than {@link #PEER_REFRESH_PERIOD} milliseconds ago.
     *
     * @param peerList the current selection list; may be null
     * @return {@code true} if a refresh is required
     */
    private boolean isPeerRefreshNeeded(List<PeerStatus> peerList) {
        if (peerList == null || peerList.isEmpty()) {
            return true;
        }
        long now = System.currentTimeMillis();
        return now > this.peerRefreshTime + PEER_REFRESH_PERIOD;
    }

    /**
     * Picks the next peer to talk to, refreshing the peer selection list first when needed.
     * <p>
     * The refresh is guarded by {@code peerRefreshLock} and re-checked after the lock is taken,
     * so concurrent callers rebuild the list only once. If rebuilding fails, a warning is
     * reported once, the previous list (possibly null) is kept, and the refresh time is still
     * advanced. Peers are then tried in rotating order, skipping penalized ones.
     *
     * @param direction direction of the transfer, used to weight the selection list
     * @return the next non-penalized peer, or {@code null} if the list is empty or every
     *         peer tried is penalized
     */
    private PeerStatus getNextPeerStatus(TransferDirection direction) {
        List<PeerStatus> peerList = this.peerStatuses;
        if (this.isPeerRefreshNeeded(peerList)) {
            this.peerRefreshLock.lock();
            try {
                // Re-read under the lock: another thread may have refreshed in the meantime.
                peerList = this.peerStatuses;
                if (this.isPeerRefreshNeeded(peerList)) {
                    try {
                        peerList = this.createPeerStatusList(direction);
                    }
                    catch (Exception e) {
                        String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
                        // warn() logs and notifies the event reporter, so no separate
                        // reportEvent call is made here (it used to emit the event twice).
                        this.warn(message, new Object[0]);
                        if (logger.isDebugEnabled()) {
                            logger.warn("", (Throwable)e);
                        }
                    }
                    this.peerStatuses = peerList;
                    this.peerRefreshTime = System.currentTimeMillis();
                }
            }
            finally {
                this.peerRefreshLock.unlock();
            }
        }
        if (peerList == null || peerList.isEmpty()) {
            return null;
        }
        // Try at most one full pass over the list, starting wherever the shared index points.
        for (int i = 0; i < peerList.size(); ++i) {
            long idx = this.peerIndex.getAndIncrement();
            int listIndex = (int)(idx % (long)peerList.size());
            PeerStatus peerStatus = peerList.get(listIndex);
            if (!this.isPenalized(peerStatus)) {
                return peerStatus;
            }
            logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
        }
        logger.debug("{} All peers appear to be penalized; returning null", this);
        return null;
    }

    /**
     * Tells whether the peer currently has a penalty that has not yet expired.
     *
     * @param peerStatus the peer to check
     * @return {@code true} if a penalty expiration is recorded and lies in the future
     */
    private boolean isPenalized(PeerStatus peerStatus) {
        Long penaltyEnd = this.peerTimeoutExpirations.get(peerStatus.getPeerDescription());
        if (penaltyEnd == null) {
            return false;
        }
        return penaltyEnd.longValue() > System.currentTimeMillis();
    }

    /**
     * Builds the weighted peer selection list for the given direction from the known peer
     * statuses. If no statuses are available, one refresh is attempted before giving up.
     *
     * @param direction direction of the transfer, passed on to {@code formulateDestinationList}
     * @return the selection list; empty if no peers are known even after refreshing
     * @throws IOException declared by the signature; propagated from the calls made here
     */
    private List<PeerStatus> createPeerStatusList(TransferDirection direction) throws IOException {
        Set<PeerStatus> knownPeers = this.getPeerStatuses();
        if (knownPeers == null) {
            this.refreshPeers();
            knownPeers = this.getPeerStatuses();
        }
        if (knownPeers == null) {
            logger.debug("{} found no peers to connect to", (Object)this);
            return Collections.emptyList();
        }
        List<NodeInformation> nodes = new ArrayList<NodeInformation>();
        for (PeerStatus known : knownPeers) {
            PeerDescription where = known.getPeerDescription();
            nodes.add(new NodeInformation(where.getHostname(), where.getPort(), 0, where.isSecure(), known.getFlowFileCount()));
        }
        ClusterNodeInformation cluster = new ClusterNodeInformation();
        cluster.setNodeInformation(nodes);
        return EndpointConnectionPool.formulateDestinationList(cluster, direction);
    }

    /**
     * Returns the peer statuses held in the cache.
     * <p>
     * While the cache is younger than {@code PEER_CACHE_MILLIS} its statuses are returned
     * as-is. Once it is older, a copy is returned in which every peer carries a flow-file
     * count of 1, so that all peers are weighted equally.
     *
     * @return the cached (or equalized) statuses, or {@code null} if the cache is absent or empty
     */
    private Set<PeerStatus> getPeerStatuses() {
        PeerStatusCache cache = this.peerStatusCache;
        if (cache == null) {
            return null;
        }
        Set<PeerStatus> cached = cache.getStatuses();
        if (cached == null || cached.isEmpty()) {
            return null;
        }
        boolean stale = cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
        if (!stale) {
            return cached;
        }
        Set<PeerStatus> equalized = new HashSet<PeerStatus>(cached.size());
        for (PeerStatus original : cached) {
            equalized.add(new PeerStatus(original.getPeerDescription(), 1));
        }
        return equalized;
    }

    /**
     * Connects to the remote instance named by the cluster URL, asks it for its list of peers,
     * persists that list and returns it.
     * <p>
     * For negotiated protocol versions below 5 a port identifier is required for the handshake;
     * the output-port lookup is tried first, then the input-port lookup. The peer is always
     * closed before this method returns or throws: on success after a protocol shutdown
     * (failures of which are only reported as warnings), on failure quietly.
     *
     * @return the peer statuses reported by the remote instance
     * @throws IOException if site-to-site is not enabled remotely, the port identifier cannot be
     *         determined, or communication with the remote instance fails
     */
    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
        String hostname = this.clusterUrl.getHost();
        Integer port = this.getSiteToSitePort();
        if (port == null) {
            throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
        }
        PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, this.clusterUrl.toString().startsWith("https://"));
        CommunicationsSession commsSession = this.establishSiteToSiteConnection(hostname, port);
        Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, this.clusterUrl.toString());
        SocketClientProtocol clientProtocol = new SocketClientProtocol();
        Set<PeerStatus> peerStatuses;
        boolean fetched = false;
        try {
            DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
            DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
            RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
            clientProtocol.setTimeout(this.commsTimeout);
            if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
                String portId = this.getPortIdentifier(TransferDirection.RECEIVE);
                if (portId == null) {
                    portId = this.getPortIdentifier(TransferDirection.SEND);
                }
                if (portId == null) {
                    // The peer is closed by the finally block below.
                    throw new IOException("Failed to determine the identifier of port " + this.remoteDestination.getName());
                }
                clientProtocol.handshake(peer, portId);
            } else {
                clientProtocol.handshake(peer, null);
            }
            peerStatuses = clientProtocol.getPeerStatuses(peer);
            this.persistPeerStatuses(peerStatuses);
            fetched = true;
        }
        finally {
            if (!fetched) {
                // Something above threw: release the connection without masking that failure.
                try {
                    peer.close();
                }
                catch (IOException closeFailure) {
                    logger.debug("{} Failed to close peer after unsuccessful attempt to fetch peer statuses due to {}", this, closeFailure.toString());
                }
            }
        }
        try {
            clientProtocol.shutdown(peer);
        }
        catch (IOException e) {
            String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
            this.warn(message, new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", (Throwable)e);
            }
        }
        finally {
            // Close even when shutdown fails unexpectedly.
            try {
                peer.close();
            }
            catch (IOException closeFailure) {
                String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, closeFailure.toString());
                this.warn(message, new Object[0]);
                if (logger.isDebugEnabled()) {
                    logger.warn("", (Throwable)closeFailure);
                }
            }
        }
        return peerStatuses;
    }

    /**
     * Writes the given peers to the peers file, one {@code hostname:port:secure} line per peer,
     * encoded as UTF-8. Does nothing when no peers file is configured. An I/O failure is
     * reported as an error and not propagated.
     *
     * @param statuses the peers to persist
     */
    private void persistPeerStatuses(Set<PeerStatus> statuses) {
        if (this.peersFile == null) {
            return;
        }
        try (FileOutputStream fileOut = new FileOutputStream(this.peersFile);
             BufferedOutputStream buffered = new BufferedOutputStream((OutputStream)fileOut);){
            for (PeerStatus peerStatus : statuses) {
                PeerDescription where = peerStatus.getPeerDescription();
                StringBuilder record = new StringBuilder();
                record.append(where.getHostname()).append(":");
                record.append(where.getPort()).append(":");
                record.append(where.isSecure()).append("\n");
                buffered.write(record.toString().getBytes(StandardCharsets.UTF_8));
            }
        }
        catch (IOException e) {
            this.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString());
            logger.error("", (Throwable)e);
        }
    }

    /**
     * Reads peers previously written by {@code persistPeerStatuses} from the given file.
     * <p>
     * Each line is expected to have the form {@code hostname:port:secure}. The file is decoded
     * as UTF-8, matching the encoding used when it is written. Lines that do not consist of
     * exactly three fields, or whose port is not a valid integer, are skipped. Every recovered
     * peer is given a flow-file count of 1.
     *
     * @param file the file to read
     * @return the recovered peers (possibly empty), or {@code null} if the file does not exist
     * @throws IOException if the file cannot be read
     */
    private Set<PeerStatus> recoverPersistedPeerStatuses(File file) throws IOException {
        if (!file.exists()) {
            return null;
        }
        HashSet<PeerStatus> statuses = new HashSet<PeerStatus>();
        try (FileInputStream fis = new FileInputStream(file);
             BufferedReader reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));){
            String line;
            while ((line = reader.readLine()) != null) {
                String[] splits = line.split(Pattern.quote(":"));
                if (splits.length != 3) continue;
                String hostname = splits[0];
                int port;
                try {
                    port = Integer.parseInt(splits[1]);
                }
                catch (NumberFormatException nfe) {
                    // A corrupt entry must not abort recovery of the remaining peers.
                    logger.warn("Skipping malformed peer entry '{}' in {}: port is not a number", (Object)line, (Object)file);
                    continue;
                }
                boolean secure = Boolean.parseBoolean(splits[2]);
                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
            }
        }
        return statuses;
    }

    /**
     * Opens a site-to-site session to the host and port named by the given peer.
     *
     * @param peerStatus the peer to connect to
     * @return the established session
     * @throws IOException if the connection cannot be established
     */
    private CommunicationsSession establishSiteToSiteConnection(PeerStatus peerStatus) throws IOException {
        PeerDescription target = peerStatus.getPeerDescription();
        String host = target.getHostname();
        int port = target.getPort();
        return this.establishSiteToSiteConnection(host, port);
    }

    /**
     * Opens a site-to-site session to the given host and port and sends the protocol's magic
     * bytes. A secure channel is used when {@code isSecure()} says so (which requires an SSL
     * context); otherwise a plain socket channel is connected with {@code commsTimeout} as both
     * connect and read timeout.
     * <p>
     * If an {@link IOException} occurs, whatever has been opened so far (the session, or the
     * plain socket channel if the session was not yet created) is closed before the exception
     * is rethrown; a failure of that close is attached as a suppressed exception.
     *
     * @param hostname host to connect to
     * @param port port to connect to
     * @return the established session
     * @throws IOException if secure communication is required but no SSL context is configured,
     *         or if connecting or writing to the remote host fails
     */
    private CommunicationsSession establishSiteToSiteConnection(String hostname, int port) throws IOException {
        boolean siteToSiteSecure = this.isSecure();
        String destinationUri = "nifi://" + hostname + ":" + port;
        CommunicationsSession commsSession = null;
        SocketChannel plainChannel = null;
        try {
            if (siteToSiteSecure) {
                if (this.sslContext == null) {
                    throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
                }
                SSLSocketChannel socketChannel = new SSLSocketChannel(this.sslContext, hostname, port, true);
                socketChannel.connect();
                commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
                try {
                    commsSession.setUserDn(socketChannel.getDn());
                }
                catch (CertificateException ex) {
                    throw new IOException(ex);
                }
            } else {
                plainChannel = SocketChannel.open();
                plainChannel.socket().connect(new InetSocketAddress(hostname, port), this.commsTimeout);
                plainChannel.socket().setSoTimeout(this.commsTimeout);
                commsSession = new SocketChannelCommunicationsSession(plainChannel, destinationUri);
            }
            commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
            commsSession.setUri(destinationUri);
        }
        catch (IOException ioe) {
            // Release what was opened; until the session exists the raw channel is all there is.
            Closeable opened = commsSession;
            if (opened == null) {
                opened = plainChannel;
            }
            if (opened != null) {
                try {
                    opened.close();
                }
                catch (IOException closeFailure) {
                    // Keep the original failure as the primary exception.
                    ioe.addSuppressed(closeFailure);
                }
            }
            throw ioe;
        }
        return commsSession;
    }

    /**
     * Builds a weighted list of peers in which each node of the cluster appears a
     * number of times proportional to its weight, so that picking entries from the
     * list spreads work across nodes according to their current load.
     *
     * <p>A node's weight is its share of the cluster's total FlowFile count, capped
     * at 0.8. When sending, the weight is inverted ({@code 1 - share}) so that nodes
     * holding fewer FlowFiles appear more often. Every node gets at least one entry.
     * The resulting distribution is logged at INFO level.
     *
     * @param clusterNodeInfo supplies the nodes of the remote cluster
     * @param direction whether data is to be sent to or received from the cluster
     * @return a list with one {@link PeerStatus} per allotted slot; empty when the
     *         cluster reports no nodes
     */
    static List<PeerStatus> formulateDestinationList(ClusterNodeInformation clusterNodeInfo, TransferDirection direction) {
        Collection<NodeInformation> nodes = clusterNodeInfo.getNodeInformation();
        int slotBudget = Math.max(128, nodes.size());

        long clusterFlowFiles = 0L;
        for (NodeInformation node : nodes) {
            clusterFlowFiles += (long)node.getTotalFlowFiles();
        }

        // Decide how many slots each node receives.
        HashMap<NodeInformation, Integer> slotsByNode = new HashMap<NodeInformation, Integer>();
        int slotTotal = 0;
        for (NodeInformation node : nodes) {
            // If the cluster total is 0 and this node's count is 0, the division
            // yields NaN; NaN survives Math.min, casts to 0 below, and the node
            // ends up with the minimum of one slot.
            double share = Math.min(0.8, (double)node.getTotalFlowFiles() / (double)clusterFlowFiles);
            double weight;
            if (direction == TransferDirection.SEND) {
                weight = 1.0 - share;
            } else {
                weight = share;
            }
            int slots = Math.max(1, (int)((double)slotBudget * weight));
            slotsByNode.put(node, slots);
            slotTotal += slots;
        }

        // Start with every slot empty (null) so free positions can be detected.
        List<PeerStatus> destinations = new ArrayList<PeerStatus>(Collections.nCopies(slotTotal, (PeerStatus)null));

        for (Map.Entry<NodeInformation, Integer> allocation : slotsByNode.entrySet()) {
            NodeInformation node = allocation.getKey();
            int slots = allocation.getValue();
            for (int i = 0; i < slots; ++i) {
                // Begin at a stride of 'slots' positions and walk forward, wrapping
                // around the list, until an unoccupied slot is found.
                int probe = slots * i;
                while (destinations.get(probe % destinations.size()) != null) {
                    ++probe;
                }
                int freeSlot = probe % destinations.size();
                PeerDescription description = new PeerDescription(node.getSiteToSiteHostname(), node.getSiteToSitePort(), node.isSiteToSiteSecure());
                destinations.set(freeSlot, new PeerStatus(description, node.getTotalFlowFiles()));
            }
        }

        StringBuilder summary = new StringBuilder();
        summary.append("New Weighted Distribution of Nodes:");
        for (Map.Entry<NodeInformation, Integer> allocation : slotsByNode.entrySet()) {
            double percentage = (double)allocation.getValue().intValue() * 100.0 / (double)destinations.size();
            summary.append("\n").append(allocation.getKey()).append(" will receive ").append(percentage).append("% of data");
        }
        logger.info(summary.toString());
        return destinations;
    }

    /**
     * Drains every pooled connection queue, shuts down and terminates connections
     * that have been idle longer than {@code idleExpirationMillis}, and puts the
     * remaining connections back into their queue.
     */
    private void cleanupExpiredSockets() {
        for (BlockingQueue idleQueue : this.connectionQueueMap.values()) {
            ArrayList<EndpointConnection> survivors = new ArrayList<EndpointConnection>();
            for (EndpointConnection candidate = (EndpointConnection)idleQueue.poll();
                    candidate != null;
                    candidate = (EndpointConnection)idleQueue.poll()) {
                boolean expired = candidate.getLastTimeUsed() < System.currentTimeMillis() - (long)this.idleExpirationMillis;
                if (!expired) {
                    survivors.add(candidate);
                    continue;
                }
                try {
                    candidate.getSocketClientProtocol().shutdown(candidate.getPeer());
                }
                catch (Exception e) {
                    // Best effort: the connection is terminated below regardless.
                    logger.debug("Failed to shut down {} using {} due to {}", new Object[]{candidate.getSocketClientProtocol(), candidate.getPeer(), e});
                }
                this.terminate(candidate);
            }
            // Return the still-fresh connections to the pool.
            idleQueue.addAll(survivors);
        }
    }

    /**
     * Shuts this pool down: marks it as shut down, stops the task executor, clears
     * the peer timeout expirations, interrupts the communications session of every
     * active connection, and terminates every pooled connection.
     */
    public void shutdown() {
        this.shutdown = true;
        this.taskExecutor.shutdown();
        this.peerTimeoutExpirations.clear();

        // Interrupt connections that are currently in use.
        for (EndpointConnection active : this.activeConnections) {
            active.getPeer().getCommunicationsSession().interrupt();
        }

        // Drain and terminate the connections waiting in the pool.
        for (BlockingQueue idleQueue : this.connectionQueueMap.values()) {
            for (EndpointConnection pooled = (EndpointConnection)idleQueue.poll();
                    pooled != null;
                    pooled = (EndpointConnection)idleQueue.poll()) {
                this.terminate(pooled);
            }
        }
    }

    /**
     * Removes the given connection from the set of active connections and cleans
     * up its client protocol and peer.
     *
     * @param connection the connection to terminate
     */
    public void terminate(EndpointConnection connection) {
        this.activeConnections.remove(connection);
        SocketClientProtocol protocol = connection.getSocketClientProtocol();
        Peer peer = connection.getPeer();
        this.cleanup(protocol, peer);
    }

    /**
     * Re-fetches the remote peer statuses and replaces the peer status cache,
     * unless the existing cache is younger than {@code PEER_CACHE_MILLIS}.
     * Failures are reported through {@code warn(...)} and otherwise ignored, so the
     * previous cache (if any) stays in place.
     */
    private void refreshPeers() {
        PeerStatusCache currentCache = this.peerStatusCache;
        boolean cacheStillFresh = currentCache != null
                && currentCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis();
        if (cacheStillFresh) {
            return;
        }

        try {
            Set<PeerStatus> statuses = this.fetchRemotePeerStatuses();
            this.peerStatusCache = new PeerStatusCache(statuses);
            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", (Object)this, (Object)statuses.size());
        }
        catch (Exception e) {
            this.warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
            // The stack trace is only emitted when debug logging is enabled.
            if (logger.isDebugEnabled()) {
                logger.warn("", (Throwable)e);
            }
        }
    }

    /**
     * Looks up the identifier of the remote input port with the given name.
     *
     * @param portName the name of the input port
     * @return the port's identifier, or {@code null} if no input port with that
     *         name is known even after refreshing the remote information
     * @throws IOException if refreshing the remote information fails
     */
    public String getInputPortIdentifier(String portName) throws IOException {
        String inputPortId = this.getPortIdentifier(portName, this.inputPortMap);
        return inputPortId;
    }

    /**
     * Looks up the identifier of the remote output port with the given name.
     *
     * @param portName the name of the output port
     * @return the port's identifier, or {@code null} if no output port with that
     *         name is known even after refreshing the remote information
     * @throws IOException if refreshing the remote information fails
     */
    public String getOutputPortIdentifier(String portName) throws IOException {
        String outputPortId = this.getPortIdentifier(portName, this.outputPortMap);
        return outputPortId;
    }

    /**
     * Resolves a port name to its identifier using the given name-to-id map.
     * The map is consulted under the read lock first; on a miss the remote
     * information is refreshed and the map is consulted once more.
     *
     * @param portName the name of the port to resolve
     * @param portMap the name-to-identifier map to search
     * @return the identifier, or {@code null} if the port is still unknown after
     *         the refresh
     * @throws IOException if refreshing the remote information fails
     */
    private String getPortIdentifier(String portName, Map<String, String> portMap) throws IOException {
        String resolvedId;

        this.remoteInfoReadLock.lock();
        try {
            resolvedId = portMap.get(portName);
        }
        finally {
            this.remoteInfoReadLock.unlock();
        }

        if (resolvedId == null) {
            // Not known yet: refresh outside the read lock, then look again.
            this.refreshRemoteInfo();
            this.remoteInfoReadLock.lock();
            try {
                resolvedId = portMap.get(portName);
            }
            finally {
                this.remoteInfoReadLock.unlock();
            }
        }
        return resolvedId;
    }

    /**
     * Fetches the remote controller description from {@code apiUri + "/controller"}
     * and, under the write lock, replaces the cached site-to-site port, the secure
     * flag and both port name-to-id maps, then records the refresh time.
     * The SSLContext is only handed to the REST utility when the cluster URL
     * starts with "https".
     *
     * @return the controller description that was fetched
     * @throws IOException if the controller description cannot be retrieved
     */
    private ControllerDTO refreshRemoteInfo() throws IOException {
        SSLContext restSslContext = null;
        if (this.clusterUrl.toString().startsWith("https")) {
            restSslContext = this.sslContext;
        }
        NiFiRestApiUtil restClient = new NiFiRestApiUtil(restSslContext);
        ControllerDTO remoteController = restClient.getController(this.apiUri + "/controller", this.commsTimeout);

        this.remoteInfoWriteLock.lock();
        try {
            this.siteToSitePort = remoteController.getRemoteSiteListeningPort();
            this.siteToSiteSecure = remoteController.isSiteToSiteSecure();

            this.inputPortMap.clear();
            for (PortDTO port : remoteController.getInputPorts()) {
                this.inputPortMap.put(port.getName(), port.getId());
            }

            this.outputPortMap.clear();
            for (PortDTO port : remoteController.getOutputPorts()) {
                this.outputPortMap.put(port.getName(), port.getId());
            }

            this.remoteRefreshTime = System.currentTimeMillis();
        }
        finally {
            this.remoteInfoWriteLock.unlock();
        }
        return remoteController;
    }

    /**
     * Returns the remote instance's site-to-site listening port. The cached value
     * is used when it is present and was refreshed within the last
     * {@code REMOTE_REFRESH_MILLIS}; otherwise the remote information is refreshed
     * and the freshly reported port is returned.
     *
     * @return the listening port as reported by the remote controller; the value
     *         returned after a refresh is passed through unchecked
     * @throws IOException if refreshing the remote information fails
     */
    private Integer getSiteToSitePort() throws IOException {
        this.remoteInfoReadLock.lock();
        try {
            Integer cachedPort = this.siteToSitePort;
            boolean stale = cachedPort == null
                    || this.remoteRefreshTime <= System.currentTimeMillis() - REMOTE_REFRESH_MILLIS;
            if (!stale) {
                return cachedPort;
            }
        }
        finally {
            this.remoteInfoReadLock.unlock();
        }

        // Cache is missing or too old: ask the remote instance again.
        return this.refreshRemoteInfo().getRemoteSiteListeningPort();
    }

    /**
     * @return a description of this pool that includes the cluster URL
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EndpointConnectionPool[Cluster URL=");
        text.append(this.clusterUrl);
        text.append("]");
        return text.toString();
    }

    /**
     * Reports whether the remote instance requires secure site-to-site
     * communications. The cached flag is used when it is present and was refreshed
     * within the last {@code REMOTE_REFRESH_MILLIS}; otherwise the remote
     * information is refreshed first.
     *
     * @return {@code true} if site-to-site communication with the remote is secure
     * @throws IOException if refreshing the remote information fails, or if the
     *         remote instance does not report a site-to-site secure setting
     */
    public boolean isSecure() throws IOException {
        this.remoteInfoReadLock.lock();
        try {
            Boolean cachedFlag = this.siteToSiteSecure;
            boolean stale = cachedFlag == null
                    || this.remoteRefreshTime <= System.currentTimeMillis() - REMOTE_REFRESH_MILLIS;
            if (!stale) {
                return cachedFlag;
            }
        }
        finally {
            this.remoteInfoReadLock.unlock();
        }

        Boolean reportedFlag = this.refreshRemoteInfo().isSiteToSiteSecure();
        if (reportedFlag != null) {
            return reportedFlag;
        }
        throw new IOException("Remote NiFi instance " + this.clusterUrl + " is not currently configured to accept site-to-site connections");
    }

    private class IdEnrichedRemoteDestination
    implements RemoteDestination {
        private final RemoteDestination original;
        private final String identifier;

        public IdEnrichedRemoteDestination(RemoteDestination original, String identifier) {
            this.original = original;
            this.identifier = identifier;
        }

        public String getIdentifier() {
            return this.identifier;
        }

        public String getName() {
            return this.original.getName();
        }

        public long getYieldPeriod(TimeUnit timeUnit) {
            return this.original.getYieldPeriod(timeUnit);
        }

        public boolean isUseCompression() {
            return this.original.isUseCompression();
        }
    }
}

