/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates {@link NettyPartitionRequestClient} instances and caches them per {@link ConnectionID}.
 *
 * <p>The cache stores a {@link CompletableFuture} per connection rather than the client itself.
 * The thread that wins {@link ConcurrentMap#putIfAbsent} establishes the connection and completes
 * the future; every other thread asking for the same connection blocks on that future instead of
 * opening a second connection.
 */
class PartitionRequestClientFactory {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientFactory.class);

    /** Netty client used to open the underlying channels. */
    private final NettyClient nettyClient;

    /**
     * Number of additional connection attempts made after the first one failed. With a value of
     * {@code 0} a single failed attempt is final.
     */
    private final int retryNumber;

    /** Pending or established clients, keyed by the (normalized) connection they belong to. */
    private final ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>> clients =
            new ConcurrentHashMap<>();

    /** Stored and exposed via {@link #isConnectionReuseEnabled()}; not consulted in this class. */
    private final boolean connectionReuseEnabled;

    /**
     * Creates a factory that does not retry failed connection attempts.
     *
     * @param nettyClient client used to open channels
     * @param connectionReuseEnabled value reported by {@link #isConnectionReuseEnabled()}
     */
    PartitionRequestClientFactory(NettyClient nettyClient, boolean connectionReuseEnabled) {
        this(nettyClient, 0, connectionReuseEnabled);
    }

    /**
     * Creates a factory.
     *
     * @param nettyClient client used to open channels
     * @param retryNumber number of retries after a failed connection attempt
     * @param connectionReuseEnabled value reported by {@link #isConnectionReuseEnabled()}
     */
    PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber, boolean connectionReuseEnabled) {
        this.nettyClient = nettyClient;
        this.retryNumber = retryNumber;
        this.connectionReuseEnabled = connectionReuseEnabled;
    }

    /**
     * Returns a usable client for the given connection, connecting first if none is cached.
     *
     * <p>The method loops until it can hand out a client whose reference counter was incremented
     * successfully. A cached client that fails validation is either closed or evicted from the
     * cache, and the next iteration tries again.
     *
     * @param connectionId connection to obtain a client for
     * @return a client whose reference counter has been incremented for the caller
     * @throws IOException if the connection could not be established, either by this thread or by
     *     the thread this call was waiting on
     * @throws InterruptedException if the thread is interrupted while connecting or while waiting
     *     for another thread's connection attempt
     */
    NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        // Rebuild the key with the third constructor argument fixed to 0.
        // NOTE(review): presumably that argument is the connection index, which would make all
        // requests to the same resource/address share one cache entry - confirm against
        // ConnectionID before changing.
        connectionId = new ConnectionID(connectionId.getResourceID(), connectionId.getAddress(), 0);

        while (true) {
            final CompletableFuture<NettyPartitionRequestClient> newClientFuture = new CompletableFuture<>();
            final CompletableFuture<NettyPartitionRequestClient> existingFuture =
                    clients.putIfAbsent(connectionId, newClientFuture);

            final NettyPartitionRequestClient client;
            if (existingFuture == null) {
                // This thread registered the future, so it is responsible for connecting.
                try {
                    client = connectWithRetries(connectionId);
                } catch (Throwable e) {
                    // Wake up waiters with the failure and drop the entry so that a later call
                    // starts a fresh connection attempt.
                    newClientFuture.completeExceptionally(new IOException("Could not create Netty client.", e));
                    clients.remove(connectionId, newClientFuture);
                    throw e;
                }
                newClientFuture.complete(client);
            } else {
                // Another thread is connecting (or already has); wait for its result.
                try {
                    client = existingFuture.get();
                } catch (ExecutionException e) {
                    ExceptionUtils.rethrowIOException(ExceptionUtils.stripExecutionException(e));
                    // NOTE(review): rethrowIOException is expected to always throw; this return
                    // only satisfies the compiler.
                    return null;
                }
            }

            // The reference counter must be incremented before the client is handed out.
            if (client.validateClientAndIncrementReferenceCounter()) {
                return client;
            }

            if (client.canBeDisposed()) {
                client.closeConnection();
            } else {
                destroyPartitionRequestClient(connectionId, client);
            }
        }
    }

    /**
     * Returns the connection-reuse flag passed to the constructor.
     *
     * @return {@code true} if connection reuse was enabled at construction time
     */
    public boolean isConnectionReuseEnabled() {
        return connectionReuseEnabled;
    }

    /**
     * Connects to the given address, retrying up to {@link #retryNumber} times on
     * {@link RemoteTransportException}.
     *
     * @param connectionId connection to establish
     * @return the connected client
     * @throws InterruptedException if the thread is interrupted while connecting
     * @throws RemoteTransportException the failure of the last attempt once retries are exhausted
     */
    private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionId) throws InterruptedException, RemoteTransportException {
        int tried = 0;
        while (true) {
            try {
                return connect(connectionId);
            } catch (RemoteTransportException e) {
                tried++;
                if (tried > retryNumber) {
                    LOG.warn("Failed to connect to {}. Giving up.", connectionId.getAddress(), e);
                    throw e;
                }
                LOG.warn("Failed {} times to connect to {}. Retrying.", tried, connectionId.getAddress(), e);
            }
        }
    }

    /**
     * Performs a single, blocking connection attempt.
     *
     * @param connectionId connection to establish
     * @return a client wrapping the newly opened channel
     * @throws RemoteTransportException if the attempt fails for any reason other than interruption
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     */
    private NettyPartitionRequestClient connect(ConnectionID connectionId) throws RemoteTransportException, InterruptedException {
        try {
            // sync() blocks until the connect future is done and rethrows its failure cause.
            Channel channel = nettyClient.connect(connectionId.getAddress()).sync().channel();
            NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
            return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
        } catch (InterruptedException e) {
            // Interruption is propagated unchanged rather than wrapped as a transport failure.
            throw e;
        } catch (Exception e) {
            throw new RemoteTransportException(
                    "Connecting to remote task manager '"
                            + connectionId.getAddress()
                            + " [ "
                            + connectionId.getResourceID().getStringWithMetadata()
                            + " ] ' has failed. This might indicate that the remote task manager has been lost.",
                    connectionId.getAddress(),
                    e);
        }
    }

    /**
     * Schedules removal of a still-pending cache entry for the given connection.
     *
     * <p>If an entry exists and its future has not completed yet, a callback is registered that
     * removes the entry once the client is available and reports {@code canBeDisposed()}. Absent
     * or already completed entries are left untouched.
     *
     * <p>NOTE(review): despite the method name, no channel is closed here - confirm that callers
     * rely only on the cache eviction.
     *
     * @param connectionId key of the entry to inspect
     */
    void closeOpenChannelConnections(ConnectionID connectionId) {
        final CompletableFuture<NettyPartitionRequestClient> entry = clients.get(connectionId);
        if (entry != null && !entry.isDone()) {
            entry.thenAccept(client -> {
                if (client.canBeDisposed()) {
                    clients.remove(connectionId, entry);
                }
            });
        }
    }

    /**
     * Returns the number of cache entries, including connection attempts still in progress.
     *
     * @return current size of the client cache
     */
    int getNumberOfActiveClients() {
        return clients.size();
    }

    /**
     * Removes the cache entry for the given connection if it resolved to the given client.
     *
     * <p>Nothing happens when there is no entry, when the entry's future is still pending, when
     * it completed exceptionally, or when it holds a different client.
     *
     * @param connectionId key of the entry to remove
     * @param client client the entry must hold for the removal to take place
     */
    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        final CompletableFuture<NettyPartitionRequestClient> future = clients.get(connectionId);
        if (future != null && future.isDone()) {
            future.thenAccept(futureClient -> {
                if (client.equals(futureClient)) {
                    // Two-argument remove: only evict if the mapping is still this future.
                    clients.remove(connectionId, future);
                }
            });
        }
    }
}

