/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.command.CommandChannel;
import io.axoniq.axonserver.connector.command.impl.CommandChannelImpl;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.impl.EventChannelImpl;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ControlChannelImpl;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.query.QueryChannel;
import io.axoniq.axonserver.connector.query.impl.QueryChannelImpl;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.grpc.ConnectivityState;
import io.grpc.StatusRuntimeException;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
 * {@link AxonServerConnection} implementation bound to a single context.
 * <p>
 * The control channel is created in the constructor; the command, event and query channels are
 * created on first request and cached in {@link AtomicReference}s. All channels share the same
 * {@link AxonServerManagedChannel} and {@link ScheduledExecutorService}.
 */
public class ContextConnection implements AxonServerConnection {

    /** Seconds to wait for the managed channel to terminate before forcing it down. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    private final ClientIdentification clientIdentification;
    private final ControlChannelImpl controlChannel;
    private final AtomicReference<CommandChannelImpl> commandChannel = new AtomicReference<>();
    private final AtomicReference<EventChannelImpl> eventChannel = new AtomicReference<>();
    private final AtomicReference<QueryChannelImpl> queryChannel = new AtomicReference<>();
    private final ScheduledExecutorService executorService;
    private final AxonServerManagedChannel connection;
    private final String context;

    /**
     * Creates the connection and its control channel. No connect attempt is made here; see
     * {@link #connect()}.
     *
     * @param clientIdentification         identification handed to every channel created by this connection
     * @param executorService              executor handed to every channel created by this connection
     * @param connection                   the managed channel all channels of this connection run on
     * @param processorInfoUpdateFrequency forwarded unchanged to the control channel (unit is not visible
     *                                     in this class)
     * @param context                      the context name handed to the control, command and query channels
     */
    public ContextConnection(ClientIdentification clientIdentification, ScheduledExecutorService executorService, AxonServerManagedChannel connection, long processorInfoUpdateFrequency, String context) {
        this.clientIdentification = clientIdentification;
        this.executorService = executorService;
        this.connection = connection;
        this.context = context;
        // The control channel receives reconnectChannels() as a callback so it can trigger a
        // reconnect of every channel owned by this connection.
        this.controlChannel = new ControlChannelImpl(clientIdentification, context, executorService, connection, processorInfoUpdateFrequency, this::reconnectChannels);
    }

