/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.client.common.topics;

import com.google.protobuf.Any;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.Timeout;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.TopicHelper;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.client.common.GrpcRemoteService;
import com.oracle.coherence.grpc.client.common.topics.GrpcNamedTopicConnector;
import com.oracle.coherence.grpc.client.common.topics.GrpcPublisherConnector;
import com.oracle.coherence.grpc.client.common.topics.GrpcSubscriberConnector;
import com.oracle.coherence.grpc.client.common.topics.GrpcTopicLifecycleEventDispatcher;
import com.oracle.coherence.grpc.client.common.topics.TopicServiceGrpcConnection;
import com.oracle.coherence.grpc.messages.common.v1.CollectionOfStringValues;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureChannelCountRequest;
import com.oracle.coherence.grpc.messages.topic.v1.EnsurePublisherRequest;
import com.oracle.coherence.grpc.messages.topic.v1.EnsurePublisherResponse;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureSubscriberRequest;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureSubscriberResponse;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequest;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequestType;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceResponse;
import com.tangosol.coherence.component.net.memberSet.actualMemberSet.ServiceMemberSet;
import com.tangosol.internal.net.grpc.RemoteGrpcTopicServiceDependencies;
import com.tangosol.internal.net.topic.NamedTopicConnector;
import com.tangosol.internal.net.topic.NamedTopicSubscriber;
import com.tangosol.internal.net.topic.NamedTopicView;
import com.tangosol.internal.net.topic.PublisherConnector;
import com.tangosol.internal.net.topic.SubscriberConnector;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.io.Serializer;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.RequestIncompleteException;
import com.tangosol.net.RequestTimeoutException;
import com.tangosol.net.TopicService;
import com.tangosol.net.events.EventDispatcher;
import com.tangosol.net.events.EventDispatcherRegistry;
import com.tangosol.net.internal.ScopedTopicReferenceStore;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.NamedTopicEvent;
import com.tangosol.net.topic.NamedTopicListener;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicBackingMapManager;
import com.tangosol.util.ResourceRegistry;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * A {@link TopicService} implementation that reaches the topic service through gRPC.
 *
 * <p>Topics handed out by {@link #ensureTopic(String, ClassLoader)} are cached in a
 * {@link ScopedTopicReferenceStore} keyed by topic name and context class loader. Every
 * topic created by this service gets the service's internal {@link Listener} attached so
 * that a destroyed topic is dropped from the store and a lifecycle event is dispatched.
 */
public class GrpcRemoteTopicService
extends GrpcRemoteService<RemoteGrpcTopicServiceDependencies>
implements TopicService {
    /** Connection created in {@link #start()}; used for the service-level poll requests. */
    private TopicServiceGrpcConnection m_connection;

    /** Backing map manager supplied through {@link #setTopicBackingMapManager}. */
    private TopicBackingMapManager<?, ?> m_backingMapManager;

    /** Cache of the topics created by this service. */
    private final ScopedTopicReferenceStore f_store = new ScopedTopicReferenceStore();

    /** Listener added to every topic created by {@link #ensureNamedTopicClient(String)}. */
    private final Listener f_listener = new Listener();

    /**
     * Create the service, passing the fixed service type name {@code "RemoteGrpcTopic"}
     * to the superclass.
     */
    public GrpcRemoteTopicService() {
        super("RemoteGrpcTopic");
    }

    /**
     * @return the protobuf message class used for responses, {@link TopicServiceResponse}
     */
    @Override
    protected Class<? extends Message> getResponseType() {
        return TopicServiceResponse.class;
    }

    /**
     * @return the backing map manager previously set on this service, possibly {@code null}
     */
    public TopicBackingMapManager getTopicBackingMapManager() {
        return m_backingMapManager;
    }

    /**
     * Set the backing map manager used by this service.
     *
     * @param manager  the manager to store
     */
    public void setTopicBackingMapManager(TopicBackingMapManager manager) {
        m_backingMapManager = manager;
    }

    /**
     * Look up the {@link EventDispatcherRegistry} in the resource registry of the cache
     * factory that owns the backing map manager.
     *
     * <p>NOTE(review): this dereferences the backing map manager without a null check, so
     * it must have been set before this method is called.
     *
     * @return the registered {@link EventDispatcherRegistry} resource
     */
    @Override
    protected EventDispatcherRegistry getDefaultEventDispatcherRegistry() {
        ResourceRegistry resources = m_backingMapManager.getCacheFactory().getResourceRegistry();
        return (EventDispatcherRegistry) resources.getResource(EventDispatcherRegistry.class);
    }

    /**
     * Start the service and open the connection used for service-level requests.
     */
    @Override
    public void start() {
        super.start();
        m_connection = openTopicConnection();
    }

    /**
     * Detach the service listener from every cached topic and release it.
     *
     * <p>A failure to release one topic is logged and does not stop the remaining topics
     * from being processed.
     */
    @Override
    protected void stopInternal() {
        for (NamedTopic topic : f_store.getAll()) {
            topic.removeListener(f_listener);
            try {
                topic.release();
            }
            catch (Throwable e) {
                Logger.err(e);
            }
        }
    }

    /**
     * Obtain the topic with the given name, creating it if there is no active cached
     * instance for the service's context class loader.
     *
     * <p>The store is checked once without locking and, when that misses, again while
     * holding the store's per-name lock so only one caller creates the topic.
     *
     * @param sName    the topic name
     * @param ignored  unused; the service's context class loader is used instead
     * @param <T>      the type of the topic's values
     *
     * @return an active topic
     *
     * @throws IllegalStateException    if the service has no context class loader
     * @throws RequestTimeoutException  if the lock for the topic name cannot be obtained
     *                                  within the configured request timeout
     */
    @SuppressWarnings("unchecked")
    public <T> NamedTopic<T> ensureTopic(String sName, ClassLoader ignored) {
        ClassLoader loader = getContextClassLoader();
        if (loader == null) {
            throw new IllegalStateException("ContextClassLoader is missing");
        }

        NamedTopic<T> topic = (NamedTopic<T>) f_store.get(sName, loader);
        if (topic != null && topic.isActive()) {
            return topic;
        }

        // a non-positive configured timeout is passed to the store lock as -1
        long cTimeout = ((RemoteGrpcTopicServiceDependencies) getDependencies()).getRequestTimeoutMillis();
        long cWait = cTimeout <= 0L ? -1L : cTimeout;
        if (!f_store.lock(sName, cWait)) {
            throw new RequestTimeoutException("Failed to get a reference to topic '" + sName + "' after " + cWait + "ms");
        }

        try {
            // re-check under the lock, another thread may have created the topic meanwhile
            topic = (NamedTopic<T>) f_store.get(sName, loader);
            if (topic == null || !topic.isActive()) {
                topic = ensureNamedTopicClient(sName);
                f_store.put(topic, loader);
            }
            return topic;
        }
        finally {
            f_store.unlock(sName);
        }
    }

    /**
     * Release a topic created by this service.
     *
     * @param topic  the topic to release
     *
     * @throws IllegalArgumentException  if the topic is not backed by a gRPC connector
     */
    public void releaseTopic(NamedTopic<?> topic) {
        releaseTopic(topic, assertTopicType(topic), false);
    }

    /**
     * Destroy a topic created by this service.
     *
     * @param topic  the topic to destroy
     *
     * @throws IllegalArgumentException  if the topic is not backed by a gRPC connector
     */
    public void destroyTopic(NamedTopic<?> topic) {
        releaseTopic(topic, assertTopicType(topic), true);
    }

    /**
     * Send a {@code GetChannelCount} request for the topic and wait for the answer.
     *
     * @param sTopicName  the topic name
     *
     * @return the channel count carried by the response
     */
    public int getChannelCount(String sTopicName) {
        StringValue request = StringValue.of(sTopicName);
        CompletableFuture<?> pending = m_connection
                .poll(0, TopicServiceRequestType.GetChannelCount, request)
                .thenApply(m_connection::unpackInteger);
        return ((Int32Value) pending.join()).getValue();
    }

    /**
     * Send an {@code EnsureChannelCount} request for the topic and wait for the answer.
     *
     * @param sTopicName  the topic name
     * @param cRequired   the value sent as the request's required count
     * @param cChannel    the value sent as the request's channel count
     *
     * @return the channel count carried by the response
     */
    public int ensureChannelCount(String sTopicName, int cRequired, int cChannel) {
        EnsureChannelCountRequest request = EnsureChannelCountRequest.newBuilder()
                .setTopic(sTopicName)
                .setRequiredCount(cRequired)
                .setChannelCount(cChannel)
                .build();
        CompletableFuture<?> pending = m_connection
                .poll(0, TopicServiceRequestType.EnsureChannelCount, request)
                .thenApply(m_connection::unpackInteger);
        return ((Int32Value) pending.join()).getValue();
    }

    /**
     * Send a {@code GetSubscriberGroups} request for the topic and convert the returned
     * names into {@link SubscriberGroupId}s.
     *
     * @param sTopicName  the topic name
     *
     * @return the subscriber group identifiers built from the names in the response
     */
    public Set<SubscriberGroupId> getSubscriberGroups(String sTopicName) {
        StringValue request = StringValue.of(sTopicName);
        CompletableFuture<?> pending = m_connection
                .poll(0, TopicServiceRequestType.GetSubscriberGroups, request)
                .thenApply(resp -> m_connection.unpackMessage((TopicServiceResponse) resp, CollectionOfStringValues.class));
        CollectionOfStringValues values = (CollectionOfStringValues) pending.join();
        return values.getValuesList().stream().map(SubscriberGroupId::withName).collect(Collectors.toSet());
    }

    /**
     * @return the names of the topics held in this service's topic store
     */
    public Set<String> getTopicNames() {
        return f_store.getNames();
    }

    /**
     * Test whether {@link CacheFactory#VERSION_ENCODED} is at least the encoded form of
     * the given five-part version.
     *
     * @return {@code true} if the local encoded version is greater than or equal to the
     *         encoded requested version
     */
    public boolean isVersionCompatible(int nMajor, int nMinor, int nMicro, int nPatchSet, int nPatch) {
        int nRequired = ServiceMemberSet.encodeVersion(nMajor, nMinor, nMicro, nPatchSet, nPatch);
        return CacheFactory.VERSION_ENCODED >= nRequired;
    }

    /**
     * Test whether {@link CacheFactory#VERSION_ENCODED} is at least the encoded form of
     * the given three-part version.
     *
     * @return {@code true} if the local encoded version is greater than or equal to the
     *         encoded requested version
     */
    public boolean isVersionCompatible(int nYear, int nMonth, int nPatch) {
        int nRequired = ServiceMemberSet.encodeVersion(nYear, nMonth, nPatch);
        return CacheFactory.VERSION_ENCODED >= nRequired;
    }

    /**
     * @param nVersion  an already encoded version
     *
     * @return {@code true} if {@link CacheFactory#VERSION_ENCODED} is greater than or
     *         equal to {@code nVersion}
     */
    public boolean isVersionCompatible(int nVersion) {
        return CacheFactory.VERSION_ENCODED >= nVersion;
    }

    /**
     * @param predicate  the test to apply to {@link CacheFactory#VERSION_ENCODED}
     *
     * @return the result of the predicate
     */
    public boolean isVersionCompatible(IntPredicate predicate) {
        return predicate.test(CacheFactory.VERSION_ENCODED);
    }

    /**
     * @return {@link CacheFactory#VERSION_ENCODED}
     */
    public int getMinimumServiceVersion() {
        return CacheFactory.VERSION_ENCODED;
    }

    /**
     * Create a new gRPC backed topic with its own connection and lifecycle dispatcher.
     *
     * <p>The dispatcher is registered with the event dispatcher registry when one is
     * available, the service listener is added to the topic, and the "topic created"
     * event is dispatched on the service executor.
     *
     * @param sName  the topic name
     * @param <T>    the type of the topic's values
     *
     * @return the newly created topic
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> NamedTopic<T> ensureNamedTopicClient(String sName) {
        GrpcTopicLifecycleEventDispatcher dispatcher = new GrpcTopicLifecycleEventDispatcher(sName, this);

        // wrap the channel with the tracing interceptor only when one is configured
        Channel channel = m_tracingInterceptor == null
                ? m_channel
                : ClientInterceptors.intercept(m_channel, new ClientInterceptor[]{m_tracingInterceptor});

        GrpcNamedTopicConnector.Dependencies dependencies = createTopicDependencies(sName, channel, dispatcher);
        TopicServiceGrpcConnection connection = openTopicConnection();
        GrpcNamedTopicConnector connector = new GrpcNamedTopicConnector(dependencies, this, connection);
        NamedTopicView topic = new NamedTopicView(connector);

        EventDispatcherRegistry registry = getEventDispatcherRegistry();
        if (registry != null) {
            registry.registerEventDispatcher(dispatcher);
        }

        topic.addListener(f_listener);
        m_executor.execute(() -> dispatcher.dispatchTopicCreated((NamedTopic<?>) topic));
        return topic;
    }

    /**
     * Build the connector dependencies for a topic from this service's dependencies.
     *
     * @param sName       the topic name
     * @param channel     the channel the connector will use
     * @param dispatcher  the lifecycle dispatcher for the topic
     *
     * @return the populated dependencies
     */
    private GrpcNamedTopicConnector.Dependencies createTopicDependencies(String sName, Channel channel, GrpcTopicLifecycleEventDispatcher dispatcher) {
        RemoteGrpcTopicServiceDependencies serviceDeps = (RemoteGrpcTopicServiceDependencies) getDependencies();
        String sRemoteScope = serviceDeps.getRemoteScopeName();

        GrpcNamedTopicConnector.DefaultDependencies topicDeps = new GrpcNamedTopicConnector.DefaultDependencies(sName, channel, dispatcher);
        // a missing remote scope name is passed on as the empty string
        topicDeps.setScope(sRemoteScope == null ? "" : sRemoteScope);
        topicDeps.setSerializer(m_serializer, m_serializer.getName());
        topicDeps.setExecutor(m_executor);
        topicDeps.setDeferKeyAssociationCheck(serviceDeps.isDeferKeyAssociationCheck());
        topicDeps.setDeadline(serviceDeps.getDeadline());
        topicDeps.setHeartbeatMillis(serviceDeps.getHeartbeatInterval());
        topicDeps.setRequireHeartbeatAck(serviceDeps.isRequireHeartbeatAck());
        return topicDeps;
    }

    /**
     * Release or destroy the connector behind a topic, close it, and remove the topic
     * from the store.
     *
     * @param topic     the topic being released
     * @param channel   the topic's gRPC connector
     * @param fDestroy  {@code true} to destroy the connector, {@code false} to release it
     */
    protected void releaseTopic(NamedTopic<?> topic, GrpcNamedTopicConnector<?> channel, boolean fDestroy) {
        if (fDestroy) {
            channel.destroy();
        }
        else {
            channel.release();
        }
        channel.close();
        f_store.release(topic);
    }

    /**
     * Unregister the topic's lifecycle dispatcher and dispatch a "topic destroyed" event
     * on the service executor.
     *
     * <p>Nothing happens when the topic is not backed by a gRPC connector.
     *
     * @param topic  the destroyed topic
     */
    protected void dispatchDestroyedLifecycleEvent(NamedTopic<?> topic) {
        GrpcNamedTopicConnector<?> channel = unwrapChannel(topic);
        if (channel == null) {
            return;
        }

        GrpcTopicLifecycleEventDispatcher dispatcher = (GrpcTopicLifecycleEventDispatcher)
                ((GrpcNamedTopicConnector.Dependencies) channel.getDependencies()).getEventDispatcher();
        EventDispatcherRegistry registry = getEventDispatcherRegistry();
        if (registry != null) {
            registry.unregisterEventDispatcher(dispatcher);
        }
        m_executor.execute(() -> dispatcher.dispatchTopicDestroyed(topic));
    }

    /**
     * Obtain the gRPC connector behind a topic, rejecting any other kind of topic.
     *
     * @param topic  the topic to check
     *
     * @return the topic's gRPC connector
     *
     * @throws IllegalArgumentException  if the topic is not a {@link NamedTopicView} over
     *                                   a {@link GrpcNamedTopicConnector}
     */
    private GrpcNamedTopicConnector<?> assertTopicType(NamedTopic<?> topic) {
        GrpcNamedTopicConnector<?> connector = unwrapChannel(topic);
        if (connector == null) {
            throw new IllegalArgumentException("illegal topic: " + String.valueOf(topic));
        }
        return connector;
    }

    /**
     * Obtain the gRPC connector behind a topic.
     *
     * @param topic  the topic to unwrap
     *
     * @return the topic's gRPC connector, or {@code null} if the topic is not a
     *         {@link NamedTopicView} over a {@link GrpcNamedTopicConnector}
     */
    private GrpcNamedTopicConnector<?> unwrapChannel(NamedTopic<?> topic) {
        if (topic instanceof NamedTopicView) {
            NamedTopicConnector connector = ((NamedTopicView) topic).getConnector();
            if (connector instanceof GrpcNamedTopicConnector) {
                return (GrpcNamedTopicConnector) connector;
            }
        }
        return null;
    }

    /**
     * Open a new connection to the {@code "TopicService"} gRPC service.
     *
     * <p>The arguments are exactly those used everywhere in this class; the meaning of
     * the numeric values is defined by {@code connect} in the superclass.
     *
     * @return a new topic service connection
     */
    private TopicServiceGrpcConnection openTopicConnection() {
        GrpcConnection grpcConnection = connect("TopicService", 1, 1, 1);
        return new TopicServiceGrpcConnection(grpcConnection);
    }

    /**
     * @return a new connection for use by a publisher
     */
    public TopicServiceGrpcConnection createPublisherConnection() {
        return openTopicConnection();
    }

    /**
     * Create a publisher connector for a topic, retrying while the server is unavailable
     * or the attempt times out.
     *
     * <p>The whole operation runs under a {@link Timeout} of the configured request
     * timeout. An attempt whose root cause is a gRPC {@code UNAVAILABLE} status or a
     * {@link TimeoutException} is retried with a new connection; any other failure is
     * rethrown as a runtime exception.
     *
     * @param sTopicName  the topic name
     * @param options     the publisher options
     * @param <V>         the type of the published values
     *
     * @return the connector for the new publisher
     *
     * @throws RequestIncompleteException  if an {@link InterruptedException} is raised
     *                                     while running under or closing the timeout
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <V> PublisherConnector<V> ensurePublisher(String sTopicName, Publisher.Option<? super V>[] options) {
        long nMillis = ((RemoteGrpcTopicServiceDependencies) getDependencies()).getRequestTimeoutMillis();
        try (Timeout ignored = Timeout.after(nMillis)) {
            while (true) {
                try {
                    Publisher.OptionSet optionSet = Publisher.optionsFrom(options);
                    TopicServiceGrpcConnection connection = createPublisherConnection();
                    EnsurePublisherRequest ensureRequest = EnsurePublisherRequest.newBuilder()
                            .setTopic(sTopicName)
                            .setChannelCount(optionSet.getChannelCount(0))
                            .build();
                    TopicServiceRequest request = TopicServiceRequest.newBuilder()
                            .setType(TopicServiceRequestType.EnsurePublisher)
                            .setMessage(Any.pack(ensureRequest))
                            .build();
                    TopicServiceResponse response = (TopicServiceResponse) connection.send(request);
                    EnsurePublisherResponse ensureResponse = connection.unpackMessage(response, EnsurePublisherResponse.class);

                    int nProxyId = ensureResponse.getProxyId();
                    long nPublisherId = ensureResponse.getPublisherId();
                    int cChannel = ensureResponse.getChannelCount();

                    GrpcPublisherConnector<V> connector = new GrpcPublisherConnector<V>(connection, nProxyId, nPublisherId, cChannel, options);
                    connector.setTopicName(sTopicName);
                    connector.setMaxBatchSizeBytes(ensureResponse.getMaxBatchSize());
                    connector.setTopicService(this);
                    return connector;
                }
                catch (Throwable t) {
                    Throwable rootCause = Exceptions.getRootCause(t);
                    if (rootCause instanceof StatusRuntimeException
                            && ((StatusRuntimeException) rootCause).getStatus().getCode() == Status.Code.UNAVAILABLE) {
                        Logger.finer("Caught Status.UNAVAILABLE exception ensuring publisher, will retry");
                    }
                    else if (rootCause instanceof TimeoutException) {
                        Logger.finer("Caught TimeoutException ensuring publisher, will retry");
                    }
                    else {
                        throw Exceptions.ensureRuntimeException(t);
                    }
                    // fall through to the next loop iteration to retry
                }
            }
        }
        catch (InterruptedException e) {
            throw new RequestIncompleteException("Timed out after " + nMillis + " ms attempting to create a publisher", e);
        }
    }

    /**
     * @return a new connection for use by a subscriber
     */
    public TopicServiceGrpcConnection createSubscriberConnection() {
        return openTopicConnection();
    }

    /**
     * Create a subscriber connector for the topic behind the given connector.
     *
     * <p>The subscriber group name, filter, extractor and subscribed channels are added
     * to the request only when the options supply them. The filter and extractor are
     * serialized with the service serializer.
     *
     * @param connector  the connector of the topic to subscribe to
     * @param options    the subscriber options
     * @param <V>        the type of the topic's values
     * @param <U>        the type of the values received by the subscriber
     *
     * @return the connector for the new subscriber
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <V, U> SubscriberConnector<U> ensureSubscriber(GrpcNamedTopicConnector<?> connector, Subscriber.Option<? super V, U>[] options) {
        String sTopicName = connector.getName();
        TopicServiceGrpcConnection connection = createSubscriberConnection();
        EnsureSubscriberRequest.Builder builder = EnsureSubscriberRequest.newBuilder();

        NamedTopicSubscriber.OptionSet optionSet = NamedTopicSubscriber.optionsFrom(options);
        optionSet.getSubscriberGroupName().ifPresent(sGroup -> builder.setSubscriberGroup(sGroup));
        optionSet.getFilter().ifPresent(filter -> builder.setFilter(BinaryHelper.toByteString(filter, m_serializer)));
        optionSet.getExtractor().ifPresent(extractor -> builder.setExtractor(BinaryHelper.toByteString(extractor, m_serializer)));

        int[] anChannel = optionSet.getSubscribeTo();
        if (anChannel != null && anChannel.length > 0) {
            builder.addAllChannels(IntStream.of(anChannel).boxed().collect(Collectors.toList()));
        }

        EnsureSubscriberRequest request = builder.setTopic(sTopicName).build();
        TopicServiceResponse response = connection.send(0, TopicServiceRequestType.EnsureSubscriber, request);
        EnsureSubscriberResponse ensureResponse = connection.unpackMessage(response, EnsureSubscriberResponse.class);

        SubscriberId subscriberId = TopicHelper.fromProtobufSubscriberId(ensureResponse);
        SubscriberGroupId groupId = TopicHelper.fromProtobufSubscriberGroupId(ensureResponse);
        int nProxyId = ensureResponse.getProxyId();
        return new GrpcSubscriberConnector(connector, nProxyId, connection, sTopicName, subscriberId, groupId);
    }

    /**
     * Listener attached to every topic created by the enclosing service.
     *
     * <p>When a topic reports a {@code Destroyed} event and the store confirms it released
     * the topic, the service's destroyed lifecycle event is dispatched.
     */
    protected class Listener
    implements NamedTopicListener {
        protected Listener() {
        }

        /**
         * Handle a topic event; only {@code Destroyed} events have any effect.
         *
         * @param evt  the event raised by a topic
         */
        public void onEvent(NamedTopicEvent evt) {
            if (evt.getType() != NamedTopicEvent.Type.Destroyed) {
                return;
            }
            NamedTopic topic = evt.getSource();
            if (f_store.releaseTopic(topic)) {
                dispatchDestroyedLifecycleEvent(topic);
            }
        }
    }
}

