/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.cloud.pubsub.v1.Waiter;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import io.grpc.Status;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

final class StreamingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName());
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis((long)100L);
    private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds((long)10L);
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    private final SubscriberStub stub;
    private final int channelAffinity;
    private final String subscription;
    private final ScheduledExecutorService systemExecutor;
    private final MessageDispatcher messageDispatcher;
    private final FlowControlSettings flowControlSettings;
    private final boolean useLegacyFlowControl;
    private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
    private final Waiter ackOperationsWaiter = new Waiter();
    private final Lock lock = new ReentrantLock();
    private ClientStream<StreamingPullRequest> clientStream;
    private final String clientId = UUID.randomUUID().toString();

    public StreamingSubscriberConnection(String subscription, MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Duration maxDurationPerAckExtension, Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, FlowControlSettings flowControlSettings, boolean useLegacyFlowControl, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        this.subscription = subscription;
        this.systemExecutor = systemExecutor;
        this.stub = stub;
        this.channelAffinity = channelAffinity;
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, maxAckExtensionPeriod, maxDurationPerAckExtension, ackLatencyDistribution, flowController, executor, systemExecutor, clock);
        this.flowControlSettings = flowControlSettings;
        this.useLegacyFlowControl = useLegacyFlowControl;
    }

    protected void doStart() {
        logger.config("Starting subscriber.");
        this.messageDispatcher.start();
        this.initialize();
        this.notifyStarted();
    }

    protected void doStop() {
        this.messageDispatcher.stop();
        this.ackOperationsWaiter.waitComplete();
        this.lock.lock();
        try {
            this.clientStream.closeSendWithError((Throwable)Status.CANCELLED.asException());
        }
        finally {
            this.lock.unlock();
            this.notifyStopped();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initialize() {
        SettableApiFuture errorFuture = SettableApiFuture.create();
        StreamingPullResponseObserver responseObserver = new StreamingPullResponseObserver((SettableApiFuture<Void>)errorFuture);
        ClientStream initClientStream = this.stub.streamingPullCallable().splitCall((ResponseObserver)responseObserver, (ApiCallContext)GrpcCallContext.createDefault().withChannelAffinity(Integer.valueOf(this.channelAffinity)));
        logger.log(Level.FINER, "Initializing stream to subscription {0}", this.subscription);
        initClientStream.send((Object)StreamingPullRequest.newBuilder().setSubscription(this.subscription).setStreamAckDeadlineSeconds(60).setClientId(this.clientId).setMaxOutstandingMessages(this.useLegacyFlowControl ? 0L : this.valueOrZero(this.flowControlSettings.getMaxOutstandingElementCount())).setMaxOutstandingBytes(this.useLegacyFlowControl ? 0L : this.valueOrZero(this.flowControlSettings.getMaxOutstandingRequestBytes())).build());
        this.lock.lock();
        try {
            this.clientStream = initClientStream;
        }
        finally {
            this.lock.unlock();
        }
        ApiFutures.addCallback((ApiFuture)errorFuture, (ApiFutureCallback)new ApiFutureCallback<Void>(){

            public void onSuccess(@Nullable Void result) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    return;
                }
                StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.initialize();
            }

            public void onFailure(Throwable cause) {
                if (!StreamingSubscriberConnection.this.isAlive()) {
                    logger.log(Level.FINE, "pull failure after service no longer running", cause);
                    return;
                }
                if (!StatusUtil.isRetryable(cause)) {
                    ApiException gaxException = ApiExceptionFactory.createException((Throwable)cause, (StatusCode)GrpcStatusCode.of((Status.Code)Status.fromThrowable((Throwable)cause).getCode()), (boolean)false);
                    logger.log(Level.SEVERE, "terminated streaming with exception", (Throwable)gaxException);
                    StreamingSubscriberConnection.this.notifyFailed((Throwable)gaxException);
                    return;
                }
                logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);
                long backoffMillis = StreamingSubscriberConnection.this.channelReconnectBackoffMillis.get();
                long newBackoffMillis = Math.min(backoffMillis * 2L, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
                StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(newBackoffMillis);
                StreamingSubscriberConnection.this.systemExecutor.schedule(new Runnable(){

                    @Override
                    public void run() {
                        StreamingSubscriberConnection.this.initialize();
                    }
                }, backoffMillis, TimeUnit.MILLISECONDS);
            }
        }, (Executor)MoreExecutors.directExecutor());
    }

    private Long valueOrZero(Long value) {
        return value != null ? value : 0L;
    }

    private boolean isAlive() {
        ApiService.State state = this.state();
        return state == ApiService.State.RUNNING || state == ApiService.State.STARTING;
    }

    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        ApiFutureCallback<Empty> loggingCallback = new ApiFutureCallback<Empty>(){

            public void onSuccess(Empty empty) {
                StreamingSubscriberConnection.this.ackOperationsWaiter.incrementPendingCount(-1);
            }

            public void onFailure(Throwable t) {
                StreamingSubscriberConnection.this.ackOperationsWaiter.incrementPendingCount(-1);
                Level level = StreamingSubscriberConnection.this.isAlive() ? Level.WARNING : Level.FINER;
                logger.log(level, "failed to send operations", t);
            }
        };
        int pendingOperations = 0;
        for (MessageDispatcher.PendingModifyAckDeadline modack : ackDeadlineExtensions) {
            for (List idChunk : Lists.partition(modack.ackIds, (int)1000)) {
                ApiFuture future = this.stub.modifyAckDeadlineCallable().futureCall((Object)ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscription).addAllAckIds((Iterable)idChunk).setAckDeadlineSeconds(modack.deadlineExtensionSeconds).build());
                ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)loggingCallback, (Executor)MoreExecutors.directExecutor());
                ++pendingOperations;
            }
        }
        for (List idChunk : Lists.partition(acksToSend, (int)1000)) {
            ApiFuture future = this.stub.acknowledgeCallable().futureCall((Object)AcknowledgeRequest.newBuilder().setSubscription(this.subscription).addAllAckIds((Iterable)idChunk).build());
            ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)loggingCallback, (Executor)MoreExecutors.directExecutor());
            ++pendingOperations;
        }
        this.ackOperationsWaiter.incrementPendingCount(pendingOperations);
    }

    private class StreamingPullResponseObserver
    implements ResponseObserver<StreamingPullResponse> {
        final SettableApiFuture<Void> errorFuture;
        StreamController thisController;

        StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
            this.errorFuture = errorFuture;
        }

        public void onStart(StreamController controller) {
            this.thisController = controller;
            this.thisController.disableAutoInboundFlowControl();
            this.thisController.request(1);
        }

        public void onResponse(StreamingPullResponse response) {
            StreamingSubscriberConnection.this.channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
            StreamingSubscriberConnection.this.messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
            if (StreamingSubscriberConnection.this.isAlive() && !this.errorFuture.isDone()) {
                StreamingSubscriberConnection.this.lock.lock();
                try {
                    this.thisController.request(1);
                }
                catch (Exception e) {
                    logger.log(Level.WARNING, "cannot request more messages", e);
                }
                finally {
                    StreamingSubscriberConnection.this.lock.unlock();
                }
            }
        }

        public void onError(Throwable t) {
            this.errorFuture.setException(t);
        }

        public void onComplete() {
            logger.fine("Streaming pull terminated successfully!");
            this.errorFuture.set(null);
        }
    }
}

