/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.impl.ReconnectConfiguration;
import io.axoniq.axonserver.connector.impl.ServerAddress;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.control.NodeInfo;
import io.axoniq.axonserver.grpc.control.PlatformInfo;
import io.axoniq.axonserver.grpc.control.PlatformServiceGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerManagedChannel
extends ManagedChannel {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerManagedChannel.class);
    private final List<ServerAddress> routingServers;
    private final long reconnectInterval;
    private final String context;
    private final ClientIdentification clientIdentification;
    private final ScheduledExecutorService executor;
    private final boolean forcePlatformReconnect;
    private final BiFunction<ServerAddress, String, ManagedChannel> connectionFactory;
    private final long connectTimeout;
    private final AtomicReference<ManagedChannel> activeChannel = new AtomicReference();
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private final AtomicBoolean suppressErrors = new AtomicBoolean();
    private final Queue<Runnable> connectListeners = new LinkedBlockingQueue<Runnable>();
    private final AtomicLong nextAttemptTime = new AtomicLong();
    private final AtomicReference<Exception> lastConnectException = new AtomicReference();

    public AxonServerManagedChannel(List<ServerAddress> routingServers, ReconnectConfiguration reconnectConfiguration, String context, ClientIdentification clientIdentification, ScheduledExecutorService executor, BiFunction<ServerAddress, String, ManagedChannel> connectionFactory) {
        this.routingServers = new ArrayList<ServerAddress>(routingServers);
        this.reconnectInterval = reconnectConfiguration.getTimeUnit().toMillis(reconnectConfiguration.getReconnectInterval());
        this.context = context;
        this.clientIdentification = clientIdentification;
        this.executor = executor;
        this.forcePlatformReconnect = reconnectConfiguration.isForcePlatformReconnect();
        this.connectionFactory = connectionFactory;
        this.connectTimeout = reconnectConfiguration.getTimeUnit().toMillis(reconnectConfiguration.getConnectTimeout());
        this.executor.schedule(() -> this.ensureConnected(true), 100L, TimeUnit.MILLISECONDS);
    }

    private ManagedChannel connectChannel() {
        ManagedChannel connection = null;
        for (ServerAddress nodeInfo : this.routingServers) {
            ManagedChannel candidate = this.connectionFactory.apply(nodeInfo, this.context);
            try {
                PlatformServiceGrpc.PlatformServiceBlockingStub stub = (PlatformServiceGrpc.PlatformServiceBlockingStub)PlatformServiceGrpc.newBlockingStub((Channel)candidate).withDeadlineAfter(this.connectTimeout, TimeUnit.MILLISECONDS);
                logger.info("Requesting connection details from {}:{}", (Object)nodeInfo.getHostName(), (Object)nodeInfo.getGrpcPort());
                PlatformInfo clusterInfo = stub.getPlatformServer(this.clientIdentification);
                NodeInfo primaryClusterInfo = clusterInfo.getPrimary();
                logger.debug("Received PlatformInfo suggesting [{}] ({}:{}), {}", new Object[]{primaryClusterInfo.getNodeName(), primaryClusterInfo.getHostName(), primaryClusterInfo.getGrpcPort(), clusterInfo.getSameConnection() ? "allowing use of existing connection" : "requiring new connection"});
                if (clusterInfo.getSameConnection() || primaryClusterInfo.getGrpcPort() == nodeInfo.getGrpcPort() && primaryClusterInfo.getHostName().equals(nodeInfo.getHostName())) {
                    logger.debug("Reusing existing channel");
                    connection = candidate;
                } else {
                    candidate.shutdown();
                    logger.info("Connecting to [{}] ({}:{})", new Object[]{primaryClusterInfo.getNodeName(), primaryClusterInfo.getHostName(), primaryClusterInfo.getGrpcPort()});
                    ServerAddress serverAddress = new ServerAddress(primaryClusterInfo.getHostName(), primaryClusterInfo.getGrpcPort());
                    connection = this.connectionFactory.apply(serverAddress, this.context);
                }
                this.suppressErrors.set(false);
                this.lastConnectException.set(null);
                break;
            }
            catch (StatusRuntimeException sre) {
                this.lastConnectException.set((Exception)((Object)sre));
                this.shutdownNow(candidate);
                if (!this.suppressErrors.getAndSet(true)) {
                    logger.warn("Connecting to AxonServer node [{}] failed.", (Object)nodeInfo, (Object)sre);
                    continue;
                }
                logger.warn("Connecting to AxonServer node [{}] failed: {}", (Object)nodeInfo, (Object)sre.getMessage());
            }
        }
        return connection;
    }

    private void shutdownNow(ManagedChannel managedChannel) {
        try {
            managedChannel.shutdownNow().awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("Interrupted during shutdown");
        }
    }

    public ManagedChannel shutdown() {
        this.shutdown.set(true);
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::shutdown);
        return this;
    }

    public boolean isShutdown() {
        return this.shutdown.get();
    }

    public boolean isTerminated() {
        if (!this.shutdown.get()) {
            return false;
        }
        ManagedChannel current = this.activeChannel.get();
        return current == null || current.isTerminated();
    }

    public ManagedChannel shutdownNow() {
        this.shutdown.set(true);
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::shutdownNow);
        return this;
    }

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        ManagedChannel current = this.activeChannel.get();
        if (current != null) {
            current.awaitTermination(timeout, unit);
        }
        return true;
    }

    public <REQ, RESP> ClientCall<REQ, RESP> newCall(MethodDescriptor<REQ, RESP> methodDescriptor, CallOptions callOptions) {
        ManagedChannel current = this.activeChannel.get();
        if (current == null || current.getState(false) != ConnectivityState.READY) {
            this.ensureConnected(false);
            current = this.activeChannel.get();
        }
        if (current == null) {
            return new FailingCall();
        }
        return current.newCall(methodDescriptor, callOptions);
    }

    public String authority() {
        return this.routingServers.get(0).toString();
    }

    public ConnectivityState getState(boolean requestConnection) {
        ManagedChannel current;
        if (this.shutdown.get()) {
            return ConnectivityState.SHUTDOWN;
        }
        if (requestConnection) {
            this.ensureConnected(false);
        }
        if ((current = this.activeChannel.get()) == null) {
            if (this.lastConnectException.get() == null) {
                return ConnectivityState.IDLE;
            }
            return ConnectivityState.TRANSIENT_FAILURE;
        }
        ConnectivityState state = current.getState(requestConnection);
        return state == ConnectivityState.SHUTDOWN ? ConnectivityState.TRANSIENT_FAILURE : state;
    }

    public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
        ManagedChannel current = this.activeChannel.get();
        logger.debug("Registering state change listener for {} on channel {}", (Object)source, (Object)current);
        switch (source) {
            case SHUTDOWN: 
            case READY: 
            case IDLE: 
            case CONNECTING: {
                if (current != null) {
                    current.notifyWhenStateChanged(source, callback);
                    break;
                }
                callback.run();
                break;
            }
            case TRANSIENT_FAILURE: {
                if (current == null) {
                    this.connectListeners.add(callback);
                    break;
                }
                callback.run();
            }
        }
    }

    public void resetConnectBackoff() {
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::resetConnectBackoff);
    }

    public void enterIdle() {
        ObjectUtils.doIfNotNull(this.activeChannel.get(), ManagedChannel::enterIdle);
    }

    private void ensureConnected(boolean allowReschedule) {
        ConnectivityState state;
        if (this.shutdown.get()) {
            return;
        }
        long now = System.currentTimeMillis();
        long deadline = this.nextAttemptTime.getAndUpdate(d -> d > now ? d : now + this.reconnectInterval);
        if (deadline > now) {
            if (allowReschedule) {
                long timeLeft = Math.min(500L, deadline - now);
                logger.debug("Reconnect timeout still enforced. Scheduling a new connection check in {}ms", (Object)timeLeft);
                this.scheduleConnectionCheck(timeLeft);
            }
            return;
        }
        logger.debug("Checking connection state");
        ManagedChannel current = this.activeChannel.get();
        ConnectivityState connectivityState = state = current == null ? ConnectivityState.SHUTDOWN : current.getState(true);
        if (state == ConnectivityState.TRANSIENT_FAILURE || state == ConnectivityState.SHUTDOWN) {
            if (state == ConnectivityState.TRANSIENT_FAILURE) {
                logger.info("Connection to AxonServer lost. Attempting to reconnect...");
            }
            this.createConnection(allowReschedule, current);
        } else if (allowReschedule && (state == ConnectivityState.IDLE || state == ConnectivityState.CONNECTING)) {
            logger.debug("Connection is {}, checking again in 50ms", (Object)state);
            this.scheduleConnectionCheck(50L);
        } else {
            logger.debug("Connection seems normal. {}", (Object)state);
            if (allowReschedule) {
                logger.debug("Registering state change handler");
                current.notifyWhenStateChanged(ConnectivityState.READY, this::verifyConnectionStateChange);
            }
        }
    }

    private void createConnection(boolean allowReschedule, ManagedChannel current) {
        ManagedChannel newConnection = null;
        try {
            if (this.forcePlatformReconnect && current != null && !current.isShutdown()) {
                logger.debug("Shut down current connection");
                current.shutdown();
            }
            if ((newConnection = this.connectChannel()) != null) {
                Runnable listener;
                if (!this.activeChannel.compareAndSet(current, newConnection)) {
                    logger.debug("A successful Connection was concurrently set up. Closing this one.");
                    newConnection.shutdown();
                    if (allowReschedule) {
                        this.scheduleConnectionCheck(50L);
                    }
                    return;
                }
                ObjectUtils.doIfNotNull(current, ManagedChannel::shutdown);
                if (logger.isInfoEnabled()) {
                    logger.info("Successfully connected to {}", (Object)newConnection.authority());
                }
                while ((listener = this.connectListeners.poll()) != null) {
                    listener.run();
                }
                this.nextAttemptTime.set(0L);
                if (allowReschedule) {
                    logger.debug("Registering state change handler");
                    newConnection.notifyWhenStateChanged(ConnectivityState.READY, this::verifyConnectionStateChange);
                }
            } else if (allowReschedule) {
                logger.info("Failed to get connection to AxonServer. Scheduling a reconnect in {}ms", (Object)this.reconnectInterval);
                this.scheduleConnectionCheck(this.reconnectInterval);
            }
        }
        catch (Exception e) {
            ObjectUtils.doIfNotNull(newConnection, ManagedChannel::shutdown);
            if (allowReschedule) {
                logger.info("Failed to get connection to AxonServer. Scheduling a reconnect in {}ms", (Object)this.reconnectInterval, (Object)e);
                this.scheduleConnectionCheck(this.reconnectInterval);
            }
            logger.debug("Failed to get connection to AxonServer.");
        }
    }

    private void verifyConnectionStateChange() {
        logger.debug("Connection state changed. Checking state now...");
        this.scheduleConnectionCheck(0L);
    }

    private void scheduleConnectionCheck(long interval) {
        try {
            this.executor.schedule(() -> this.ensureConnected(true), interval, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            logger.info("Did not schedule reconnect attempt. Connector is shut down");
        }
    }

    public void requestReconnect() {
        logger.info("Reconnect requested. Closing current connection");
        ObjectUtils.doIfNotNull(this.activeChannel.get(), c -> {
            c.shutdown();
            this.executor.schedule(() -> ((ManagedChannel)c).shutdownNow(), 5L, TimeUnit.SECONDS);
        });
    }

    public void forceReconnect() {
        logger.info("Forceful reconnect required. Closing current connection");
        ManagedChannel currentChannel = this.activeChannel.get();
        if (currentChannel != null) {
            currentChannel.shutdown();
            try {
                currentChannel.awaitTermination(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            currentChannel.shutdownNow();
        }
    }

    private static class FailingCall<REQ, RESP>
    extends ClientCall<REQ, RESP> {
        private FailingCall() {
        }

        public void start(ClientCall.Listener<RESP> responseListener, Metadata headers) {
            responseListener.onClose(Status.UNAVAILABLE, null);
        }

        public void request(int numMessages) {
        }

        public void cancel(@Nullable String message, @Nullable Throwable cause) {
        }

        public void halfClose() {
        }

        public void sendMessage(REQ message) {
            throw new StatusRuntimeException(Status.UNAVAILABLE);
        }
    }
}

