/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.core.ApiClock;
import com.google.api.gax.core.CurrentMillisClock;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.PollingSubscriberConnection;
import com.google.cloud.pubsub.spi.v1.StreamingSubscriberConnection;
import com.google.cloud.pubsub.spi.v1.SubscriptionAdminSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;

public class Subscriber {
    private static final int THREADS_PER_CHANNEL = 5;
    @VisibleForTesting
    static final int CHANNELS_PER_CORE = 10;
    private static final int MAX_INBOUND_MESSAGE_SIZE = 0x1400000;
    private static final int INITIAL_ACK_DEADLINE_SECONDS = 10;
    private static final int MAX_ACK_DEADLINE_SECONDS = 600;
    static final int MIN_ACK_DEADLINE_SECONDS = 10;
    private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes((long)1L);
    private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
    private final SubscriberImpl impl;

    private Subscriber(Builder builder) throws IOException {
        this.impl = new SubscriberImpl(builder);
    }

    public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
        return new Builder(subscription, receiver);
    }

    public SubscriptionName getSubscriptionName() {
        return this.impl.subscriptionName;
    }

    public Duration getAckExpirationPadding() {
        return this.impl.ackExpirationPadding;
    }

    public FlowControlSettings getFlowControlSettings() {
        return this.impl.flowControlSettings;
    }

    public void addListener(final SubscriberListener listener, Executor executor) {
        this.impl.addListener(new Service.Listener(){

            public void failed(Service.State from, Throwable failure) {
                listener.failed(Subscriber.this.convertState(from), failure);
            }

            public void running() {
                listener.running();
            }

            public void starting() {
                listener.starting();
            }

            public void stopping(Service.State from) {
                listener.stopping(Subscriber.this.convertState(from));
            }

            public void terminated(Service.State from) {
                listener.terminated(Subscriber.this.convertState(from));
            }
        }, executor);
    }

    public void awaitRunning() {
        this.impl.awaitRunning();
    }

    public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
        this.impl.awaitRunning(timeout, unit);
    }

    public void awaitTerminated() {
        this.impl.awaitTerminated();
    }

    public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
        this.impl.awaitTerminated(timeout, unit);
    }

    public Throwable failureCause() {
        return this.impl.failureCause();
    }

    public boolean isRunning() {
        return this.impl.isRunning();
    }

    public Subscriber startAsync() {
        this.impl.startAsync();
        return this;
    }

    public State state() {
        return this.convertState(this.impl.state());
    }

    private State convertState(Service.State state) {
        switch (state) {
            case FAILED: {
                return State.FAILED;
            }
            case NEW: {
                return State.NEW;
            }
            case RUNNING: {
                return State.RUNNING;
            }
            case STARTING: {
                return State.STARTING;
            }
            case STOPPING: {
                return State.STOPPING;
            }
            case TERMINATED: {
                return State.TERMINATED;
            }
        }
        throw new IllegalStateException("unknown state: " + state);
    }

    public Subscriber stopAsync() {
        this.impl.stopAsync();
        return this;
    }

    public static final class Builder {
        private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis((long)100L);
        private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis((long)500L);
        static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(50 * Runtime.getRuntime().availableProcessors()).build();
        SubscriptionName subscriptionName;
        Optional<Credentials> credentials = Optional.absent();
        MessageReceiver receiver;
        Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
        FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();
        ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
        Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder = Optional.absent();
        Optional<ApiClock> clock = Optional.absent();

        Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
            this.subscriptionName = subscriptionName;
            this.receiver = receiver;
        }

        public Builder setCredentials(Credentials credentials) {
            this.credentials = Optional.of((Object)Preconditions.checkNotNull((Object)credentials));
            return this;
        }

        public Builder setChannelBuilder(ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder) {
            this.channelBuilder = Optional.of((Object)Preconditions.checkNotNull(channelBuilder));
            return this;
        }

        public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
            this.flowControlSettings = (FlowControlSettings)Preconditions.checkNotNull((Object)flowControlSettings);
            return this;
        }

        public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
            Preconditions.checkArgument((ackExpirationPadding.compareTo((ReadableDuration)MIN_ACK_EXPIRATION_PADDING) >= 0 ? 1 : 0) != 0);
            this.ackExpirationPadding = ackExpirationPadding;
            return this;
        }

        public Builder setExecutorProvider(ExecutorProvider executorProvider) {
            this.executorProvider = (ExecutorProvider)Preconditions.checkNotNull((Object)executorProvider);
            return this;
        }

        Builder setClock(ApiClock clock) {
            this.clock = Optional.of((Object)clock);
            return this;
        }

        public Subscriber build() throws IOException {
            return new Subscriber(this);
        }
    }

    private static class SubscriberImpl
    extends AbstractService {
        private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
        private final SubscriptionName subscriptionName;
        private final String cachedSubscriptionNameString;
        private final FlowControlSettings flowControlSettings;
        private final Duration ackExpirationPadding;
        private final ScheduledExecutorService executor;
        private final Distribution ackLatencyDistribution = new Distribution(601);
        private final int numChannels;
        private final FlowController flowController;
        private final ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder;
        private final Credentials credentials;
        private final MessageReceiver receiver;
        private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
        private final List<PollingSubscriberConnection> pollingSubscriberConnections;
        private final ApiClock clock;
        private final List<AutoCloseable> closeables = new ArrayList<AutoCloseable>();
        private ScheduledFuture<?> ackDeadlineUpdater;
        private int streamAckDeadlineSeconds;

        private SubscriberImpl(Builder builder) throws IOException {
            this.receiver = builder.receiver;
            this.flowControlSettings = builder.flowControlSettings;
            this.subscriptionName = builder.subscriptionName;
            this.cachedSubscriptionNameString = this.subscriptionName.toString();
            this.ackExpirationPadding = builder.ackExpirationPadding;
            this.streamAckDeadlineSeconds = Math.max(10, Ints.saturatedCast((long)this.ackExpirationPadding.getStandardSeconds()));
            this.clock = builder.clock.isPresent() ? (ApiClock)builder.clock.get() : CurrentMillisClock.getDefaultClock();
            this.flowController = new FlowController(builder.flowControlSettings);
            this.executor = builder.executorProvider.getExecutor();
            if (builder.executorProvider.shouldAutoClose()) {
                this.closeables.add(new AutoCloseable(){

                    @Override
                    public void close() throws IOException {
                        SubscriberImpl.this.executor.shutdown();
                    }
                });
            }
            this.channelBuilder = builder.channelBuilder.isPresent() ? (ManagedChannelBuilder)builder.channelBuilder.get() : NettyChannelBuilder.forAddress((String)SubscriptionAdminSettings.getDefaultServiceAddress(), (int)SubscriptionAdminSettings.getDefaultServicePort()).maxMessageSize(0x1400000).flowControlWindow(5000000).negotiationType(NegotiationType.TLS).sslContext(GrpcSslContexts.forClient().ciphers(null).build()).executor((Executor)this.executor);
            this.credentials = builder.credentials.isPresent() ? (Credentials)builder.credentials.get() : GoogleCredentials.getApplicationDefault().createScoped(SubscriptionAdminSettings.getDefaultServiceScopes());
            this.numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * 10;
            this.streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(this.numChannels);
            this.pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(this.numChannels);
        }

        protected void doStart() {
            logger.log(Level.FINE, "Starting subscriber group.");
            this.startPollingConnections();
            this.notifyStarted();
        }

        protected void doStop() {
            this.stopAllStreamingConnections();
            this.stopAllPollingConnections();
            try {
                for (AutoCloseable closeable : this.closeables) {
                    closeable.close();
                }
                this.notifyStopped();
            }
            catch (Exception e) {
                this.notifyFailed(e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void startStreamingConnections() {
            List<StreamingSubscriberConnection> list = this.streamingSubscriberConnections;
            synchronized (list) {
                for (int i = 0; i < this.numChannels; ++i) {
                    this.streamingSubscriberConnections.add(new StreamingSubscriberConnection(this.cachedSubscriptionNameString, this.credentials, this.receiver, this.ackExpirationPadding, this.streamAckDeadlineSeconds, this.ackLatencyDistribution, (Channel)this.channelBuilder.build(), this.flowController, this.executor, this.clock));
                }
                this.startConnections(this.streamingSubscriberConnections, new Service.Listener(){

                    public void failed(Service.State from, Throwable failure) {
                        SubscriberImpl.this.stopAllStreamingConnections();
                        if (failure instanceof StatusRuntimeException && ((StatusRuntimeException)failure).getStatus().getCode() == Status.Code.UNIMPLEMENTED) {
                            logger.info("Unable to open streaming connections, falling back to polling.");
                            SubscriberImpl.this.startPollingConnections();
                            return;
                        }
                        SubscriberImpl.this.notifyFailed(failure);
                    }
                });
            }
            this.ackDeadlineUpdater = this.executor.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    long ackLatency = SubscriberImpl.this.ackLatencyDistribution.getNthPercentile(99.9);
                    if (ackLatency > 0L) {
                        int possibleStreamAckDeadlineSeconds = Math.max(10, Ints.saturatedCast((long)Math.max(ackLatency, SubscriberImpl.this.ackExpirationPadding.getStandardSeconds())));
                        if (SubscriberImpl.this.streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
                            SubscriberImpl.this.streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
                            logger.log(Level.FINER, "Updating stream deadline to {0} seconds.", SubscriberImpl.this.streamAckDeadlineSeconds);
                            for (StreamingSubscriberConnection subscriberConnection : SubscriberImpl.this.streamingSubscriberConnections) {
                                subscriberConnection.updateStreamAckDeadline(SubscriberImpl.this.streamAckDeadlineSeconds);
                            }
                        }
                    }
                }
            }, ACK_DEADLINE_UPDATE_PERIOD.getMillis(), ACK_DEADLINE_UPDATE_PERIOD.getMillis(), TimeUnit.MILLISECONDS);
        }

        private void stopAllStreamingConnections() {
            this.stopConnections(this.streamingSubscriberConnections);
            if (this.ackDeadlineUpdater != null) {
                this.ackDeadlineUpdater.cancel(true);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void startPollingConnections() {
            List<PollingSubscriberConnection> list = this.pollingSubscriberConnections;
            synchronized (list) {
                for (int i = 0; i < this.numChannels; ++i) {
                    this.pollingSubscriberConnections.add(new PollingSubscriberConnection(this.cachedSubscriptionNameString, this.credentials, this.receiver, this.ackExpirationPadding, this.ackLatencyDistribution, (Channel)this.channelBuilder.build(), this.flowController, this.executor, this.clock));
                }
                this.startConnections(this.pollingSubscriberConnections, new Service.Listener(){

                    public void failed(Service.State from, Throwable failure) {
                        block2: {
                            SubscriberImpl.this.stopAllPollingConnections();
                            try {
                                SubscriberImpl.this.notifyFailed(failure);
                            }
                            catch (IllegalStateException e) {
                                if (!SubscriberImpl.this.isRunning()) break block2;
                                throw e;
                            }
                        }
                    }
                });
            }
        }

        private void stopAllPollingConnections() {
            this.stopConnections(this.pollingSubscriberConnections);
        }

        private void startConnections(List<? extends Service> connections, final Service.Listener connectionsListener) {
            final CountDownLatch subscribersStarting = new CountDownLatch(this.numChannels);
            for (final Service service : connections) {
                this.executor.submit(new Runnable(){

                    @Override
                    public void run() {
                        service.addListener(connectionsListener, (Executor)SubscriberImpl.this.executor);
                        try {
                            service.startAsync().awaitRunning();
                        }
                        finally {
                            subscribersStarting.countDown();
                        }
                    }
                });
            }
            try {
                subscribersStarting.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void stopConnections(List<? extends Service> connections) {
            ArrayList<? extends Service> liveConnections;
            List<? extends Service> list = connections;
            synchronized (list) {
                liveConnections = new ArrayList<Service>(connections);
                connections.clear();
            }
            final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
            for (final Service service : liveConnections) {
                this.executor.submit(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            service.stopAsync().awaitTerminated();
                        }
                        catch (IllegalStateException illegalStateException) {
                            // empty catch block
                        }
                        connectionsStopping.countDown();
                    }
                });
            }
            try {
                connectionsStopping.await();
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static abstract class SubscriberListener {
        public void failed(State from, Throwable failure) {
        }

        public void running() {
        }

        public void starting() {
        }

        public void stopping(State from) {
        }

        public void terminated(State from) {
        }
    }

    public static enum State {
        FAILED,
        NEW,
        RUNNING,
        STARTING,
        STOPPING,
        TERMINATED;

    }
}