    /**
     * Requests a reconnect of the managed channel and then of every channel that currently exists.
     * Channels that were never requested are skipped.
     */
    private void reconnectChannels() {
        connection.requestReconnect();
        ObjectUtils.doIfNotNull(commandChannel.get(), CommandChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(queryChannel.get(), QueryChannelImpl::reconnect);
        // Null-guarded on purpose: this method is handed out as a callback before the
        // controlChannel field has been assigned.
        ObjectUtils.doIfNotNull(controlChannel, ControlChannelImpl::reconnect);
        ObjectUtils.doIfNotNull(eventChannel.get(), EventChannelImpl::reconnect);
    }

    /**
     * @return {@code true} when the managed channel currently reports
     *         {@link ConnectivityState#TRANSIENT_FAILURE}
     */
    @Override
    public boolean isConnectionFailed() {
        // NOTE(review): 'false' is presumably gRPC's requestConnection flag, i.e. this check does
        // not itself trigger a connection attempt - confirm against AxonServerManagedChannel.
        return connection.getState(false) == ConnectivityState.TRANSIENT_FAILURE;
    }

    /**
     * @return {@code true} when the managed channel is connected, every channel that has been
     *         created so far reports ready, and the control channel reports ready; channels that
     *         were never requested do not influence the result
     */
    @Override
    public boolean isReady() {
        return isConnected()
                && isAbsentOrReady(commandChannel.get())
                && isAbsentOrReady(queryChannel.get())
                && isAbsentOrReady(eventChannel.get())
                && controlChannel.isReady();
    }

    /**
     * @return {@code true} when the managed channel currently reports {@link ConnectivityState#READY}
     */
    @Override
    public boolean isConnected() {
        return connection.getState(false) == ConnectivityState.READY;
    }

    /**
     * Disconnects every existing channel and shuts the managed channel down. If the managed channel
     * has not terminated within {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds, or the calling thread is
     * interrupted while waiting, {@code shutdownNow()} is invoked; in the interrupted case the
     * thread's interrupt flag is restored.
     */
    @Override
    public void disconnect() {
        ObjectUtils.doIfNotNull(controlChannel, ControlChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(commandChannel.get(), CommandChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(queryChannel.get(), QueryChannelImpl::disconnect);
        ObjectUtils.doIfNotNull(eventChannel.get(), EventChannelImpl::disconnect);
        connection.shutdown();
        try {
            if (!connection.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                connection.shutdownNow();
            }
        } catch (InterruptedException e) {
            connection.shutdownNow();
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the control channel of this connection; unlike the other channel accessors this does
     *         not attempt to connect it
     */
    @Override
    public ControlChannel controlChannel() {
        return controlChannel;
    }

    /**
     * Connects the control channel if it is not ready yet, or schedules the connect for the next
     * state change of the managed channel when it cannot be done right now.
     */
    public void connect() {
        ensureConnected(controlChannel);
    }

    /**
     * @return the command channel, created on first use and connected (or scheduled to connect)
     *         when it is not ready
     */
    @Override
    public CommandChannel commandChannel() {
        // NOTE(review): 5000 / 2000 are passed through as-is; their meaning is defined by the
        // CommandChannelImpl constructor, which is not visible here.
        CommandChannelImpl channel = commandChannel.updateAndGet(
                createIfNull(() -> new CommandChannelImpl(clientIdentification, context, 5000, 2000, executorService, connection)));
        return ensureConnected(channel);
    }

    /**
     * @return the event channel, created on first use and connected (or scheduled to connect)
     *         when it is not ready
     */
    @Override
    public EventChannel eventChannel() {
        EventChannelImpl channel = eventChannel.updateAndGet(
                createIfNull(() -> new EventChannelImpl(clientIdentification, executorService, connection)));
        return ensureConnected(channel);
    }

    /**
     * @return the query channel, created on first use and connected (or scheduled to connect)
     *         when it is not ready
     */
    @Override
    public QueryChannel queryChannel() {
        // NOTE(review): 5000 / 2000 are passed through as-is; their meaning is defined by the
        // QueryChannelImpl constructor, which is not visible here.
        QueryChannelImpl channel = queryChannel.updateAndGet(
                createIfNull(() -> new QueryChannelImpl(clientIdentification, context, 5000, 2000, executorService, connection)));
        return ensureConnected(channel);
    }

    /**
     * Builds an update function for {@link AtomicReference#updateAndGet} that keeps an existing
     * value and only invokes {@code factory} when the current value is {@code null}.
     * <p>
     * {@code updateAndGet} may re-apply the function when a concurrent update wins, so an instance
     * produced by {@code factory} can be discarded without ever being published.
     *
     * @param factory creates the value to install when none is present
     * @param <T>     type of the cached value
     * @return the update function
     */
    private static <T> UnaryOperator<T> createIfNull(Supplier<T> factory) {
        return existing -> existing == null ? factory.get() : existing;
    }

    /**
     * @param channel a channel that may not have been created yet
     * @return {@code true} when {@code channel} is {@code null} or reports ready
     */
    private static boolean isAbsentOrReady(AbstractAxonServerChannel channel) {
        return channel == null || channel.isReady();
    }

    /**
     * Makes sure a connect is attempted, now or later, for a channel that is not ready.
     * <p>
     * When the managed channel is neither {@code SHUTDOWN} nor {@code TRANSIENT_FAILURE} the channel
     * is connected immediately. If that throws a {@link StatusRuntimeException}, or the managed
     * channel is in one of those two states, the connect is instead registered to run when the
     * managed channel leaves the observed state.
     *
     * @param channel the channel to connect
     * @param <T>     concrete channel type, returned unchanged for call chaining
     * @return the given {@code channel}
     */
    private <T extends AbstractAxonServerChannel> T ensureConnected(T channel) {
        if (channel.isReady()) {
            return channel;
        }
        // NOTE(review): 'true' is presumably gRPC's requestConnection flag, asking an idle managed
        // channel to start connecting - confirm against AxonServerManagedChannel.
        ConnectivityState state = connection.getState(true);
        boolean unavailable = state == ConnectivityState.SHUTDOWN || state == ConnectivityState.TRANSIENT_FAILURE;
        if (unavailable) {
            connectOnStateChange(state, channel);
            return channel;
        }
        try {
            channel.connect();
        } catch (StatusRuntimeException e) {
            // Deliberately not propagated: the connect is retried on the next state change.
            connectOnStateChange(state, channel);
        }
        return channel;
    }

    /**
     * Registers {@code channel.connect()} to run once the managed channel's state differs from
     * {@code observedState}.
     */
    private void connectOnStateChange(ConnectivityState observedState, AbstractAxonServerChannel channel) {
        connection.notifyWhenStateChanged(observedState, channel::connect);
    }

    /**
     * @return the managed channel this connection and all of its channels run on
     */
    public AxonServerManagedChannel getManagedChannel() {
        return connection;
    }
}

