/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public class TestPubsub
implements TestRule {
    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormat.forPattern((String)"YYYY-MM-dd-HH-mm-ss-SSS");
    private static final String EVENTS_TOPIC_NAME = "events";
    private static final String TOPIC_PREFIX = "integ-test-";
    private static final String NO_ID_ATTRIBUTE = null;
    private static final String NO_TIMESTAMP_ATTRIBUTE = null;
    private final TestPubsubOptions pipelineOptions;
    @Nullable
    private PubsubClient pubsub = null;
    @Nullable
    private PubsubClient.TopicPath eventsTopicPath = null;
    @Nullable
    private PubsubClient.SubscriptionPath subscriptionPath = null;

    public static TestPubsub create() {
        TestPubsubOptions options = (TestPubsubOptions)TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
        return new TestPubsub(options);
    }

    private TestPubsub(TestPubsubOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public Statement apply(final Statement base, final Description description) {
        return new Statement(){

            public void evaluate() throws Throwable {
                if (TestPubsub.this.pubsub != null) {
                    throw new AssertionError((Object)("Pubsub client was not shutdown in previous test. Topic path is'" + TestPubsub.this.eventsTopicPath + "'. Current test: " + description.getDisplayName()));
                }
                try {
                    TestPubsub.this.initializePubsub(description);
                    base.evaluate();
                }
                finally {
                    TestPubsub.this.tearDown();
                }
            }
        };
    }

    private void initializePubsub(Description description) throws IOException {
        this.pubsub = PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, this.pipelineOptions);
        PubsubClient.TopicPath eventsTopicPathTmp = PubsubClient.topicPathFromName(this.pipelineOptions.getProject(), TestPubsub.createTopicName(description, EVENTS_TOPIC_NAME));
        this.pubsub.createTopic(eventsTopicPathTmp);
        this.eventsTopicPath = eventsTopicPathTmp;
        this.subscriptionPath = this.pubsub.createRandomSubscription(PubsubClient.projectPathFromPath(String.format("projects/%s", this.pipelineOptions.getProject())), this.topicPath(), 10);
    }

    private void tearDown() throws IOException {
        if (this.pubsub == null) {
            return;
        }
        try {
            if (this.subscriptionPath != null) {
                this.pubsub.deleteSubscription(this.subscriptionPath);
            }
            if (this.eventsTopicPath != null) {
                this.pubsub.deleteTopic(this.eventsTopicPath);
            }
        }
        finally {
            this.pubsub.close();
            this.pubsub = null;
            this.eventsTopicPath = null;
            this.subscriptionPath = null;
        }
    }

    static String createTopicName(Description description, String name) throws IOException {
        StringBuilder topicName = new StringBuilder(TOPIC_PREFIX);
        if (description.getClassName() != null) {
            try {
                topicName.append(Class.forName(description.getClassName()).getSimpleName()).append("-");
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        if (description.getMethodName() != null) {
            topicName.append(description.getMethodName()).append("-");
        }
        DATETIME_FORMAT.printTo(topicName, (ReadableInstant)Instant.now());
        return topicName.toString() + "-" + name + "-" + String.valueOf(ThreadLocalRandom.current().nextLong());
    }

    public PubsubClient.TopicPath topicPath() {
        return this.eventsTopicPath;
    }

    public PubsubClient.SubscriptionPath subscriptionPath() {
        return this.subscriptionPath;
    }

    private List<PubsubClient.SubscriptionPath> listSubscriptions(PubsubClient.ProjectPath projectPath, PubsubClient.TopicPath topicPath) throws IOException {
        return this.pubsub.listSubscriptions(projectPath, topicPath);
    }

    public void publish(List<PubsubMessage> messages) throws IOException {
        List<PubsubClient.OutgoingMessage> outgoingMessages = messages.stream().map(this::toOutgoingMessage).collect(Collectors.toList());
        this.pubsub.publish(this.eventsTopicPath, outgoingMessages);
    }

    public List<PubsubMessage> pull() throws IOException {
        return this.pull(100);
    }

    public List<PubsubMessage> pull(int maxBatchSize) throws IOException {
        List<PubsubClient.IncomingMessage> messages = this.pubsub.pull(0L, this.subscriptionPath, maxBatchSize, true);
        if (!messages.isEmpty()) {
            this.pubsub.acknowledge(this.subscriptionPath, (List)messages.stream().map(PubsubClient.IncomingMessage::ackId).collect(ImmutableList.toImmutableList()));
        }
        return (List)messages.stream().map(msg -> new PubsubMessage(msg.message().getData().toByteArray(), msg.message().getAttributesMap(), msg.recordId())).collect(ImmutableList.toImmutableList());
    }

    public List<PubsubMessage> waitForNMessages(int n, Duration timeoutDuration) throws IOException, InterruptedException {
        ArrayList<PubsubMessage> receivedMessages = new ArrayList<PubsubMessage>(n);
        DateTime startTime = new DateTime();
        int timeoutSeconds = timeoutDuration.toStandardSeconds().getSeconds();
        receivedMessages.addAll(this.pull(n - receivedMessages.size()));
        while (receivedMessages.size() < n && Seconds.secondsBetween((ReadableInstant)startTime, (ReadableInstant)new DateTime()).getSeconds() < timeoutSeconds) {
            Thread.sleep(1000L);
            receivedMessages.addAll(this.pull(n - receivedMessages.size()));
        }
        return receivedMessages;
    }

    public PollingAssertion assertThatTopicEventuallyReceives(Matcher<PubsubMessage> ... matchers) {
        return timeoutDuration -> MatcherAssert.assertThat(this.waitForNMessages(matchers.length, timeoutDuration), (Matcher)Matchers.containsInAnyOrder((Matcher[])matchers));
    }

    public void checkIfAnySubscriptionExists(String project, Duration timeoutDuration) throws InterruptedException, IllegalArgumentException, IOException, TimeoutException {
        if (timeoutDuration.getMillis() <= 0L) {
            throw new IllegalArgumentException(String.format("timeoutDuration should be greater than 0", new Object[0]));
        }
        DateTime startTime = new DateTime();
        int sizeOfSubscriptionList = 0;
        while (sizeOfSubscriptionList == 0 && Seconds.secondsBetween((ReadableInstant)new DateTime(), (ReadableInstant)startTime).getSeconds() < timeoutDuration.toStandardSeconds().getSeconds()) {
            Thread.sleep(1000L);
            sizeOfSubscriptionList = this.listSubscriptions(PubsubClient.projectPathFromPath(String.format("projects/%s", project)), this.topicPath()).size();
        }
        if (sizeOfSubscriptionList > 0) {
            return;
        }
        throw new TimeoutException("Timed out when checking if topics exist for " + this.topicPath());
    }

    private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage message) {
        return PubsubClient.OutgoingMessage.of(com.google.pubsub.v1.PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])message.getPayload())).putAllAttributes(message.getAttributeMap()).build(), DateTime.now().getMillis(), null);
    }

    public static interface PollingAssertion {
        public void waitForUpTo(Duration var1) throws IOException, InterruptedException;
    }
}

