/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.spi.impl.listener;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.connection.ClientConnectionManager;
import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.spi.ClientListenerService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.AbstractClientInvocationService;
import com.hazelcast.client.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.spi.impl.ListenerMessageCodec;
import com.hazelcast.client.spi.impl.listener.ClientEventRegistration;
import com.hazelcast.client.spi.impl.listener.ClientRegistrationKey;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.internal.metrics.MetricsProvider;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.UuidUtil;
import com.hazelcast.util.executor.SingleExecutorThreadFactory;
import com.hazelcast.util.executor.StripedExecutor;
import com.hazelcast.util.executor.StripedRunnable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public abstract class AbstractClientListenerService
implements ClientListenerService,
MetricsProvider,
ConnectionListener {
    protected final HazelcastClientInstanceImpl client;
    protected final SerializationService serializationService;
    protected final long invocationTimeoutMillis;
    protected final long invocationRetryPauseMillis;
    protected final Map<ClientRegistrationKey, Map<Connection, ClientEventRegistration>> registrations = new ConcurrentHashMap<ClientRegistrationKey, Map<Connection, ClientEventRegistration>>();
    final ScheduledExecutorService registrationExecutor;
    final ClientConnectionManager clientConnectionManager;
    private final ILogger logger;
    @Probe(name="eventHandlerCount", level=ProbeLevel.MANDATORY)
    private final ConcurrentMap<Long, EventHandler> eventHandlerMap = new ConcurrentHashMap<Long, EventHandler>();
    private final StripedExecutor eventExecutor;

    AbstractClientListenerService(HazelcastClientInstanceImpl client) {
        this.client = client;
        this.serializationService = client.getSerializationService();
        this.logger = client.getLoggingService().getLogger(ClientListenerService.class);
        String name = client.getName();
        HazelcastProperties properties = client.getProperties();
        int eventQueueCapacity = properties.getInteger(ClientProperty.EVENT_QUEUE_CAPACITY);
        int eventThreadCount = properties.getInteger(ClientProperty.EVENT_THREAD_COUNT);
        this.eventExecutor = new StripedExecutor(this.logger, name + ".event", eventThreadCount, eventQueueCapacity, true);
        ClassLoader classLoader = client.getClientConfig().getClassLoader();
        SingleExecutorThreadFactory threadFactory = new SingleExecutorThreadFactory(classLoader, name + ".eventRegistration-");
        this.registrationExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)threadFactory);
        this.clientConnectionManager = client.getConnectionManager();
        AbstractClientInvocationService invocationService = (AbstractClientInvocationService)client.getInvocationService();
        this.invocationTimeoutMillis = invocationService.getInvocationTimeoutMillis();
        this.invocationRetryPauseMillis = invocationService.getInvocationRetryPauseMillis();
    }

    @Override
    public String registerListener(final ListenerMessageCodec codec, final EventHandler handler) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        Future<String> future = this.registrationExecutor.submit(new Callable<String>(){

            @Override
            public String call() {
                String userRegistrationId = UuidUtil.newUnsecureUuidString();
                ClientRegistrationKey registrationKey = new ClientRegistrationKey(userRegistrationId, handler, codec);
                AbstractClientListenerService.this.registrations.put(registrationKey, new ConcurrentHashMap());
                Collection<ClientConnection> connections = AbstractClientListenerService.this.clientConnectionManager.getActiveConnections();
                for (ClientConnection connection : connections) {
                    try {
                        AbstractClientListenerService.this.invoke(registrationKey, connection);
                    }
                    catch (Exception e) {
                        if (!connection.isAlive()) continue;
                        AbstractClientListenerService.this.deregisterListenerInternal(userRegistrationId);
                        throw new HazelcastException("Listener can not be added ", (Throwable)e);
                    }
                }
                return userRegistrationId;
            }
        });
        try {
            return future.get();
        }
        catch (Exception e) {
            throw ExceptionUtil.rethrow((Throwable)e);
        }
    }

    @Override
    public boolean deregisterListener(final String userRegistrationId) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        try {
            Future<Boolean> future = this.registrationExecutor.submit(new Callable<Boolean>(){

                @Override
                public Boolean call() {
                    return AbstractClientListenerService.this.deregisterListenerInternal(userRegistrationId);
                }
            });
            try {
                return future.get();
            }
            catch (Exception e) {
                throw ExceptionUtil.rethrow((Throwable)e);
            }
        }
        catch (RejectedExecutionException ignored) {
            EmptyStatement.ignore((Throwable)ignored);
            return true;
        }
    }

    public void provideMetrics(MetricsRegistry registry) {
        registry.scanAndRegister((Object)this, "listeners");
    }

    @Probe(level=ProbeLevel.MANDATORY)
    private int eventQueueSize() {
        return this.eventExecutor.getWorkQueueSize();
    }

    @Probe(level=ProbeLevel.MANDATORY)
    private long eventsProcessed() {
        return this.eventExecutor.processedCount();
    }

    public void addEventHandler(long callId, EventHandler handler) {
        this.eventHandlerMap.put(callId, handler);
    }

    public void handleClientMessage(ClientMessage clientMessage) {
        try {
            this.eventExecutor.execute((Runnable)((Object)new ClientEventProcessor(clientMessage)));
        }
        catch (RejectedExecutionException e) {
            this.logger.warning("Event clientMessage could not be handled", (Throwable)e);
        }
    }

    protected void invoke(ClientRegistrationKey registrationKey, Connection connection) throws Exception {
        ClientMessage clientMessage;
        assert (Thread.currentThread().getName().contains("eventRegistration"));
        Map<Connection, ClientEventRegistration> registrationMap = this.registrations.get(registrationKey);
        if (registrationMap.containsKey(connection)) {
            return;
        }
        ListenerMessageCodec codec = registrationKey.getCodec();
        ClientMessage request = codec.encodeAddRequest(this.registersLocalOnly());
        EventHandler handler = registrationKey.getHandler();
        handler.beforeListenerRegister();
        ClientInvocation invocation = new ClientInvocation(this.client, request, null, connection);
        invocation.setEventHandler(handler);
        ClientInvocationFuture future = invocation.invokeUrgent();
        try {
            clientMessage = (ClientMessage)future.get();
        }
        catch (Exception e) {
            throw ExceptionUtil.rethrow((Throwable)e, Exception.class);
        }
        String serverRegistrationId = codec.decodeAddResponse(clientMessage);
        handler.onListenerRegister();
        long correlationId = request.getCorrelationId();
        ClientEventRegistration registration = new ClientEventRegistration(serverRegistrationId, correlationId, connection, codec);
        registrationMap.put(connection, registration);
    }

    public void connectionAdded(final Connection connection) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        this.registrationExecutor.submit(new Runnable(){

            @Override
            public void run() {
                for (ClientRegistrationKey registrationKey : AbstractClientListenerService.this.registrations.keySet()) {
                    AbstractClientListenerService.this.invokeFromInternalThread(registrationKey, connection);
                }
            }
        });
    }

    public void shutdown() {
        this.eventExecutor.shutdown();
        ClientExecutionServiceImpl.shutdownExecutor("registrationExecutor", this.registrationExecutor, this.logger);
    }

    public void start() {
        this.clientConnectionManager.addConnectionListener(this);
    }

    public void connectionRemoved(final Connection connection) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        this.registrationExecutor.submit(new Runnable(){

            @Override
            public void run() {
                for (Map<Connection, ClientEventRegistration> registrationMap : AbstractClientListenerService.this.registrations.values()) {
                    ClientEventRegistration registration = registrationMap.remove(connection);
                    if (registration == null) continue;
                    AbstractClientListenerService.this.removeEventHandler(registration.getCallId());
                }
            }
        });
    }

    public StripedExecutor getEventExecutor() {
        return this.eventExecutor;
    }

    public Collection<ClientEventRegistration> getActiveRegistrations(final String uuid) {
        assert (!Thread.currentThread().getName().contains("eventRegistration"));
        Future<Collection<ClientEventRegistration>> future = this.registrationExecutor.submit(new Callable<Collection<ClientEventRegistration>>(){

            @Override
            public Collection<ClientEventRegistration> call() {
                ClientRegistrationKey key = new ClientRegistrationKey(uuid);
                Map<Connection, ClientEventRegistration> registrationMap = AbstractClientListenerService.this.registrations.get(key);
                if (registrationMap == null) {
                    return Collections.EMPTY_LIST;
                }
                LinkedList<ClientEventRegistration> activeRegistrations = new LinkedList<ClientEventRegistration>();
                for (ClientEventRegistration registration : registrationMap.values()) {
                    activeRegistrations.add(registration);
                }
                return activeRegistrations;
            }
        });
        try {
            return future.get();
        }
        catch (Exception e) {
            throw ExceptionUtil.rethrow((Throwable)e);
        }
    }

    public Map<ClientRegistrationKey, Map<Connection, ClientEventRegistration>> getRegistrations() {
        return Collections.unmodifiableMap(this.registrations);
    }

    public Map<Long, EventHandler> getEventHandlers() {
        return Collections.unmodifiableMap(this.eventHandlerMap);
    }

    private void invokeFromInternalThread(ClientRegistrationKey registrationKey, Connection connection) {
        assert (Thread.currentThread().getName().contains("eventRegistration"));
        try {
            this.invoke(registrationKey, connection);
        }
        catch (Exception e) {
            this.logger.warning("Listener " + registrationKey + " can not be added to a new connection: " + connection + ", reason: " + e.getMessage());
        }
    }

    abstract boolean registersLocalOnly();

    public void removeEventHandler(long callId) {
        this.eventHandlerMap.remove(callId);
    }

    private Boolean deregisterListenerInternal(final String userRegistrationId) {
        assert (Thread.currentThread().getName().contains("eventRegistration"));
        ClientRegistrationKey key = new ClientRegistrationKey(userRegistrationId);
        Map<Connection, ClientEventRegistration> registrationMap = this.registrations.remove(key);
        if (registrationMap == null) {
            return false;
        }
        for (ClientEventRegistration registration : registrationMap.values()) {
            final Connection subscriber = registration.getSubscriber();
            this.removeEventHandler(registration.getCallId());
            ListenerMessageCodec listenerMessageCodec = registration.getCodec();
            String serverRegistrationId = registration.getServerRegistrationId();
            ClientMessage request = listenerMessageCodec.encodeRemoveRequest(serverRegistrationId);
            ClientInvocation invocation = new ClientInvocation(this.client, request, null, subscriber);
            invocation.setInvocationTimeoutMillis(Long.MAX_VALUE);
            invocation.invokeUrgent().andThen(new ExecutionCallback<ClientMessage>(){

                public void onResponse(ClientMessage response) {
                }

                public void onFailure(Throwable throwable) {
                    if (!(throwable instanceof HazelcastClientNotActiveException || throwable instanceof IOException || throwable instanceof TargetDisconnectedException)) {
                        AbstractClientListenerService.this.logger.warning("Deregistration of listener with ID " + userRegistrationId + " has failed for address " + subscriber.getEndPoint(), throwable);
                    }
                }
            });
        }
        return true;
    }

    private final class ClientEventProcessor
    implements StripedRunnable {
        final ClientMessage clientMessage;

        private ClientEventProcessor(ClientMessage clientMessage) {
            this.clientMessage = clientMessage;
        }

        public void run() {
            long correlationId = this.clientMessage.getCorrelationId();
            EventHandler eventHandler = (EventHandler)AbstractClientListenerService.this.eventHandlerMap.get(correlationId);
            if (eventHandler == null) {
                if (AbstractClientListenerService.this.logger.isFineEnabled()) {
                    AbstractClientListenerService.this.logger.fine("No eventHandler for callId: " + correlationId + ", event: " + this.clientMessage);
                }
                return;
            }
            eventHandler.handle(this.clientMessage);
        }

        public int getKey() {
            return this.clientMessage.getPartitionId();
        }
    }
}

