/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.impl.spi.impl.listener;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.connection.ClientConnection;
import com.hazelcast.client.impl.connection.ClientConnectionManager;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.spi.ClientListenerService;
import com.hazelcast.client.impl.spi.EventHandler;
import com.hazelcast.client.impl.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.client.impl.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.impl.spi.impl.ListenerMessageCodec;
import com.hazelcast.client.impl.spi.impl.listener.ClientConnectionRegistration;
import com.hazelcast.client.impl.spi.impl.listener.ClientListenerRegistration;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.StaticMetricsProvider;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.ConnectionListener;
import com.hazelcast.internal.util.EmptyStatement;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.internal.util.executor.SingleExecutorThreadFactory;
import com.hazelcast.internal.util.executor.StripedExecutor;
import com.hazelcast.internal.util.executor.StripedRunnable;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Client-side listener service: keeps track of user listener registrations, registers them on
 * the client's active connections and dispatches incoming event messages to their handlers.
 *
 * <p>All bookkeeping of {@link #registrations} is funnelled through a single-threaded
 * {@code registrationExecutor}. Its thread is created with a name containing
 * {@code "eventRegistration"}; the assertions at the top of most methods use that name to check
 * whether a method is (or is not) running on the registration thread. Incoming events are run on
 * a separate {@link StripedExecutor}.
 */
public class ClientListenerServiceImpl
        implements ClientListenerService, StaticMetricsProvider, ConnectionListener {

    private final HazelcastClientInstanceImpl client;
    /** User registration id to registration; mutated only by tasks run on {@code registrationExecutor}. */
    private final Map<UUID, ClientListenerRegistration> registrations = new ConcurrentHashMap<>();
    private final ClientConnectionManager clientConnectionManager;
    private final ILogger logger;
    private final ExecutorService registrationExecutor;
    private final StripedExecutor eventExecutor;
    private final boolean isSmart;

    /**
     * Creates the service and its two executors from the client's configuration.
     *
     * @param client the owning client instance; its name, properties, class loader, logging
     *               service and connection manager are read here
     */
    public ClientListenerServiceImpl(HazelcastClientInstanceImpl client) {
        this.client = client;
        this.isSmart = client.getClientConfig().getNetworkConfig().isSmartRouting();
        this.logger = client.getLoggingService().getLogger(ClientListenerService.class);

        String name = client.getName();
        HazelcastProperties properties = client.getProperties();
        int eventQueueCapacity = properties.getInteger(ClientProperty.EVENT_QUEUE_CAPACITY);
        int eventThreadCount = properties.getInteger(ClientProperty.EVENT_THREAD_COUNT);
        this.eventExecutor = new StripedExecutor(this.logger, name + ".event", eventThreadCount, eventQueueCapacity, true);

        ClassLoader classLoader = client.getClientConfig().getClassLoader();
        SingleExecutorThreadFactory threadFactory = new SingleExecutorThreadFactory(classLoader, name + ".eventRegistration-");
        this.registrationExecutor = Executors.newSingleThreadExecutor(threadFactory);
        this.clientConnectionManager = client.getConnectionManager();
    }

    /**
     * Registers a listener on every currently active connection and blocks until done.
     *
     * <p>A failure on a connection that is no longer alive is skipped. A failure on a live
     * connection rolls the registration back and is reported as a {@link HazelcastException}.
     *
     * @param codec   encodes the add/remove requests and decodes the add response
     * @param handler receives the events of this registration
     * @return the newly generated user registration id
     */
    @Override
    @Nonnull
    public UUID registerListener(ListenerMessageCodec codec, EventHandler handler) {
        // Must not be called from the registration thread: we block on a task submitted to it.
        assert (!Thread.currentThread().getName().contains("eventRegistration"));

        Future<UUID> future = this.registrationExecutor.submit(() -> {
            UUID userRegistrationId = UuidUtil.newUnsecureUUID();
            ClientListenerRegistration registration = new ClientListenerRegistration(handler, codec);
            this.registrations.put(userRegistrationId, registration);

            Collection<ClientConnection> connections = this.clientConnectionManager.getActiveConnections();
            for (ClientConnection connection : connections) {
                try {
                    this.invoke(registration, connection);
                } catch (Exception e) {
                    if (!connection.isAlive()) {
                        // The connection died in the meantime; nothing to register there.
                        continue;
                    }
                    this.deregisterListenerInternal(userRegistrationId);
                    throw new HazelcastException("Listener can not be added ", e);
                }
            }
            return userRegistrationId;
        });
        return awaitRegistrationTask(future);
    }

    /**
     * Removes a previously registered listener and blocks until the removal task has run.
     *
     * @param userRegistrationId the id returned by {@link #registerListener}; must not be null
     * @return {@code false} if no registration with this id was known; {@code true} if it was
     *         removed, or if the registration executor rejected the task
     */
    @Override
    public boolean deregisterListener(@Nullable UUID userRegistrationId) {
        // Must not be called from the registration thread: we block on a task submitted to it.
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        Preconditions.checkNotNull(userRegistrationId, "Null userRegistrationId is not allowed!");

        try {
            Future<Boolean> future = this.registrationExecutor.submit(() -> this.deregisterListenerInternal(userRegistrationId));
            // Kept inside the try so that a RejectedExecutionException surfacing from the wait
            // is treated the same way as one thrown by submit().
            return awaitRegistrationTask(future);
        } catch (RejectedExecutionException ignored) {
            EmptyStatement.ignore(ignored);
            return true;
        }
    }

    /** Registers this object's {@link Probe}-annotated methods under the {@code "listeners"} prefix. */
    @Override
    public void provideStaticMetrics(MetricsRegistry registry) {
        registry.registerStaticMetrics(this, "listeners");
    }

    @Probe(name="eventQueueSize", level=ProbeLevel.MANDATORY)
    private int eventQueueSize() {
        return this.eventExecutor.getWorkQueueSize();
    }

    @Probe(name="eventsProcessed", level=ProbeLevel.MANDATORY)
    private long eventsProcessed() {
        return this.eventExecutor.processedCount();
    }

    /**
     * Hands an event message to the event executor.
     *
     * <p>Messages with partition id {@code -1} are submitted as a plain {@link Runnable}; all
     * others are wrapped in a {@link ClientEventProcessor} keyed by their partition id. A
     * rejected submission is logged and the event is dropped.
     *
     * @param clientMessage the incoming event message
     */
    public void handleEventMessage(ClientMessage clientMessage) {
        Runnable eventProcessor = clientMessage.getPartitionId() == -1
                ? () -> this.handleEventMessageOnCallingThread(clientMessage)
                : new ClientEventProcessor(clientMessage);
        try {
            this.eventExecutor.execute(eventProcessor);
        } catch (RejectedExecutionException e) {
            this.logger.warning("Event clientMessage could not be handled", e);
        }
    }

    /**
     * Looks up the handler for the message's correlation id on the message's connection and
     * invokes it on the current thread. Does nothing (apart from a fine-level log) when the
     * connection has no handler for that correlation id.
     *
     * @param clientMessage the incoming event message
     */
    public void handleEventMessageOnCallingThread(ClientMessage clientMessage) {
        long correlationId = clientMessage.getCorrelationId();
        ClientConnection connection = (ClientConnection) clientMessage.getConnection();
        EventHandler eventHandler = connection.getEventHandler(correlationId);
        if (eventHandler == null) {
            if (this.logger.isFineEnabled()) {
                this.logger.fine("No eventHandler for callId: " + correlationId + ", event: " + clientMessage);
            }
            return;
        }
        eventHandler.handle(clientMessage);
    }

    /**
     * Registers the given listener on one connection, unless it is already registered there.
     * Runs on the registration thread and blocks until the add request has been answered.
     *
     * @param listenerRegistration the listener to register
     * @param connection           the connection to register it on
     * @throws Exception if the add invocation fails
     */
    protected void invoke(ClientListenerRegistration listenerRegistration, Connection connection) throws Exception {
        assert (Thread.currentThread().getName().contains("eventRegistration"));

        if (listenerRegistration.getConnectionRegistrations().containsKey(connection)) {
            return;
        }

        ListenerMessageCodec codec = listenerRegistration.getCodec();
        ClientMessage request = codec.encodeAddRequest(this.registersLocalOnly());
        EventHandler handler = listenerRegistration.getHandler();
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Register attempt of " + listenerRegistration + " to " + connection);
        }
        handler.beforeListenerRegister(connection);

        ClientInvocation invocation = new ClientInvocation(this.client, request, null, connection);
        invocation.setEventHandler(handler);
        ClientInvocationFuture future = invocation.invokeUrgent();

        ClientMessage clientMessage;
        try {
            clientMessage = (ClientMessage) future.get();
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e, Exception.class);
        }

        UUID serverRegistrationId = codec.decodeAddResponse(clientMessage);
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Registered " + listenerRegistration + " to " + connection);
        }
        handler.onListenerRegister(connection);

        // Remember the server-side id and the call id so the listener can be removed later.
        long correlationId = request.getCorrelationId();
        ClientConnectionRegistration registration = new ClientConnectionRegistration(serverRegistrationId, correlationId);
        listenerRegistration.getConnectionRegistrations().put(connection, registration);
    }

    /**
     * Asynchronously registers every known listener on a newly added connection.
     *
     * @param connection the connection that was added
     */
    public void connectionAdded(Connection connection) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        this.registrationExecutor.submit(() -> {
            for (ClientListenerRegistration listenerRegistration : this.registrations.values()) {
                this.invokeFromInternalThread(listenerRegistration, connection);
            }
        });
    }

    /** Shuts down both executors and waits for the registration executor to terminate. */
    public void shutdown() {
        this.eventExecutor.shutdown();
        this.registrationExecutor.shutdown();
        ClientExecutionServiceImpl.awaitExecutorTermination("registrationExecutor", this.registrationExecutor, this.logger);
    }

    /** Subscribes this service to connection added/removed notifications. */
    public void start() {
        this.clientConnectionManager.addConnectionListener(this);
    }

    /**
     * Asynchronously drops the per-connection bookkeeping of every listener for a removed
     * connection.
     *
     * @param connection the connection that was removed
     */
    public void connectionRemoved(Connection connection) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        this.registrationExecutor.submit(() -> {
            for (ClientListenerRegistration registry : this.registrations.values()) {
                registry.getConnectionRegistrations().remove(connection);
            }
        });
    }

    public StripedExecutor getEventExecutor() {
        return this.eventExecutor;
    }

    /**
     * Returns the per-connection registrations of a listener, as seen by the registration
     * thread. Blocks until the lookup task has run.
     *
     * @param uuid the user registration id
     * @return the connection registrations of that listener, or an empty map if the id is unknown
     */
    public Map<Connection, ClientConnectionRegistration> getActiveRegistrations(UUID uuid) {
        // Must not be called from the registration thread: we block on a task submitted to it.
        assert (!Thread.currentThread().getName().contains("eventRegistration"));

        Future<Map<Connection, ClientConnectionRegistration>> future = this.registrationExecutor.submit(() -> {
            ClientListenerRegistration listenerRegistration = this.registrations.get(uuid);
            if (listenerRegistration == null) {
                return Collections.<Connection, ClientConnectionRegistration>emptyMap();
            }
            return listenerRegistration.getConnectionRegistrations();
        });
        return awaitRegistrationTask(future);
    }

    /** Returns a read-only view of all listener registrations keyed by user registration id. */
    public Map<UUID, ClientListenerRegistration> getRegistrations() {
        return Collections.unmodifiableMap(this.registrations);
    }

    /**
     * Waits for a task submitted to the registration executor and returns its result.
     *
     * <p>If the wait is interrupted, the calling thread's interrupt status is restored before
     * the failure is rethrown, so callers further up the stack can still observe the interrupt.
     * Every failure is propagated through {@link ExceptionUtil#rethrow(Throwable)}.
     */
    private static <T> T awaitRegistrationTask(Future<T> future) {
        try {
            return future.get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw ExceptionUtil.rethrow(e);
        }
    }

    /** Like {@link #invoke}, but logs a warning instead of propagating a failure. */
    private void invokeFromInternalThread(ClientListenerRegistration registrationKey, Connection connection) {
        assert (Thread.currentThread().getName().contains("eventRegistration"));
        try {
            this.invoke(registrationKey, connection);
        } catch (Exception e) {
            this.logger.warning("Listener " + registrationKey + " can not be added to a new connection: " + connection + ", reason: " + e.getMessage());
        }
    }

    /** Value passed to the codec's add request; mirrors the smart-routing setting of the client. */
    private boolean registersLocalOnly() {
        return this.isSmart;
    }

    /**
     * Removes a registration and, for each connection it was registered on, removes the local
     * event handler and sends a remove request without waiting for the response.
     *
     * @return {@code false} if the id was unknown, {@code true} otherwise
     */
    private Boolean deregisterListenerInternal(@Nullable UUID userRegistrationId) {
        assert (Thread.currentThread().getName().contains("eventRegistration"));

        ClientListenerRegistration listenerRegistration = this.registrations.remove(userRegistrationId);
        if (listenerRegistration == null) {
            return false;
        }

        Map<Connection, ClientConnectionRegistration> registrations = listenerRegistration.getConnectionRegistrations();
        for (Map.Entry<Connection, ClientConnectionRegistration> entry : registrations.entrySet()) {
            ClientConnectionRegistration registration = entry.getValue();
            ClientConnection subscriber = (ClientConnection) entry.getKey();
            // Stop delivering events locally first, then tell the remote side.
            subscriber.removeEventHandler(registration.getCallId());

            ListenerMessageCodec listenerMessageCodec = listenerRegistration.getCodec();
            UUID serverRegistrationId = registration.getServerRegistrationId();
            ClientMessage request = listenerMessageCodec.encodeRemoveRequest(serverRegistrationId);
            if (request == null) {
                // The codec produced no remove request for this listener.
                continue;
            }

            ClientInvocation clientInvocation = new ClientInvocation(this.client, request, null, subscriber);
            clientInvocation.setInvocationTimeoutMillis(Long.MAX_VALUE);
            clientInvocation.invokeUrgent().exceptionally(throwable -> {
                // Client shutdown, I/O and disconnect failures are not logged; anything else is.
                boolean expected = throwable instanceof HazelcastClientNotActiveException
                        || throwable instanceof IOException
                        || throwable instanceof TargetDisconnectedException;
                if (!expected) {
                    this.logger.warning("Deregistration of listener with ID " + userRegistrationId + " has failed for address " + subscriber.getRemoteAddress(), throwable);
                }
                return null;
            });
        }
        return true;
    }

    /** Event task whose stripe key is the message's partition id. */
    private final class ClientEventProcessor
            implements StripedRunnable {
        final ClientMessage clientMessage;

        private ClientEventProcessor(ClientMessage clientMessage) {
            this.clientMessage = clientMessage;
        }

        @Override
        public void run() {
            ClientListenerServiceImpl.this.handleEventMessageOnCallingThread(this.clientMessage);
        }

        @Override
        public int getKey() {
            return this.clientMessage.getPartitionId();
        }
    }
}

