/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.common.collect.ImmutableSet;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableInstant;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public class TestPubsubSignal
implements TestRule {
    private static final String TOPIC_FORMAT = "projects/%s/topics/%s-result1";
    private static final String SUBSCRIPTION_FORMAT = "projects/%s/subscriptions/%s";
    private static final String NO_ID_ATTRIBUTE = null;
    private static final String NO_TIMESTAMP_ATTRIBUTE = null;
    PubsubClient pubsub;
    private TestPubsubOptions pipelineOptions;
    private String resultTopicPath;

    public static TestPubsubSignal create() {
        TestPubsubOptions options = (TestPubsubOptions)TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
        return new TestPubsubSignal(options);
    }

    private TestPubsubSignal(TestPubsubOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public Statement apply(final Statement base, final Description description) {
        return new Statement(){

            public void evaluate() throws Throwable {
                if (TestPubsubSignal.this.pubsub != null) {
                    throw new AssertionError((Object)("Pubsub client was not shutdown in previous test. Topic path is'" + TestPubsubSignal.this.resultTopicPath + "'. Current test: " + description.getDisplayName()));
                }
                try {
                    TestPubsubSignal.this.initializePubsub(description);
                    base.evaluate();
                }
                finally {
                    TestPubsubSignal.this.tearDown();
                }
            }
        };
    }

    private void initializePubsub(Description description) throws IOException {
        this.pubsub = PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, this.pipelineOptions);
        String resultTopicPathTmp = String.format(TOPIC_FORMAT, this.pipelineOptions.getProject(), TestPubsub.createTopicName(description));
        this.pubsub.createTopic(new PubsubClient.TopicPath(resultTopicPathTmp));
        this.resultTopicPath = resultTopicPathTmp;
    }

    private void tearDown() throws IOException {
        if (this.pubsub == null) {
            return;
        }
        try {
            if (this.resultTopicPath != null) {
                this.pubsub.deleteTopic(new PubsubClient.TopicPath(this.resultTopicPath));
            }
        }
        finally {
            this.pubsub.close();
            this.pubsub = null;
            this.resultTopicPath = null;
        }
    }

    public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate) {
        return new PublishSuccessWhen<T>(coder, successPredicate, this.resultTopicPath);
    }

    public void waitForSuccess(Duration duration) throws IOException {
        PubsubClient.SubscriptionPath resultSubscriptionPath = new PubsubClient.SubscriptionPath(String.format(SUBSCRIPTION_FORMAT, this.pipelineOptions.getProject(), "subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong())));
        this.pubsub.createSubscription(new PubsubClient.TopicPath(this.resultTopicPath), resultSubscriptionPath, (int)duration.getStandardSeconds());
        String result = this.pollForResultForDuration(resultSubscriptionPath, duration);
        if (!"SUCCESS".equals(result)) {
            throw new AssertionError((Object)result);
        }
    }

    private String pollForResultForDuration(PubsubClient.SubscriptionPath resultSubscriptionPath, Duration duration) throws IOException {
        List<PubsubClient.IncomingMessage> result = null;
        DateTime endPolling = DateTime.now().plus(duration.getMillis());
        while (true) {
            try {
                result = this.pubsub.pull(DateTime.now().getMillis(), resultSubscriptionPath, 1, false);
                this.pubsub.acknowledge(resultSubscriptionPath, result.stream().map(m -> m.ackId).collect(Collectors.toList()));
            }
            catch (StatusRuntimeException e) {
                System.out.println("Error while polling for result: " + e.getStatus());
                this.sleep(500L);
                if (DateTime.now().isBefore((ReadableInstant)endPolling)) continue;
            }
            break;
        }
        if (result == null) {
            throw new AssertionError((Object)("Did not receive success in " + duration.getStandardSeconds() + "s"));
        }
        return new String(result.get((int)0).elementBytes, StandardCharsets.UTF_8);
    }

    private void sleep(long t) {
        try {
            Thread.sleep(t);
        }
        catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    static class StatefulPredicateCheck<T>
    extends DoFn<KV<String, ? extends T>, String> {
        private SerializableFunction<Set<T>, Boolean> successPredicate;
        @DoFn.StateId(value="seenEvents")
        private final StateSpec<SetState<T>> seenEvents;

        StatefulPredicateCheck(Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate) {
            this.seenEvents = StateSpecs.set(coder);
            this.successPredicate = successPredicate;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context, @DoFn.StateId(value="seenEvents") SetState<T> seenEvents) {
            seenEvents.add(((KV)context.element()).getValue());
            ImmutableSet eventsSoFar = ImmutableSet.copyOf((Iterable)((Iterable)seenEvents.read()));
            try {
                if (((Boolean)this.successPredicate.apply((Object)eventsSoFar)).booleanValue()) {
                    context.output((Object)"SUCCESS");
                }
            }
            catch (Throwable e) {
                context.output((Object)("FAILURE: " + e.getMessage()));
            }
        }
    }

    static class PublishSuccessWhen<T>
    extends PTransform<PCollection<? extends T>, POutput> {
        private Coder<T> coder;
        private SerializableFunction<Set<T>, Boolean> successPredicate;
        private String resultTopicPath;

        PublishSuccessWhen(Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate, String resultTopicPath) {
            this.coder = coder;
            this.successPredicate = successPredicate;
            this.resultTopicPath = resultTopicPath;
        }

        public POutput expand(PCollection<? extends T> input) {
            return ((PCollection)((PCollection)((PCollection)input.apply((PTransform)Window.into((WindowFn)new GlobalWindows()))).apply((PTransform)WithKeys.of((Object)"dummyKey"))).apply("checkAllEventsForSuccess", (PTransform)ParDo.of(new StatefulPredicateCheck<T>(this.coder, this.successPredicate)))).apply("publishSuccess", PubsubIO.writeStrings().to(this.resultTopicPath));
        }
    }
}

