/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public class TestPubsub
implements TestRule {
    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormat.forPattern((String)"YYYY-MM-dd-HH-mm-ss-SSS");
    private static final String EVENTS_TOPIC_NAME = "events";
    private static final String TOPIC_PREFIX = "integ-test-";
    private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
    private final TestPubsubOptions pipelineOptions;
    private final String pubsubEndpoint;
    private final boolean isLocalhost;
    private @Nullable TopicAdminClient topicAdmin = null;
    private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
    private @Nullable PubsubClient.TopicPath eventsTopicPath = null;
    private @Nullable PubsubClient.SubscriptionPath subscriptionPath = null;
    private @Nullable ManagedChannel channel = null;
    private @Nullable TransportChannelProvider channelProvider = null;

    public static TestPubsub create() {
        return TestPubsub.fromOptions(TestPipeline.testingPipelineOptions());
    }

    public static TestPubsub fromOptions(PipelineOptions options) {
        return new TestPubsub((TestPubsubOptions)options.as(TestPubsubOptions.class));
    }

    private TestPubsub(TestPubsubOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
        this.pubsubEndpoint = PubsubOptions.targetForRootUrl(this.pipelineOptions.getPubsubRootUrl());
        this.isLocalhost = this.pubsubEndpoint.startsWith("localhost");
    }

    public Statement apply(final Statement base, final Description description) {
        return new Statement(){

            public void evaluate() throws Throwable {
                if (TestPubsub.this.topicAdmin != null || TestPubsub.this.subscriptionAdmin != null) {
                    throw new AssertionError((Object)("Pubsub client was not shutdown after previous test. Topic path is'" + TestPubsub.this.eventsTopicPath + "'. Current test: " + description.getDisplayName()));
                }
                try {
                    TestPubsub.this.initializePubsub(description);
                    base.evaluate();
                }
                finally {
                    TestPubsub.this.tearDown();
                }
            }
        };
    }

    private void initializePubsub(Description description) throws IOException {
        this.channel = this.isLocalhost ? ManagedChannelBuilder.forTarget((String)this.pubsubEndpoint).usePlaintext().build() : ManagedChannelBuilder.forTarget((String)this.pubsubEndpoint).useTransportSecurity().build();
        this.channelProvider = FixedTransportChannelProvider.create((TransportChannel)GrpcTransportChannel.create((ManagedChannel)this.channel));
        this.topicAdmin = TopicAdminClient.create((TopicAdminSettings)((TopicAdminSettings.Builder)((TopicAdminSettings.Builder)((TopicAdminSettings.Builder)TopicAdminSettings.newBuilder().setCredentialsProvider(() -> ((TestPubsubOptions)this.pipelineOptions).getGcpCredential())).setTransportChannelProvider(this.channelProvider)).setEndpoint(this.pubsubEndpoint)).build());
        this.subscriptionAdmin = SubscriptionAdminClient.create((SubscriptionAdminSettings)((SubscriptionAdminSettings.Builder)((SubscriptionAdminSettings.Builder)((SubscriptionAdminSettings.Builder)SubscriptionAdminSettings.newBuilder().setCredentialsProvider(() -> ((TestPubsubOptions)this.pipelineOptions).getGcpCredential())).setTransportChannelProvider(this.channelProvider)).setEndpoint(this.pubsubEndpoint)).build());
        PubsubClient.TopicPath eventsTopicPathTmp = PubsubClient.topicPathFromName(this.pipelineOptions.getProject(), TestPubsub.createTopicName(description, EVENTS_TOPIC_NAME));
        this.topicAdmin.createTopic(eventsTopicPathTmp.getPath());
        this.eventsTopicPath = eventsTopicPathTmp;
        String subscriptionName = this.topicPath().getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
        PubsubClient.SubscriptionPath subscriptionPathTmp = new PubsubClient.SubscriptionPath(String.format("projects/%s/subscriptions/%s", this.pipelineOptions.getProject(), subscriptionName));
        this.subscriptionAdmin.createSubscription(subscriptionPathTmp.getPath(), this.topicPath().getPath(), PushConfig.getDefaultInstance(), DEFAULT_ACK_DEADLINE_SECONDS.intValue());
        this.subscriptionPath = subscriptionPathTmp;
    }

    private void tearDown() {
        if (this.subscriptionAdmin == null || this.topicAdmin == null || this.channel == null) {
            return;
        }
        try {
            if (this.subscriptionPath != null) {
                this.subscriptionAdmin.deleteSubscription(this.subscriptionPath.getPath());
            }
            if (this.eventsTopicPath != null) {
                for (String subscriptionPath : this.topicAdmin.listTopicSubscriptions(this.eventsTopicPath.getPath()).iterateAll()) {
                    this.subscriptionAdmin.deleteSubscription(subscriptionPath);
                }
                this.topicAdmin.deleteTopic(this.eventsTopicPath.getPath());
            }
        }
        finally {
            this.subscriptionAdmin.close();
            this.topicAdmin.close();
            this.channel.shutdown();
            this.subscriptionAdmin = null;
            this.topicAdmin = null;
            this.channelProvider = null;
            this.channel = null;
            this.eventsTopicPath = null;
            this.subscriptionPath = null;
        }
    }

    static String createTopicName(Description description, String name) throws IOException {
        StringBuilder topicName = new StringBuilder(TOPIC_PREFIX);
        if (description.getClassName() != null) {
            try {
                topicName.append(Class.forName(description.getClassName()).getSimpleName()).append("-");
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        if (description.getMethodName() != null) {
            topicName.append(description.getMethodName().replaceAll("[\\[\\]]", "")).append("-");
        }
        DATETIME_FORMAT.printTo(topicName, (ReadableInstant)Instant.now());
        return topicName.toString() + "-" + name + "-" + String.valueOf(ThreadLocalRandom.current().nextLong());
    }

    public PubsubClient.TopicPath topicPath() {
        return this.eventsTopicPath;
    }

    public PubsubClient.SubscriptionPath subscriptionPath() {
        return this.subscriptionPath;
    }

    private List<String> listSubscriptions(PubsubClient.TopicPath topicPath) {
        Preconditions.checkNotNull((Object)this.topicAdmin);
        return Streams.stream((Iterable)this.topicAdmin.listTopicSubscriptions(topicPath.getPath()).iterateAll()).filter(path -> !path.equals(this.subscriptionPath.getPath())).collect(Collectors.toList());
    }

    public void publish(List<PubsubMessage> messages) {
        Publisher eventPublisher;
        Preconditions.checkNotNull((Object)this.eventsTopicPath);
        try {
            eventPublisher = Publisher.newBuilder((String)this.eventsTopicPath.getPath()).setCredentialsProvider(() -> ((TestPubsubOptions)this.pipelineOptions).getGcpCredential()).setChannelProvider(this.channelProvider).setEndpoint(this.pubsubEndpoint).build();
        }
        catch (IOException e) {
            throw new RuntimeException("Error creating event publisher", e);
        }
        List futures = messages.stream().map(message -> {
            PubsubMessage.Builder builder = com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])message.getPayload())).putAllAttributes(message.getAttributeMap());
            return eventPublisher.publish(builder.build());
        }).collect(Collectors.toList());
        try {
            ApiFutures.allAsList(futures).get();
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Error publishing a test message", e);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while waiting for messages to publish", e);
        }
        eventPublisher.shutdown();
    }

    public List<PubsubMessage> waitForNMessages(int n, Duration timeoutDuration) throws IOException, InterruptedException {
        Preconditions.checkNotNull((Object)this.subscriptionPath);
        LinkedBlockingDeque receivedMessages = new LinkedBlockingDeque(n);
        MessageReceiver receiver = (message, replyConsumer) -> {
            if (receivedMessages.offer(message)) {
                replyConsumer.ack();
            } else {
                replyConsumer.nack();
            }
        };
        Subscriber subscriber = Subscriber.newBuilder((String)this.subscriptionPath.getPath(), (MessageReceiver)receiver).setCredentialsProvider(() -> ((TestPubsubOptions)this.pipelineOptions).getGcpCredential()).setChannelProvider(this.channelProvider).setEndpoint(this.pubsubEndpoint).build();
        subscriber.startAsync();
        DateTime startTime = new DateTime();
        int timeoutSeconds = timeoutDuration.toStandardSeconds().getSeconds();
        while (receivedMessages.size() < n && Seconds.secondsBetween((ReadableInstant)startTime, (ReadableInstant)new DateTime()).getSeconds() < timeoutSeconds) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        subscriber.stopAsync();
        subscriber.awaitTerminated();
        return receivedMessages.stream().map(message -> new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId())).collect(Collectors.toList());
    }

    public PollingAssertion assertThatTopicEventuallyReceives(Matcher<PubsubMessage> ... matchers) {
        return timeoutDuration -> MatcherAssert.assertThat(this.waitForNMessages(matchers.length, timeoutDuration), (Matcher)Matchers.containsInAnyOrder((Matcher[])matchers));
    }

    @Deprecated
    public void checkIfAnySubscriptionExists(String project, Duration timeoutDuration) throws InterruptedException, IllegalArgumentException, IOException, TimeoutException {
        try {
            this.assertSubscriptionEventuallyCreated(project, timeoutDuration);
        }
        catch (AssertionError e) {
            throw new TimeoutException(((Throwable)((Object)e)).getMessage());
        }
    }

    public void assertSubscriptionEventuallyCreated(String project, Duration timeoutDuration) throws InterruptedException, IllegalArgumentException, IOException {
        if (timeoutDuration.getMillis() <= 0L) {
            throw new IllegalArgumentException(String.format("timeoutDuration should be greater than 0", new Object[0]));
        }
        DateTime startTime = new DateTime();
        int sizeOfSubscriptionList = 0;
        while (sizeOfSubscriptionList == 0 && Seconds.secondsBetween((ReadableInstant)new DateTime(), (ReadableInstant)startTime).getSeconds() < timeoutDuration.toStandardSeconds().getSeconds()) {
            Thread.sleep(1000L);
            sizeOfSubscriptionList = Iterables.size(this.listSubscriptions(this.topicPath()));
        }
        if (sizeOfSubscriptionList > 0) {
            return;
        }
        throw new AssertionError((Object)("Timed out before subscription created for " + this.topicPath()));
    }

    private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) {
        return PubsubClient.OutgoingMessage.of(com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])message.getPayload())).putAllAttributes(message.getAttributeMap()).build(), DateTime.now().getMillis(), null);
    }

    public static interface PollingAssertion {
        public void waitForUpTo(Duration var1) throws IOException, InterruptedException;
    }
}

