/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.grpc.client.common.topics;

import com.google.protobuf.Int32Value;
import com.google.protobuf.Message;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.TopicHelper;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.client.common.SimpleStreamObserver;
import com.oracle.coherence.grpc.client.common.topics.TopicServiceGrpcConnection;
import com.oracle.coherence.grpc.messages.topic.v1.PublishRequest;
import com.oracle.coherence.grpc.messages.topic.v1.PublishResult;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequestType;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceResponse;
import com.tangosol.internal.net.topic.BaseRemotePublisher;
import com.tangosol.internal.net.topic.NamedTopicPublisher;
import com.tangosol.internal.net.topic.PublisherChannelConnector;
import com.tangosol.internal.net.topic.PublisherConnector;
import com.tangosol.io.Serializer;
import com.tangosol.net.topic.Publisher;
import com.tangosol.util.Binary;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class GrpcPublisherConnector<V>
extends BaseRemotePublisher<V>
implements PublisherConnector<V>,
GrpcConnection.ConnectionListener {
    private final TopicServiceGrpcConnection f_connection;
    private final int f_nProxyId;
    private final GrpcConnection.Listener<TopicServiceResponse> f_listener;

    public GrpcPublisherConnector(TopicServiceGrpcConnection connection, int nProxyId, long nId, int cChannel, Publisher.Option<? super V>[] options) {
        super(nId, cChannel, options);
        this.f_connection = connection;
        this.f_nProxyId = nProxyId;
        SimpleStreamObserver<TopicServiceResponse> eventObserver = new SimpleStreamObserver<TopicServiceResponse>(this::onEvent);
        this.f_listener = new GrpcConnection.Listener<TopicServiceResponse>(eventObserver, r -> r.getProxyId() == nProxyId);
        connection.addResponseObserver(this.f_listener);
        connection.addConnectionListener(this);
    }

    public boolean isActive() {
        return this.f_connection.isConnected();
    }

    public void close() {
        this.f_connection.send(0, TopicServiceRequestType.DestroyPublisher, (Message)Int32Value.of((int)this.f_nProxyId));
        this.f_connection.close();
        super.close();
        if (this.f_listener != null) {
            this.f_connection.removeResponseObserver(this.f_listener);
        }
        this.f_connection.removeConnectionListener(this);
    }

    public void ensureConnected() {
    }

    public PublisherChannelConnector<V> createChannelConnector(int nChannel) {
        return new ChannelConnector(this.getId(), nChannel);
    }

    @Override
    public void onConnectionEvent(GrpcConnection.ConnectionEvent event) {
        if (event.getType() == GrpcConnection.ConnectionEvent.Type.Disconnected) {
            this.dispatchEvent(new NamedTopicPublisher.PublisherEvent((PublisherConnector)this, NamedTopicPublisher.PublisherEvent.Type.Disconnected));
        }
    }

    protected void onEvent(TopicServiceResponse response) {
        this.dispatchEvent(TopicHelper.fromProtobufPublisherEvent((PublisherConnector)this, (TopicServiceResponse)response));
    }

    protected class ChannelConnector
    extends BaseRemotePublisher.BaseChannelConnector {
        public ChannelConnector(long nId, int nChannel) {
            super((BaseRemotePublisher)GrpcPublisherConnector.this, nId, nChannel);
        }

        protected CompletionStage<com.tangosol.internal.net.topic.PublishResult> offerInternal(List<Binary> listBinary, int nNotifyPostFull) {
            PublishRequest request = PublishRequest.newBuilder().setChannel(this.f_nChannel).setNotificationIdentifier(nNotifyPostFull).addAllValues((Iterable)BinaryHelper.toListOfByteString(listBinary)).build();
            CompletableFuture<TopicServiceResponse> future = GrpcPublisherConnector.this.f_connection.poll(GrpcPublisherConnector.this.f_nProxyId, TopicServiceRequestType.Publish, (Message)request);
            future.join();
            return future.thenApply(response -> {
                PublishResult result = GrpcPublisherConnector.this.f_connection.unpackMessage((TopicServiceResponse)response, PublishResult.class);
                return TopicHelper.fromProtoBufPublishResult((PublishResult)result, (Serializer)this.getTopicService().getSerializer());
            });
        }
    }
}

