/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.UncheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.proto.Subscription;
import java.io.Serializable;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.pubsublite.InitialOffsetReader;
import org.apache.beam.sdk.io.gcp.pubsublite.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.OffsetByteRangeTracker;
import org.apache.beam.sdk.io.gcp.pubsublite.PerSubscriptionPartitionSdf;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionLoader;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessorImpl;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
import org.joda.time.Duration;

/**
 * A {@link PTransform} that turns a {@link PBegin} into a {@link PCollection} of Pub/Sub Lite
 * {@link SequencedMessage} protos.
 *
 * <p>Expansion first produces a collection of {@link SubscriptionPartition}s (either loaded via
 * {@link SubscriptionPartitionLoader} or built from the partitions listed in the options) and then
 * applies a {@link PerSubscriptionPartitionSdf} to it. The factory methods of this class are
 * handed to that SDF as method references, so every one of them re-validates that the partition
 * it receives belongs to the subscription configured in {@link SubscriberOptions}.
 */
class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
    /** Sleep bound passed to {@link PerSubscriptionPartitionSdf}; also scales the tracker duration. */
    private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1L);

    private final SubscriberOptions options;

    /**
     * Creates a transform reading from the subscription described by {@code options}.
     *
     * @param options subscriber configuration used by every factory method of this transform
     */
    SubscribeTransform(SubscriberOptions options) {
        this.options = options;
    }

    /**
     * Verifies that {@code subscriptionPartition} refers to the subscription configured in the
     * options.
     *
     * @param subscriptionPartition the subscription/partition pair to validate
     * @throws ApiException presumably raised by {@code UncheckedApiPreconditions.checkArgument}
     *     when the subscription paths differ (as declared by this method's signature)
     */
    private void checkSubscription(SubscriptionPartition subscriptionPartition) throws ApiException {
        UncheckedApiPreconditions.checkArgument(
            subscriptionPartition.subscription().equals(options.subscriptionPath()));
    }

    /**
     * Creates a wire subscriber for {@code partition} that forwards every received batch to
     * {@code consumer} after converting each message to its proto form.
     *
     * @param partition the partition to subscribe to
     * @param consumer receiver of the converted message batches
     * @return the subscriber built by the options' subscriber factory
     */
    private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) {
        try {
            return options
                .getSubscriberFactory(partition)
                .newSubscriber(
                    messages ->
                        consumer.accept(
                            messages.stream()
                                .map(message -> message.toProto())
                                .collect(Collectors.toList())));
        } catch (Throwable t) {
            // Any failure is normalized through ExtractStatus and its underlying exception rethrown.
            throw ExtractStatus.toCanonical(t).underlying;
        }
    }

    /**
     * Builds the processor that pulls messages for one subscription partition.
     *
     * @param subscriptionPartition the subscription/partition pair to process
     * @param tracker restriction tracker handed to the processor
     * @param receiver output receiver handed to the processor
     * @return a new {@link SubscriptionPartitionProcessorImpl}
     * @throws ApiException if the subscription check fails
     */
    private SubscriptionPartitionProcessor newPartitionProcessor(
            SubscriptionPartition subscriptionPartition,
            RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
            DoFn.OutputReceiver<SequencedMessage> receiver)
            throws ApiException {
        checkSubscription(subscriptionPartition);
        return new SubscriptionPartitionProcessorImpl(
            tracker,
            receiver,
            consumer -> newSubscriber(subscriptionPartition.partition(), consumer),
            options.flowControlSettings());
    }

    /**
     * Builds the restriction tracker for one subscription partition.
     *
     * <p>The tracker is given an unstarted stopwatch, three quarters of {@link #MAX_SLEEP_TIME},
     * and ten times the flow-control {@code bytesOutstanding} (saturating on overflow). How
     * {@link OffsetByteRangeTracker} interprets those two limits is not visible here.
     *
     * @param subscriptionPartition the subscription/partition pair to track
     * @param initial the initial offset range
     * @return a new {@link OffsetByteRangeTracker}
     */
    private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
            SubscriptionPartition subscriptionPartition, OffsetRange initial) {
        checkSubscription(subscriptionPartition);
        return new OffsetByteRangeTracker(
            initial,
            options.getBacklogReader(subscriptionPartition.partition()),
            Stopwatch.createUnstarted(),
            MAX_SLEEP_TIME.multipliedBy(3L).dividedBy(4L),
            LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10L));
    }

    /**
     * Returns the options' initial-offset reader for the partition, after validating the
     * subscription.
     *
     * @param subscriptionPartition the subscription/partition pair to read the initial offset for
     * @return the reader obtained from the options
     */
    private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) {
        checkSubscription(subscriptionPartition);
        return options.getInitialOffsetReader(subscriptionPartition.partition());
    }

    /**
     * Returns the options' committer for the partition, after validating the subscription.
     *
     * @param subscriptionPartition the subscription/partition pair to commit for
     * @return the committer obtained from the options
     */
    private Committer newCommitter(SubscriptionPartition subscriptionPartition) {
        checkSubscription(subscriptionPartition);
        return options.getCommitter(subscriptionPartition.partition());
    }

    /**
     * Looks up the topic the configured subscription is attached to.
     *
     * <p>An {@link AdminClient} is created for the subscription's region and closed again by the
     * try-with-resources statement once the lookup finishes.
     *
     * @return the parsed path of the subscription's topic
     */
    private TopicPath getTopicPath() {
        try (AdminClient admin =
                AdminClient.create(
                    AdminClientSettings.newBuilder()
                        .setRegion(options.subscriptionPath().location().region())
                        .build())) {
            return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic());
        } catch (Throwable t) {
            // Any failure is normalized through ExtractStatus and its underlying exception rethrown.
            throw ExtractStatus.toCanonical(t).underlying;
        }
    }

    /**
     * Expands this transform into the partition source followed by the per-partition SDF.
     *
     * @param input the pipeline start
     * @return the messages emitted by {@link PerSubscriptionPartitionSdf}
     */
    @Override
    public PCollection<SequencedMessage> expand(PBegin input) {
        PCollection<SubscriptionPartition> subscriptionPartitions;
        if (options.partitions().isEmpty()) {
            // No explicit partitions configured: let the loader produce them for the topic.
            subscriptionPartitions =
                input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
        } else {
            // Explicit partitions configured: pair each with the subscription path up front.
            subscriptionPartitions =
                input.apply(
                    Create.of(
                        options.partitions().stream()
                            .map(partition -> SubscriptionPartition.of(options.subscriptionPath(), partition))
                            .collect(Collectors.toList())));
        }

        return subscriptionPartitions.apply(
            ParDo.of(
                new PerSubscriptionPartitionSdf(
                    MAX_SLEEP_TIME,
                    this::newInitialOffsetReader,
                    this::newRestrictionTracker,
                    this::newPartitionProcessor,
                    this::newCommitter)));
    }
}

