/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessagePayloadOnlyCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubUnboundedSource
extends PTransform<PBegin, PCollection<PubsubMessage>> {
    // Class-wide logger.
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
    // Ack timeout, in seconds; createRandomSubscription() passes the same value (60) when creating a subscription.
    // NOTE(review): the identifier is misspelled ("DEAULT"); renaming it is a code change and is left alone here.
    private static final @UnknownKeyFor @NonNull @Initialized int DEAULT_ACK_TIMEOUT_SEC = 60;
    // Coder returned by PubsubSource.getCheckpointMarkCoder().
    private static final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PubsubCheckpointCoder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
    // Presumably the maximum number of messages requested per pull; its use is not visible in this part of the file.
    private static final @UnknownKeyFor @NonNull @Initialized int PULL_BATCH_SIZE = 1000;
    // Same value as the cap extend() applies to one batch of ack ids whose deadline is extended.
    private static final @UnknownKeyFor @NonNull @Initialized int ACK_BATCH_SIZE = 2000;
    // Presumably an upper bound on in-flight messages; its use is not visible in this part of the file.
    private static final @UnknownKeyFor @NonNull @Initialized int MAX_IN_FLIGHT = 20000;
    // extend() stops extending a message once it was requested longer ago than this.
    private static final @UnknownKeyFor @NonNull @Initialized Duration PROCESSING_TIMEOUT = Duration.standardMinutes((long)2L);
    // Percentage of the ack timeout by which extend()/extendBatch() push a deadline forward.
    private static final @UnknownKeyFor @NonNull @Initialized int ACK_EXTENSION_PCT = 50;
    // Percentage of the ack timeout before the deadline at which extend() starts considering a message.
    private static final @UnknownKeyFor @NonNull @Initialized int ACK_SAFETY_PCT = 20;
    // Messages whose deadline is closer than this (or already past) are assumed expired by extend().
    private static final @UnknownKeyFor @NonNull @Initialized Duration ACK_TOO_LATE = Duration.standardSeconds((long)2L);
    // First argument handed to every MovingFunction built by PubsubReader.newFun().
    private static final @UnknownKeyFor @NonNull @Initialized Duration SAMPLE_PERIOD = Duration.standardMinutes((long)1L);
    // Second argument handed to MovingFunction, and first argument handed to BucketingFunction.
    private static final @UnknownKeyFor @NonNull @Initialized Duration SAMPLE_UPDATE = Duration.standardSeconds((long)5L);
    // Presumably the minimum interval between statistics log lines; its use is not visible in this part of the file.
    private static final @UnknownKeyFor @NonNull @Initialized Duration LOG_PERIOD = Duration.standardSeconds((long)30L);
    // NOTE(review): the literal 10 passed to MovingFunction/BucketingFunction appears to be this constant inlined - confirm.
    private static final @UnknownKeyFor @NonNull @Initialized int MIN_WATERMARK_MESSAGES = 10;
    // NOTE(review): the literal 2 passed to MovingFunction/BucketingFunction appears to be this constant inlined - confirm.
    private static final @UnknownKeyFor @NonNull @Initialized int MIN_WATERMARK_SPREAD = 2;
    // PubsubSource.split() returns desiredNumSplits multiplied by the same value (4).
    private static final @UnknownKeyFor @NonNull @Initialized int SCALE_OUT = 4;
    // Combiner yielding the smaller of two longs; its identity is Long.MAX_VALUE so that any real value wins.
    private static final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Combine.BinaryCombineLongFn MIN = new Combine.BinaryCombineLongFn(){

        // Returns the minimum of the two operands.
        public @UnknownKeyFor @NonNull @Initialized long apply(@UnknownKeyFor @NonNull @Initialized long left, @UnknownKeyFor @NonNull @Initialized long right) {
            return Math.min(left, right);
        }

        // Neutral element for min: the largest representable long.
        public @UnknownKeyFor @NonNull @Initialized long identity() {
            return Long.MAX_VALUE;
        }
    };
    // Combiner yielding the larger of two longs; its identity is Long.MIN_VALUE so that any real value wins.
    private static final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Combine.BinaryCombineLongFn MAX = new Combine.BinaryCombineLongFn(){

        // Returns the maximum of the two operands.
        public @UnknownKeyFor @NonNull @Initialized long apply(@UnknownKeyFor @NonNull @Initialized long left, @UnknownKeyFor @NonNull @Initialized long right) {
            return Math.max(left, right);
        }

        // Neutral element for max: the smallest representable long.
        public @UnknownKeyFor @NonNull @Initialized long identity() {
            return Long.MIN_VALUE;
        }
    };
    // Combiner obtained from Sum.ofLongs(); used for the counting statistics in PubsubReader.
    private static final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
    // Optional time source; when null, PubsubReader.now() falls back to System.currentTimeMillis().
    private @Nullable @UnknownKeyFor @Initialized Clock clock;
    // Factory used to create Pubsub clients; never null (checked in the constructor).
    private final @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory;
    // Project in which a random subscription is created; when null the pipeline's GcpOptions project is used.
    private final @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.ProjectPath> project;
    // Topic to read from. The constructor requires exactly one of topic and subscription to be non-null.
    private final @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic;
    // Subscription to read from. The constructor requires exactly one of topic and subscription to be non-null.
    private @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription;
    // Passed through to PubsubClientFactory.newClient(); may be null.
    private final @Nullable @UnknownKeyFor @Initialized String timestampAttribute;
    // Passed through to PubsubClientFactory.newClient(); may be null.
    private final @Nullable @UnknownKeyFor @Initialized String idAttribute;
    // The three flags below select the parse function and the output coder in expand().
    private final @UnknownKeyFor @NonNull @Initialized boolean needsAttributes;
    private final @UnknownKeyFor @NonNull @Initialized boolean needsMessageId;
    private final @UnknownKeyFor @NonNull @Initialized boolean needsOrderingKey;

    /**
     * Canonical constructor; every other constructor delegates here.
     *
     * <p>Exactly one of {@code topic} and {@code subscription} must be non-null, otherwise an
     * {@link IllegalArgumentException} is raised by the precondition check. {@code pubsubFactory}
     * must not be null.
     */
    @VisibleForTesting
    PubsubUnboundedSource(@UnknownKeyFor @NonNull @Initialized Clock clock, @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.ProjectPath> project, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription, @Nullable @UnknownKeyFor @Initialized String timestampAttribute, @Nullable @UnknownKeyFor @Initialized String idAttribute, @UnknownKeyFor @NonNull @Initialized boolean needsAttributes, @UnknownKeyFor @NonNull @Initialized boolean needsMessageId, @UnknownKeyFor @NonNull @Initialized boolean needsOrderingKey) {
        final boolean topicGiven = topic != null;
        final boolean subscriptionGiven = subscription != null;
        Preconditions.checkArgument(topicGiven != subscriptionGiven, (Object)"Exactly one of topic and subscription must be given");
        this.pubsubFactory = Preconditions.checkNotNull(pubsubFactory);

        // Where to read from.
        this.project = project;
        this.topic = topic;
        this.subscription = subscription;

        // How to interpret and shape each message.
        this.timestampAttribute = timestampAttribute;
        this.idAttribute = idAttribute;
        this.needsAttributes = needsAttributes;
        this.needsMessageId = needsMessageId;
        this.needsOrderingKey = needsOrderingKey;

        // May be null, in which case the system clock is used.
        this.clock = clock;
    }

    /**
     * Creates a source with no explicit clock that needs neither message ids nor ordering keys.
     */
    public PubsubUnboundedSource(@UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.ProjectPath> project, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription, @Nullable @UnknownKeyFor @Initialized String timestampAttribute, @Nullable @UnknownKeyFor @Initialized String idAttribute, @UnknownKeyFor @NonNull @Initialized boolean needsAttributes) {
        this(
            null,
            pubsubFactory,
            project,
            topic,
            subscription,
            timestampAttribute,
            idAttribute,
            needsAttributes,
            false,
            false);
    }

    /**
     * Creates a source with the given clock that needs neither message ids nor ordering keys.
     */
    public PubsubUnboundedSource(@UnknownKeyFor @NonNull @Initialized Clock clock, @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.ProjectPath> project, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription, @Nullable @UnknownKeyFor @Initialized String timestampAttribute, @Nullable @UnknownKeyFor @Initialized String idAttribute, @UnknownKeyFor @NonNull @Initialized boolean needsAttributes) {
        this(
            clock,
            pubsubFactory,
            project,
            topic,
            subscription,
            timestampAttribute,
            idAttribute,
            needsAttributes,
            false,
            false);
    }

    /**
     * Creates a source with no explicit clock and a caller-chosen {@code needsMessageId}; ordering
     * keys are not requested.
     */
    public PubsubUnboundedSource(@UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.ProjectPath> project, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription, @Nullable @UnknownKeyFor @Initialized String timestampAttribute, @Nullable @UnknownKeyFor @Initialized String idAttribute, @UnknownKeyFor @NonNull @Initialized boolean needsAttributes, @UnknownKeyFor @NonNull @Initialized boolean needsMessageId) {
        this(
            null,
            pubsubFactory,
            project,
            topic,
            subscription,
            timestampAttribute,
            idAttribute,
            needsAttributes,
            needsMessageId,
            false);
    }

    /** Returns the resolved project path, or {@code null} when no project provider was supplied. */
    public @Nullable @UnknownKeyFor @Initialized PubsubClient.ProjectPath getProject() {
        if (this.project == null) {
            return null;
        }
        return (PubsubClient.ProjectPath)this.project.get();
    }

    /** Returns the resolved topic path, or {@code null} when no topic provider was supplied. */
    public @Nullable @UnknownKeyFor @Initialized PubsubClient.TopicPath getTopic() {
        if (this.topic == null) {
            return null;
        }
        return (PubsubClient.TopicPath)this.topic.get();
    }

    /** Returns the (possibly null) topic provider without resolving it. */
    public @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> getTopicProvider() {
        return topic;
    }

    /** Returns the resolved subscription path, or {@code null} when no subscription provider was supplied. */
    public @Nullable @UnknownKeyFor @Initialized PubsubClient.SubscriptionPath getSubscription() {
        if (this.subscription == null) {
            return null;
        }
        return (PubsubClient.SubscriptionPath)this.subscription.get();
    }

    /** Returns the (possibly null) subscription provider without resolving it. */
    public @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> getSubscriptionProvider() {
        return subscription;
    }

    /** Returns the configured timestamp attribute name, or {@code null} if none was given. */
    public @Nullable @UnknownKeyFor @Initialized String getTimestampAttribute() {
        return timestampAttribute;
    }

    /** Returns the configured id attribute name, or {@code null} if none was given. */
    public @Nullable @UnknownKeyFor @Initialized String getIdAttribute() {
        return idAttribute;
    }

    /** Returns the {@code needsAttributes} flag given at construction. */
    public @UnknownKeyFor @NonNull @Initialized boolean getNeedsAttributes() {
        return needsAttributes;
    }

    /** Returns the {@code needsMessageId} flag given at construction. */
    public @UnknownKeyFor @NonNull @Initialized boolean getNeedsMessageId() {
        return needsMessageId;
    }

    /** Returns the {@code needsOrderingKey} flag given at construction. */
    public @UnknownKeyFor @NonNull @Initialized boolean getNeedsOrderingKey() {
        return needsOrderingKey;
    }

    /**
     * Expands this transform into: an unbounded read of raw bytes from {@link PubsubSource}, a
     * mapping step that turns each byte array into a {@link PubsubMessage}, and, when
     * {@code usesStatsFn} says so, a pass-through {@link StatsFn} step.
     *
     * <p>The mapping function depends on whether attributes or the message id are needed; the
     * coder set on the result depends on the ordering-key, message-id and attributes flags, checked
     * in that order of precedence.
     *
     * @param input the pipeline start
     * @return the collection of messages read from Pubsub
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized PubsubMessage> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
        // Declared with the common functional type: the two implementations are unrelated classes.
        final SerializableFunction<byte[], PubsubMessage> function;
        if (this.getNeedsAttributes() || this.getNeedsMessageId()) {
            function = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
        } else {
            function = new PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly();
        }

        // Declared as Coder<PubsubMessage>: the five coder classes share no narrower common type.
        final Coder<PubsubMessage> messageCoder;
        if (this.getNeedsOrderingKey()) {
            messageCoder = PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of();
        } else if (this.getNeedsMessageId()) {
            if (this.getNeedsAttributes()) {
                messageCoder = PubsubMessageWithAttributesAndMessageIdCoder.of();
            } else {
                messageCoder = PubsubMessageWithMessageIdCoder.of();
            }
        } else if (this.getNeedsAttributes()) {
            messageCoder = PubsubMessageWithAttributesCoder.of();
        } else {
            messageCoder = PubsubMessagePayloadOnlyCoder.of();
        }

        PCollection<PubsubMessage> messages = input.getPipeline().begin()
            .apply(Read.from(new PubsubSource(this)))
            .apply("MapBytesToPubsubMessages", MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function))
            .setCoder(messageCoder);

        if (this.usesStatsFn(input.getPipeline().getOptions())) {
            messages = messages.apply("PubsubUnboundedSource.Stats", ParDo.of(new StatsFn(this.pubsubFactory, this.subscription, this.topic, this.timestampAttribute, this.idAttribute)));
        }
        return messages;
    }

    /**
     * Decides whether {@code expand} appends the {@link StatsFn} step: yes when the
     * {@code enable_custom_pubsub_source} experiment is set, or when the runner class is not in the
     * {@code org.apache.beam.runners.dataflow.} package.
     */
    private @UnknownKeyFor @NonNull @Initialized boolean usesStatsFn(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        final boolean customSourceRequested = ExperimentalOptions.hasExperiment((PipelineOptions)options, (String)"enable_custom_pubsub_source");
        // The runner is only consulted when the experiment is absent (short-circuit).
        return customSourceRequested || !options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.");
    }

    /**
     * Creates a new randomly named subscription on this transform's topic.
     *
     * <p>The subscription is created in {@code project} when one was configured, otherwise in the
     * project named by the pipeline's {@link GcpOptions}. The created subscription is logged at
     * WARN level because nothing here deletes it afterwards.
     *
     * @param options pipeline options supplying the fallback project and the Pubsub client settings
     * @return the path of the newly created subscription
     * @throws IllegalStateException if no project is configured anywhere
     * @throws RuntimeException wrapping any failure while creating the subscription
     */
    private @UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath createRandomSubscription(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        final PubsubClient.TopicPath topicPath = (PubsubClient.TopicPath)this.topic.get();

        final PubsubClient.ProjectPath projectPath;
        if (this.project == null) {
            String projectId = ((GcpOptions)options.as(GcpOptions.class)).getProject();
            Preconditions.checkState(projectId != null, (String)"Cannot create subscription to topic %s because pipeline option 'project' not specified", (Object)topicPath);
            projectPath = PubsubClient.projectPathFromId(((GcpOptions)options.as(GcpOptions.class)).getProject());
        } else {
            projectPath = (PubsubClient.ProjectPath)this.project.get();
        }

        final PubsubOptions pubsubOptions = (PubsubOptions)options.as(PubsubOptions.class);
        try (PubsubClient pubsubClient = this.pubsubFactory.newClient(this.timestampAttribute, this.idAttribute, pubsubOptions)) {
            final PubsubClient.SubscriptionPath created = pubsubClient.createRandomSubscription(projectPath, topicPath, DEAULT_ACK_TIMEOUT_SEC);
            LOG.warn("Created subscription {} to topic {}. Note this subscription WILL NOT be deleted when the pipeline terminates", (Object)created, this.topic);
            return created;
        }
        catch (Exception e) {
            final String message = String.format("Failed to create subscription to topic %s on project %s: %s", topicPath, projectPath, e.getMessage());
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Pass-through {@link DoFn} that increments the elements-read source metric for every message
     * and publishes the source's configuration as display data.
     */
    private static class StatsFn
    extends DoFn<PubsubMessage, PubsubMessage> {
        // Incremented once per processed element.
        private final @UnknownKeyFor @NonNull @Initialized Counter elementCounter = SourceMetrics.elementsRead();
        // The remaining fields are only read when populating display data.
        private final @UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory;
        private final @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription;
        private final @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic;
        private final @Nullable @UnknownKeyFor @Initialized String timestampAttribute;
        private final @Nullable @UnknownKeyFor @Initialized String idAttribute;

        /**
         * Captures the configuration to report. {@code pubsubFactory} must not be null; every other
         * argument may be.
         */
        public StatsFn(@UnknownKeyFor @NonNull @Initialized PubsubClient.PubsubClientFactory pubsubFactory, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscription, @Nullable @UnknownKeyFor @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath> topic, @Nullable @UnknownKeyFor @Initialized String timestampAttribute, @Nullable @UnknownKeyFor @Initialized String idAttribute) {
            final boolean factoryPresent = pubsubFactory != null;
            Preconditions.checkArgument(factoryPresent, (Object)"pubsubFactory should not be null");
            this.pubsubFactory = pubsubFactory;
            this.topic = topic;
            this.subscription = subscription;
            this.idAttribute = idAttribute;
            this.timestampAttribute = timestampAttribute;
        }

        /** Counts the element and emits it unchanged. */
        @DoFn.ProcessElement
        public void processElement(@UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.elementCounter.inc();
            final PubsubMessage message = (PubsubMessage)c.element();
            c.output((Object)message);
        }

        /**
         * Adds subscription, topic, transport kind, timestamp attribute and id attribute to the
         * display data; all but the transport kind are skipped when null.
         */
        @Override
        public void populateDisplayData(@UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                .addIfNotNull(DisplayData.item((String)"subscription", this.subscription))
                .addIfNotNull(DisplayData.item((String)"topic", this.topic))
                .add(DisplayData.item((String)"transport", (String)this.pubsubFactory.getKind()))
                .addIfNotNull(DisplayData.item((String)"timestampAttribute", (String)this.timestampAttribute))
                .addIfNotNull(DisplayData.item((String)"idAttribute", (String)this.idAttribute));
        }
    }

    /**
     * The {@link UnboundedSource} behind {@link PubsubUnboundedSource}. It emits raw message bytes
     * and checkpoints with {@code PubsubCheckpoint}.
     *
     * <p>When the enclosing transform was configured with a topic rather than a subscription,
     * {@code subscriptionPath} is null until {@link #split} or {@link #createReader} creates a
     * random subscription.
     */
    @VisibleForTesting
    static class PubsubSource
    extends UnboundedSource<byte[], PubsubCheckpoint> {
        /** The transform this source belongs to. */
        public final @UnknownKeyFor @NonNull @Initialized PubsubUnboundedSource outer;
        /** Subscription to read from; may be null when only a topic was configured. */
        @VisibleForTesting
        final @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscriptionPath;

        /** Creates a source reading from the transform's own subscription provider. */
        public PubsubSource(@UnknownKeyFor @NonNull @Initialized PubsubUnboundedSource outer) {
            this(outer, outer.getSubscriptionProvider());
        }

        private PubsubSource(@UnknownKeyFor @NonNull @Initialized PubsubUnboundedSource outer, @UnknownKeyFor @NonNull @Initialized ValueProvider<@UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath> subscriptionPath) {
            this.outer = outer;
            this.subscriptionPath = subscriptionPath;
        }

        /**
         * Returns {@code desiredNumSplits * SCALE_OUT} references to one source. If no subscription
         * is configured, a random one is created first so that every split shares it.
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PubsubSource> split(@UnknownKeyFor @NonNull @Initialized int desiredNumSplits, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
            List<PubsubSource> result = new ArrayList<>(desiredNumSplits);
            PubsubSource splitSource = this;
            if (this.subscriptionPath == null) {
                splitSource = new PubsubSource(this.outer, ValueProvider.StaticValueProvider.of(this.outer.createRandomSubscription(options)));
            }
            for (int i = 0; i < desiredNumSplits * SCALE_OUT; ++i) {
                result.add(splitSource);
            }
            return result;
        }

        /**
         * Creates a reader on the configured subscription. Without a configured subscription the
         * checkpoint's subscription is used, or a new random one when there is no checkpoint.
         *
         * <p>When a checkpoint is supplied, {@code checkpoint.nackAll(reader)} is invoked; an
         * {@link IOException} from that call is logged and otherwise ignored.
         *
         * @throws RuntimeException if the reader cannot be created
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized PubsubReader createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @Nullable @UnknownKeyFor @Initialized PubsubCheckpoint checkpoint) {
            PubsubClient.SubscriptionPath subscription = this.subscriptionPath == null ? null : (PubsubClient.SubscriptionPath)this.subscriptionPath.get();
            if (subscription == null) {
                subscription = checkpoint == null ? this.outer.createRandomSubscription(options) : checkpoint.getSubscription();
            }

            PubsubReader reader;
            try {
                reader = new PubsubReader((PubsubOptions)options.as(PubsubOptions.class), this, subscription);
            }
            catch (IOException | GeneralSecurityException e) {
                // Report the subscription actually used; the provider field is null in the topic-only case.
                throw new RuntimeException("Unable to subscribe to " + subscription + ": ", e);
            }
            if (checkpoint != null) {
                try {
                    checkpoint.nackAll(reader);
                }
                catch (IOException e) {
                    // Deliberately best-effort: a failed NACK must not prevent the reader from starting.
                    LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring exception.", new Object[]{subscription, checkpoint.notYetReadIds.size(), e});
                }
            }
            return reader;
        }

        /** Returns the shared checkpoint coder. */
        @Override
        public @Nullable @UnknownKeyFor @Initialized Coder<@UnknownKeyFor @NonNull @Initialized PubsubCheckpoint> getCheckpointMarkCoder() {
            return CHECKPOINT_CODER;
        }

        /** Elements are raw byte arrays. */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> getOutputCoder() {
            return ByteArrayCoder.of();
        }

        /** No validation is performed. */
        @Override
        public void validate() {
        }

        /** Always asks the runner to deduplicate records from this source. */
        @Override
        public @UnknownKeyFor @NonNull @Initialized boolean requiresDeduping() {
            return true;
        }
    }

    @VisibleForTesting
    static class PubsubReader
    extends UnboundedSource.UnboundedReader<byte[]> {
        // Source that created this reader; gives access to the enclosing transform's configuration.
        private final @UnknownKeyFor @NonNull @Initialized PubsubSource outer;
        // Subscription every ack / nack / deadline-extension call in this reader targets.
        @VisibleForTesting
        final @UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath subscription;
        // Client created in the constructor from the transform's factory.
        private @UnknownKeyFor @NonNull @Initialized AtomicReference<@UnknownKeyFor @NonNull @Initialized PubsubClient> pubsubClient;
        // Starts out true; presumably cleared when the reader is closed (not visible in this part of the file).
        private @UnknownKeyFor @NonNull @Initialized AtomicBoolean active = new AtomicBoolean(true);
        // Ack timeout in milliseconds; -1 until set elsewhere. Used for the deadline arithmetic in extend()/extendBatch().
        private @UnknownKeyFor @NonNull @Initialized int ackTimeoutMs;
        // Ack ids considered safe to acknowledge; retire() removes ids once they have been ACKed.
        private @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> safeToAckIds;
        // Messages received but not yet handed out (an ArrayDeque).
        private final @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized PubsubClient.IncomingMessage> notYetRead;
        // In-flight state per ack id. A LinkedHashMap: extend() walks it in insertion order and
        // re-inserts extended entries at the end.
        private final @UnknownKeyFor @NonNull @Initialized LinkedHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized InFlightState> inFlight;
        // Batches of ids already ACKed by ackBatch() and waiting to be drained by retire() (a ConcurrentLinkedQueue).
        private final @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>> ackedIds;
        // Presumably the total payload size of notYetRead; maintained outside the visible code.
        private @UnknownKeyFor @NonNull @Initialized long notYetReadBytes;
        // Bucketed minimum (MIN combiner) over timestamps of unread messages.
        private @UnknownKeyFor @NonNull @Initialized BucketingFunction minUnreadTimestampMsSinceEpoch;
        // Moving minimum (MIN combiner) over timestamps of read messages.
        private @UnknownKeyFor @NonNull @Initialized MovingFunction minReadTimestampMsSinceEpoch;
        // Initialised to -1.
        private @UnknownKeyFor @NonNull @Initialized long lastReceivedMsSinceEpoch;
        // Initialised to BoundedWindow.TIMESTAMP_MIN_VALUE.
        private @UnknownKeyFor @NonNull @Initialized long lastWatermarkMsSinceEpoch;
        // Message currently exposed by the reader, or null.
        private @Nullable @UnknownKeyFor @Initialized PubsubClient.IncomingMessage current;
        // Initialised to -1.
        private @UnknownKeyFor @NonNull @Initialized long lastLogTimestampMsSinceEpoch;
        // Running total, starting at 0.
        private @UnknownKeyFor @NonNull @Initialized long numReceived;
        // The MovingFunction fields below are statistics built by newFun() with the SUM, MIN or MAX
        // combiner, as assigned in the constructor.
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numReceivedRecently;
        // Incremented by extendBatch().
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numExtendedDeadlines;
        // Incremented by extend() for messages assumed expired.
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numLateDeadlines;
        // Incremented by retire().
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numAcked;
        // Incremented by extend() for messages that exceeded PROCESSING_TIMEOUT.
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numExpired;
        // Incremented by nackBatch().
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numNacked;
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numReadBytes;
        private @UnknownKeyFor @NonNull @Initialized MovingFunction minReceivedTimestampMsSinceEpoch;
        private @UnknownKeyFor @NonNull @Initialized MovingFunction maxReceivedTimestampMsSinceEpoch;
        private @UnknownKeyFor @NonNull @Initialized MovingFunction minWatermarkMsSinceEpoch;
        private @UnknownKeyFor @NonNull @Initialized MovingFunction maxWatermarkMsSinceEpoch;
        private @UnknownKeyFor @NonNull @Initialized MovingFunction numLateMessages;
        // Checkpoint bookkeeping; both start at zero.
        private @UnknownKeyFor @NonNull @Initialized AtomicInteger numInFlightCheckpoints;
        private @UnknownKeyFor @NonNull @Initialized int maxInFlightCheckpoints;

        /**
         * Builds a {@link MovingFunction} over {@code SAMPLE_PERIOD}, updated every
         * {@code SAMPLE_UPDATE}, that combines samples with {@code function}.
         */
        private static @UnknownKeyFor @NonNull @Initialized MovingFunction newFun(@UnknownKeyFor @NonNull @Initialized Combine.BinaryCombineLongFn function) {
            final long samplePeriodMs = SAMPLE_PERIOD.getMillis();
            final long sampleUpdateMs = SAMPLE_UPDATE.getMillis();
            return new MovingFunction(samplePeriodMs, sampleUpdateMs, MIN_WATERMARK_SPREAD, MIN_WATERMARK_MESSAGES, function);
        }

        public PubsubReader(@UnknownKeyFor @NonNull @Initialized PubsubOptions options, @UnknownKeyFor @NonNull @Initialized PubsubSource outer, @UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath subscription) throws @UnknownKeyFor @NonNull @Initialized IOException, @UnknownKeyFor @NonNull @Initialized GeneralSecurityException {
            this.outer = outer;
            this.subscription = subscription;
            this.pubsubClient = new AtomicReference<PubsubClient>(outer.outer.pubsubFactory.newClient(outer.outer.timestampAttribute, outer.outer.idAttribute, options));
            this.ackTimeoutMs = -1;
            this.safeToAckIds = new HashSet<String>();
            this.notYetRead = new ArrayDeque<PubsubClient.IncomingMessage>();
            this.inFlight = new LinkedHashMap();
            this.ackedIds = new ConcurrentLinkedQueue<List<String>>();
            this.notYetReadBytes = 0L;
            this.minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), 2, 10, MIN);
            this.minReadTimestampMsSinceEpoch = PubsubReader.newFun(MIN);
            this.lastReceivedMsSinceEpoch = -1L;
            this.lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
            this.current = null;
            this.lastLogTimestampMsSinceEpoch = -1L;
            this.numReceived = 0L;
            this.numReceivedRecently = PubsubReader.newFun(SUM);
            this.numExtendedDeadlines = PubsubReader.newFun(SUM);
            this.numLateDeadlines = PubsubReader.newFun(SUM);
            this.numAcked = PubsubReader.newFun(SUM);
            this.numExpired = PubsubReader.newFun(SUM);
            this.numNacked = PubsubReader.newFun(SUM);
            this.numReadBytes = PubsubReader.newFun(SUM);
            this.minReceivedTimestampMsSinceEpoch = PubsubReader.newFun(MIN);
            this.maxReceivedTimestampMsSinceEpoch = PubsubReader.newFun(MAX);
            this.minWatermarkMsSinceEpoch = PubsubReader.newFun(MIN);
            this.maxWatermarkMsSinceEpoch = PubsubReader.newFun(MAX);
            this.numLateMessages = PubsubReader.newFun(SUM);
            this.numInFlightCheckpoints = new AtomicInteger();
            this.maxInFlightCheckpoints = 0;
        }

        /** Returns the client currently held by this reader. */
        @VisibleForTesting
        @UnknownKeyFor @NonNull @Initialized PubsubClient getPubsubClient() {
            final PubsubClient client = pubsubClient.get();
            return client;
        }

        /**
         * Acknowledges {@code ackIds} on the subscription, then queues the batch so that
         * {@code retire()} can drop the ids from the in-flight bookkeeping.
         *
         * @throws IOException if the acknowledge call fails; the batch is then not queued
         */
        void ackBatch(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> ackIds) throws @UnknownKeyFor @NonNull @Initialized IOException {
            final PubsubClient client = pubsubClient.get();
            client.acknowledge(subscription, ackIds);
            ackedIds.add(ackIds);
        }

        /**
         * Sets the ack deadline of {@code ackIds} to zero and records the batch size in the NACK
         * statistic at {@code nowMsSinceEpoch}.
         *
         * @throws IOException if the deadline modification fails
         */
        public void nackBatch(@UnknownKeyFor @NonNull @Initialized long nowMsSinceEpoch, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> ackIds) throws @UnknownKeyFor @NonNull @Initialized IOException {
            final PubsubClient client = pubsubClient.get();
            client.modifyAckDeadline(subscription, ackIds, 0);
            final long nackedCount = ackIds.size();
            numNacked.add(nowMsSinceEpoch, nackedCount);
        }

        /**
         * Extends the ack deadline of {@code ackIds} by {@code ACK_EXTENSION_PCT} percent of the
         * ack timeout (converted from milliseconds to whole seconds) and records the batch size in
         * the extended-deadlines statistic.
         *
         * @throws IOException if the deadline modification fails
         */
        private void extendBatch(@UnknownKeyFor @NonNull @Initialized long nowMsSinceEpoch, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> ackIds) throws @UnknownKeyFor @NonNull @Initialized IOException {
            // Percent of a millisecond timeout, expressed in seconds: divide by 100 and by 1000.
            final int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
            final PubsubClient client = pubsubClient.get();
            client.modifyAckDeadline(subscription, ackIds, extensionSec);
            final long extendedCount = ackIds.size();
            numExtendedDeadlines.add(nowMsSinceEpoch, extendedCount);
        }

        /**
         * Returns the current time in milliseconds since the epoch, from the transform's clock when
         * one was supplied and from the system clock otherwise.
         */
        private @UnknownKeyFor @NonNull @Initialized long now() {
            final Clock injectedClock = this.outer.outer.clock;
            return injectedClock != null ? injectedClock.currentTimeMillis() : System.currentTimeMillis();
        }

        /**
         * Drains every batch queued by {@code ackBatch()}: records the batch size in the ACK
         * statistic and removes each id from both {@code inFlight} and {@code safeToAckIds}.
         *
         * <p>All batches drained by one call are recorded against the same timestamp, taken once
         * on entry.
         *
         * @throws IOException declared for callers; nothing in this body raises it directly
         */
        private void retire() throws @UnknownKeyFor @NonNull @Initialized IOException {
            final long nowMsSinceEpoch = this.now();
            // poll() returns null once the queue is empty, which ends the drain.
            for (List<String> ackIds = this.ackedIds.poll(); ackIds != null; ackIds = this.ackedIds.poll()) {
                this.numAcked.add(nowMsSinceEpoch, (long)ackIds.size());
                for (String ackId : ackIds) {
                    this.inFlight.remove(ackId);
                    this.safeToAckIds.remove(ackId);
                }
            }
        }

        /**
         * Walks the in-flight messages and, for those whose ack deadline is approaching, either
         * extends the deadline, gives up on them, or assumes they have already expired. Repeats
         * pass after pass until a pass finds nothing to do.
         *
         * @throws IOException if extending a batch of deadlines fails
         */
        private void extend() throws @UnknownKeyFor @NonNull @Initialized IOException {
            while (true) {
                long nowMsSinceEpoch = this.now();
                ArrayList<String> assumeExpired = new ArrayList<String>();
                ArrayList<String> toBeExtended = new ArrayList<String>();
                ArrayList<String> toBeExpired = new ArrayList<String>();
                // inFlight is a LinkedHashMap, so entries are visited in insertion order.
                for (Map.Entry<String, InFlightState> entry : this.inFlight.entrySet()) {
                    // Deadline is still more than 20% of the ack timeout away: stop scanning.
                    // Later entries are not examined in this pass.
                    if (entry.getValue().ackDeadlineMsSinceEpoch - (long)(this.ackTimeoutMs * 20 / 100) > nowMsSinceEpoch) break;
                    // Deadline is less than ACK_TOO_LATE away (or already past): too late to extend.
                    if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis() < nowMsSinceEpoch) {
                        assumeExpired.add(entry.getKey());
                        continue;
                    }
                    // Requested more than PROCESSING_TIMEOUT ago: stop extending it.
                    if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() < nowMsSinceEpoch) {
                        toBeExpired.add(entry.getKey());
                        continue;
                    }
                    toBeExtended.add(entry.getKey());
                    // Cap one extension batch at 2000 ids (the value of ACK_BATCH_SIZE).
                    if (toBeExtended.size() < 2000) continue;
                    break;
                }
                // A pass that classified nothing ends the method.
                if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) {
                    return;
                }
                // Both kinds of expired ids are only counted and dropped locally; no Pubsub call is made for them.
                if (!assumeExpired.isEmpty()) {
                    this.numLateDeadlines.add(nowMsSinceEpoch, (long)assumeExpired.size());
                    for (String ackId : assumeExpired) {
                        this.inFlight.remove(ackId);
                    }
                }
                if (!toBeExpired.isEmpty()) {
                    this.numExpired.add(nowMsSinceEpoch, (long)toBeExpired.size());
                    for (String ackId : toBeExpired) {
                        this.inFlight.remove(ackId);
                    }
                }
                if (toBeExtended.isEmpty()) continue;
                // New local deadline: now plus 50% of the ack timeout.
                long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (long)(this.ackTimeoutMs * 50 / 100);
                for (String ackId : toBeExtended) {
                    // Remove-then-put moves the entry to the end of the LinkedHashMap, keeping the
                    // original request time but the new deadline.
                    InFlightState state = (InFlightState)this.inFlight.remove(ackId);
                    this.inFlight.put(ackId, new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch));
                }
                // Local bookkeeping is updated before the remote call; an IOException here propagates to the caller.
                this.extendBatch(nowMsSinceEpoch, toBeExtended);
            }
        }

        /**
         * Pulls one batch of messages from the subscription and queues them for reading.
         *
         * <p>Does nothing when {@code inFlight} already holds 20000 or more entries, or
         * when the pull returns no messages. Otherwise each message is appended to
         * {@code notYetRead}, registered in {@code inFlight} with a deadline of request
         * time plus {@code ackTimeoutMs}, and folded into the receive statistics.
         *
         * @throws IOException if the Pubsub client call fails
         */
        private void pull() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (this.inFlight.size() >= 20000) {
                return;
            }
            final long requestedAtMs = this.now();
            final long ackDeadlineMs = requestedAtMs + (long)this.ackTimeoutMs;
            // NOTE(review): 1000 and true are presumably the batch size and a
            // return-immediately flag - confirm against PubsubClient.pull.
            final List<PubsubClient.IncomingMessage> batch = this.pubsubClient.get().pull(requestedAtMs, this.subscription, 1000, true);
            if (batch.isEmpty()) {
                return;
            }
            this.lastReceivedMsSinceEpoch = requestedAtMs;
            for (PubsubClient.IncomingMessage received : batch) {
                this.notYetRead.add(received);
                this.notYetReadBytes += (long)received.message().getData().size();
                this.inFlight.put(received.ackId(), new InFlightState(requestedAtMs, ackDeadlineMs));
                ++this.numReceived;
                this.numReceivedRecently.add(requestedAtMs, 1L);
                final long messageTimestampMs = received.timestampMsSinceEpoch();
                this.minReceivedTimestampMsSinceEpoch.add(requestedAtMs, messageTimestampMs);
                this.maxReceivedTimestampMsSinceEpoch.add(requestedAtMs, messageTimestampMs);
                this.minUnreadTimestampMsSinceEpoch.add(requestedAtMs, messageTimestampMs);
            }
        }

        /**
         * Emits a debug log line summarising the reader's counters, at most once per
         * {@code LOG_PERIOD}.
         *
         * <p>The first call only records the current time as the last log timestamp.
         * Later calls return without logging until {@code LOG_PERIOD} has elapsed
         * since that timestamp.
         */
        private void stats() {
            final long nowMsSinceEpoch = this.now();
            if (this.lastLogTimestampMsSinceEpoch < 0L) {
                this.lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
                return;
            }
            final long sinceLastLogMs = nowMsSinceEpoch - this.lastLogTimestampMsSinceEpoch;
            if (sinceLastLogMs < LOG_PERIOD.getMillis()) {
                return;
            }

            // Skews are reported only when both extremes hold a real sample.
            final long minTimestamp = this.minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            final long maxTimestamp = this.maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            final String messageSkew = (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE)
                    ? (maxTimestamp - minTimestamp) + "ms"
                    : "unknown";

            final long minWatermark = this.minWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
            final long maxWatermark = this.maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
            final String watermarkSkew = (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE)
                    ? (maxWatermark - minWatermark) + "ms"
                    : "unknown";

            // Age of the first entry in inFlight, or "no" when nothing is in flight.
            final String oldestAckId = Iterables.getFirst(this.inFlight.keySet(), null);
            final String oldestInFlight = oldestAckId == null
                    ? "no"
                    : (nowMsSinceEpoch - this.inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms";

            final Object[] logArgs = new Object[]{
                this.subscription,
                this.numReceived,
                this.notYetRead.size(),
                this.notYetReadBytes,
                this.inFlight.size(),
                oldestInFlight,
                this.numInFlightCheckpoints.get(),
                this.maxInFlightCheckpoints,
                this.numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L),
                this.numReceivedRecently.get(nowMsSinceEpoch),
                this.numExtendedDeadlines.get(nowMsSinceEpoch),
                this.numLateDeadlines.get(nowMsSinceEpoch),
                this.numAcked.get(nowMsSinceEpoch),
                this.numNacked.get(nowMsSinceEpoch),
                this.numExpired.get(nowMsSinceEpoch),
                messageSkew,
                watermarkSkew,
                this.numLateMessages.get(nowMsSinceEpoch),
                new Instant(this.lastWatermarkMsSinceEpoch)
            };
            LOG.debug("Pubsub {} has {} received messages, {} current unread messages, {} current unread bytes, {} current in-flight msgs, {} oldest in-flight, {} current in-flight checkpoints, {} max in-flight checkpoints, {}B/s recent read, {} recent received, {} recent extended, {} recent late extended, {} recent ACKed, {} recent NACKed, {} recent expired, {} recent message timestamp skew, {} recent watermark skew, {} recent late messages, {} last reported watermark", logArgs);
            this.lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
        }

        /**
         * Reads the subscription's ACK deadline from the Pubsub client, stores it in
         * {@code ackTimeoutMs} as milliseconds, then delegates to {@link #advance()}.
         *
         * @return the result of {@link #advance()}
         * @throws IOException if the client call or {@code advance()} fails
         */
        public @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
            final int ackDeadlineSeconds = this.pubsubClient.get().ackDeadlineSeconds(this.subscription);
            this.ackTimeoutMs = ackDeadlineSeconds * 1000;
            return this.advance();
        }

        /**
         * Moves the reader to the next message, if one is available.
         *
         * <p>In order: logs stats, releases the previously current message from the
         * unread-timestamp tracker, retires acknowledged ids, extends pressing
         * deadlines, pulls a new batch when {@code notYetRead} is empty, and finally
         * takes the head of {@code notYetRead} as the new current message.
         *
         * @return {@code true} if a message is now current, {@code false} if none was available
         * @throws IOException if retiring, extending or pulling fails
         */
        public @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.stats();

            final PubsubClient.IncomingMessage consumed = this.current;
            if (consumed != null) {
                // The consumed message no longer contributes to the minimum unread timestamp.
                this.minUnreadTimestampMsSinceEpoch.remove(consumed.requestTimeMsSinceEpoch());
                this.current = null;
            }

            this.retire();
            this.extend();
            if (this.notYetRead.isEmpty()) {
                this.pull();
            }

            final PubsubClient.IncomingMessage next = this.notYetRead.poll();
            this.current = next;
            if (next == null) {
                return false;
            }

            final int payloadBytes = next.message().getData().size();
            this.notYetReadBytes -= (long)payloadBytes;
            Preconditions.checkState(this.notYetReadBytes >= 0L);

            final long nowMsSinceEpoch = this.now();
            this.numReadBytes.add(nowMsSinceEpoch, (long)payloadBytes);
            final long messageTimestampMs = next.timestampMsSinceEpoch();
            this.minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, messageTimestampMs);
            if (messageTimestampMs < this.lastWatermarkMsSinceEpoch) {
                // Message is older than the last watermark this reader reported.
                this.numLateMessages.add(nowMsSinceEpoch, 1L);
            }

            // Once handed out as current, the message's ack id goes into the next checkpoint.
            this.safeToAckIds.add(next.ackId());
            return true;
        }

        /**
         * Returns the current message as bytes.
         *
         * <p>When the enclosing source needs the message id or attributes, the whole
         * protobuf message is serialized with its message id set to the record id.
         * Otherwise only the payload bytes are returned.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getCurrent() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            final boolean needsFullMessage = this.outer.outer.getNeedsMessageId() || this.outer.outer.getNeedsAttributes();
            if (!needsFullMessage) {
                return this.current.message().getData().toByteArray();
            }
            return this.current.message().toBuilder().setMessageId(this.current.recordId()).build().toByteArray();
        }

        /**
         * Returns the current message's timestamp as an {@code Instant}.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public @UnknownKeyFor @NonNull @Initialized Instant getCurrentTimestamp() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            final PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            final long timestampMs = message.timestampMsSinceEpoch();
            return new Instant(timestampMs);
        }

        /**
         * Returns the current message's record id encoded as UTF-8 bytes.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getCurrentRecordId() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            final PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            final String recordId = message.recordId();
            return recordId.getBytes(StandardCharsets.UTF_8);
        }

        /**
         * Marks this reader as no longer active and then closes the Pubsub client
         * if {@link #maybeCloseClient()} finds no checkpoints still in flight.
         *
         * @throws IOException if closing the client fails
         */
        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            // Clear the flag first: maybeCloseClient() refuses to close an active reader.
            this.active.set(false);
            this.maybeCloseClient();
        }

        /**
         * Closes the Pubsub client when the reader is inactive and no checkpoints are
         * in flight. The client reference is swapped to {@code null} before closing,
         * so a later call finds nothing left to close.
         *
         * @throws IOException if closing the client fails
         */
        private void maybeCloseClient() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (this.active.get() || this.numInFlightCheckpoints.get() != 0) {
                return;
            }
            final PubsubClient detached = (PubsubClient)this.pubsubClient.getAndSet(null);
            if (detached != null) {
                detached.close();
            }
        }

        /**
         * Returns the {@code PubsubSource} held in this reader's {@code outer} field.
         */
        public @UnknownKeyFor @NonNull @Initialized PubsubSource getCurrentSource() {
            final PubsubSource owningSource = this.outer;
            return owningSource;
        }

        /**
         * Computes and returns the reader's current watermark.
         *
         * <ul>
         *   <li>If the client reports EOF and nothing is left unread, returns
         *       {@code BoundedWindow.TIMESTAMP_MAX_VALUE}.</li>
         *   <li>If neither tracker holds a timestamp and nothing has been received for
         *       more than {@code SAMPLE_PERIOD}, the watermark becomes the current time.</li>
         *   <li>Otherwise, if either tracker reports itself significant, the watermark
         *       becomes the smaller of the read and unread minimum timestamps.</li>
         *   <li>In every other case the previous watermark is kept.</li>
         * </ul>
         * The resulting value is also recorded in the min/max watermark statistics.
         */
        public @UnknownKeyFor @NonNull @Initialized Instant getWatermark() {
            final boolean drained = this.pubsubClient.get().isEOF() && this.notYetRead.isEmpty();
            if (drained) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }

            final long nowMsSinceEpoch = this.now();
            final long readMin = this.minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            final long unreadMin = this.minUnreadTimestampMsSinceEpoch.get();

            final boolean idle = readMin == Long.MAX_VALUE
                    && unreadMin == Long.MAX_VALUE
                    && this.lastReceivedMsSinceEpoch >= 0L
                    && nowMsSinceEpoch > this.lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis();
            if (idle) {
                this.lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
            } else if (this.minReadTimestampMsSinceEpoch.isSignificant() || this.minUnreadTimestampMsSinceEpoch.isSignificant()) {
                this.lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
            }

            final long watermarkMs = this.lastWatermarkMsSinceEpoch;
            this.minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, watermarkMs);
            this.maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, watermarkMs);
            return new Instant(watermarkMs);
        }

        /**
         * Builds a checkpoint from the reader's current state.
         *
         * <p>Increments the in-flight checkpoint counter and updates its recorded
         * maximum. It then snapshots {@code safeToAckIds} and the ack ids of every
         * message still in {@code notYetRead}. The subscription path is stored in the
         * checkpoint only when the enclosing source's {@code subscriptionPath} is
         * {@code null}.
         */
        public @UnknownKeyFor @NonNull @Initialized PubsubCheckpoint getCheckpointMark() {
            final int inFlightNow = this.numInFlightCheckpoints.incrementAndGet();
            this.maxInFlightCheckpoints = Math.max(this.maxInFlightCheckpoints, inFlightNow);

            final List<String> ackableIds = Lists.newArrayList(this.safeToAckIds);
            final List<String> unreadIds = new ArrayList<String>(this.notYetRead.size());
            for (PubsubClient.IncomingMessage pending : this.notYetRead) {
                unreadIds.add(pending.ackId());
            }

            final String pathForCheckpoint = this.outer.subscriptionPath == null ? this.subscription.getPath() : null;
            return new PubsubCheckpoint(pathForCheckpoint, this, ackableIds, unreadIds);
        }

        /**
         * Returns {@code notYetReadBytes}, the running total of payload bytes of
         * messages that have been pulled but not yet handed out as current.
         */
        public @UnknownKeyFor @NonNull @Initialized long getSplitBacklogBytes() {
            final long unreadBytes = this.notYetReadBytes;
            return unreadBytes;
        }

        /**
         * Pair of timestamps kept for each ack id in the reader's {@code inFlight} map.
         */
        private static class InFlightState {
            /** Time, in ms since the epoch, at which the message was requested. */
            @UnknownKeyFor @NonNull @Initialized long requestTimeMsSinceEpoch;
            /** Time, in ms since the epoch, at which the message's ACK deadline is taken to fall. */
            @UnknownKeyFor @NonNull @Initialized long ackDeadlineMsSinceEpoch;

            /**
             * Creates a state holding the two given timestamps.
             */
            public InFlightState(@UnknownKeyFor @NonNull @Initialized long requestedAtMs, @UnknownKeyFor @NonNull @Initialized long deadlineAtMs) {
                this.requestTimeMsSinceEpoch = requestedAtMs;
                this.ackDeadlineMsSinceEpoch = deadlineAtMs;
            }
        }
    }

    /**
     * Coder for {@link PubsubCheckpoint}.
     *
     * <p>Only the (nullable) subscription path and the list of not-yet-read ack ids
     * are written. The reader and the safe-to-ack ids are not encoded, so a decoded
     * checkpoint carries {@code null} for both.
     */
    private static class PubsubCheckpointCoder<@UnknownKeyFor T>
    extends AtomicCoder<PubsubCheckpoint> {
        /** Encodes the subscription path, which may be {@code null}. */
        private static final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized String> SUBSCRIPTION_PATH_CODER = NullableCoder.of(StringUtf8Coder.of());
        /** Encodes the list of ack ids. */
        private static final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());

        /** Instances are created through {@link #of()}. */
        private PubsubCheckpointCoder() {
        }

        /** Returns a new coder instance. */
        public static <T> @UnknownKeyFor @NonNull @Initialized PubsubCheckpointCoder<T> of() {
            return new PubsubCheckpointCoder<T>();
        }

        /**
         * Writes {@code value.subscriptionPath} followed by {@code value.notYetReadIds}.
         *
         * @throws IOException if writing to {@code outStream} fails
         */
        public void encode(@UnknownKeyFor @NonNull @Initialized PubsubCheckpoint value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized IOException {
            final String subscriptionPath = value.subscriptionPath;
            SUBSCRIPTION_PATH_CODER.encode(subscriptionPath, outStream);
            final List<String> unreadIds = value.notYetReadIds;
            LIST_CODER.encode(unreadIds, outStream);
        }

        /**
         * Reads back a subscription path and an ack-id list, in the order
         * {@link #encode} wrote them, and wraps them in a checkpoint with no reader
         * and no safe-to-ack ids.
         *
         * @throws IOException if reading from {@code inStream} fails
         */
        public @UnknownKeyFor @NonNull @Initialized PubsubCheckpoint decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) throws @UnknownKeyFor @NonNull @Initialized IOException {
            final String subscriptionPath = SUBSCRIPTION_PATH_CODER.decode(inStream);
            final List<String> unreadIds = LIST_CODER.decode(inStream);
            return new PubsubCheckpoint(subscriptionPath, null, null, unreadIds);
        }
    }

    /**
     * Checkpoint mark produced by {@code PubsubReader.getCheckpointMark()}.
     *
     * <p>A checkpoint exists in one of two forms:
     * <ul>
     *   <li><b>Live</b>: created by a reader, with {@code reader} and
     *       {@code safeToAckIds} set. It can be finalized exactly once via
     *       {@link #finalizeCheckpoint()}.</li>
     *   <li><b>Restored</b>: built with {@code reader} and {@code safeToAckIds} both
     *       {@code null} (as the coder's {@code decode} does). It cannot be
     *       finalized; its {@code notYetReadIds} can be NACKed via
     *       {@link #nackAll(PubsubReader)}.</li>
     * </ul>
     */
    @VisibleForTesting
    static class PubsubCheckpoint
    implements UnboundedSource.CheckpointMark {
        /** Subscription path recorded with the checkpoint, or {@code null} if none was recorded. */
        @VisibleForTesting
        @Nullable @UnknownKeyFor @Initialized String subscriptionPath;
        /** Reader that created this checkpoint; {@code null} once finalized or when restored. */
        private @Nullable @UnknownKeyFor @Initialized PubsubReader reader;
        /** Ack ids to ACK on finalization; {@code null} once finalized or when restored. */
        private @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized String> safeToAckIds;
        /** Ack ids of messages that had been pulled but not yet read when the checkpoint was taken. */
        @VisibleForTesting
        final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> notYetReadIds;

        /**
         * Creates a checkpoint holding the given values as-is; no copies are made.
         *
         * @param subscriptionPath subscription path to record, or {@code null}
         * @param reader owning reader, or {@code null} for a restored checkpoint
         * @param safeToAckIds ack ids to ACK on finalization, or {@code null} for a restored checkpoint
         * @param notYetReadIds ack ids of pulled-but-unread messages
         */
        public PubsubCheckpoint(@Nullable @UnknownKeyFor @Initialized String subscriptionPath, @Nullable @UnknownKeyFor @Initialized PubsubReader reader, @Nullable @UnknownKeyFor @Initialized List<@UnknownKeyFor @NonNull @Initialized String> safeToAckIds, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> notYetReadIds) {
            this.subscriptionPath = subscriptionPath;
            this.reader = reader;
            this.safeToAckIds = safeToAckIds;
            this.notYetReadIds = notYetReadIds;
        }

        /**
         * Returns the recorded subscription path parsed into a
         * {@code PubsubClient.SubscriptionPath}, or {@code null} if none was recorded.
         */
        private @Nullable @UnknownKeyFor @Initialized PubsubClient.SubscriptionPath getSubscription() {
            return this.subscriptionPath == null ? null : PubsubClient.subscriptionPathFromPath(this.subscriptionPath);
        }

        /**
         * ACKs every id in {@code safeToAckIds} through the owning reader, in batches
         * of at most 2000, then releases this checkpoint's hold on the reader.
         *
         * <p>Whether or not the ACKs succeed, the reader's in-flight checkpoint
         * counter is decremented exactly once, the reader is given the chance to
         * close its client, and this checkpoint drops its {@code reader} and
         * {@code safeToAckIds} references.
         *
         * @throws IOException if an ACK batch or closing the client fails
         * @throws IllegalStateException if this checkpoint has no reader or no
         *     safe-to-ack ids, or if the in-flight checkpoint counter goes negative
         */
        public void finalizeCheckpoint() throws @UnknownKeyFor @NonNull @Initialized IOException {
            Preconditions.checkState(this.reader != null && this.safeToAckIds != null, "Cannot finalize a restored checkpoint");
            try {
                int remainingIds = this.safeToAckIds.size();
                List<String> batch = new ArrayList<String>(Math.min(remainingIds, 2000));
                for (String ackId : this.safeToAckIds) {
                    batch.add(ackId);
                    if (batch.size() < 2000) {
                        continue;
                    }
                    this.reader.ackBatch(batch);
                    remainingIds -= batch.size();
                    // Allocate a fresh list instead of clearing. NOTE(review): ackBatch
                    // presumably retains the list it is given (the reader's retire()
                    // later polls whole lists from ackedIds) - confirm before reusing.
                    batch = new ArrayList<String>(Math.min(remainingIds, 2000));
                }
                if (!batch.isEmpty()) {
                    this.reader.ackBatch(batch);
                }
            } finally {
                // The cleanup lives in one finally block so the counter is decremented
                // exactly once on success and on failure. The previous try/catch form
                // decremented inside the try and again in the catch when the sanity
                // check itself threw.
                int remainingInFlight = this.reader.numInFlightCheckpoints.decrementAndGet();
                Preconditions.checkState(remainingInFlight >= 0, "Miscounted in-flight checkpoints");
                this.reader.maybeCloseClient();
                this.reader = null;
                this.safeToAckIds = null;
            }
        }

        /**
         * Returns the current time in ms since the epoch, taken from the clock of the
         * given reader's enclosing source when one is set and from
         * {@link System#currentTimeMillis()} otherwise.
         */
        private static @UnknownKeyFor @NonNull @Initialized long now(@UnknownKeyFor @NonNull @Initialized PubsubReader reader) {
            if (reader.outer.outer.clock == null) {
                return System.currentTimeMillis();
            }
            return reader.outer.outer.clock.currentTimeMillis();
        }

        /**
         * NACKs every id in {@code notYetReadIds} through the given reader, in
         * batches of at most 2000.
         *
         * @param reader reader used to send the NACKs and to read the clock
         * @throws IOException if a NACK batch fails
         * @throws IllegalStateException if this checkpoint still holds its own reader
         */
        public void nackAll(@UnknownKeyFor @NonNull @Initialized PubsubReader reader) throws @UnknownKeyFor @NonNull @Initialized IOException {
            Preconditions.checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
            List<String> batchYetToAckIds = new ArrayList<String>(Math.min(this.notYetReadIds.size(), 2000));
            for (String ackId : this.notYetReadIds) {
                batchYetToAckIds.add(ackId);
                if (batchYetToAckIds.size() < 2000) {
                    continue;
                }
                reader.nackBatch(PubsubCheckpoint.now(reader), batchYetToAckIds);
                // Unlike the ACK path, the list is cleared and reused between batches.
                batchYetToAckIds.clear();
            }
            if (!batchYetToAckIds.isEmpty()) {
                reader.nackBatch(PubsubCheckpoint.now(reader), batchYetToAckIds);
            }
        }
    }
}

