/*
 * Decompiled with CFR 0.152.
 */
package org.reactivestreams.tck;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.support.Function;
import org.reactivestreams.tck.support.Optional;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public abstract class PublisherVerification<T> {
    /** Test environment used by the tests in this class to create subscribers, report failures and check asynchronous errors. */
    private final TestEnvironment env;
    /** Milliseconds slept before forcing a GC in {@code spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber}. */
    private final long publisherReferenceGCTimeoutMillis;
    /** Skip message stating that a test needs an error-state Publisher and none was provided. NOTE(review): its use site is outside this view. */
    public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE = "Skipping because no error state Publisher provided, and the test requires it. Please implement PublisherVerification#createErrorStatePublisher to run this test.";
    /** Skip message stating that the Publisher did not pass an additional (optional) verification. NOTE(review): its use site is outside this view. */
    public static final String SKIPPING_OPTIONAL_TEST_FAILED = "Skipping, because provided Publisher does not pass this *additional* verification.";

    /**
     * Creates a verification bound to the given test environment.
     *
     * @param env environment the tests use to create subscribers and to record and verify failures
     * @param publisherReferenceGCTimeoutMillis milliseconds to sleep before triggering a GC in the
     *        reference-dropping test (spec313)
     */
    public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
        this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis;
        this.env = env;
    }

    /**
     * Factory hook implemented by the concrete verification to supply the Publisher under test.
     *
     * @param var1 NOTE(review): presumably the number of elements the Publisher must emit before
     *        completing (the {@code createPublisherN...} tests are named after it) — confirm against
     *        {@code activePublisherTest}, which is outside this view
     * @return the Publisher to verify
     */
    public abstract Publisher<T> createPublisher(long var1);

    /**
     * Factory hook implemented by the concrete verification to supply a Publisher in an error state.
     * The error-publisher tests (for example spec104 and spec112) expect such a Publisher to call
     * {@code onError} on a new Subscriber.
     * NOTE(review): {@link #SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE} suggests tests are skipped when no
     * such Publisher is provided — confirm how "not provided" is expressed (likely {@code null}).
     *
     * @return the error-state Publisher
     */
    public abstract Publisher<T> createErrorStatePublisher();

    /**
     * Overridable upper bound on the number of elements the Publisher under test can produce.
     * {@code validate_maxElementsFromPublisher} requires the value to be greater than zero.
     *
     * @return {@code Long.MAX_VALUE - 1} by default
     */
    public long maxElementsFromPublisher() {
        return Long.MAX_VALUE - 1L;
    }

    /**
     * Overridable switch for the stochastic tests; the default is {@code false}.
     * NOTE(review): presumably consulted by {@code stochasticTest(...)} to decide whether to skip —
     * that method is outside this view, confirm.
     *
     * @return {@code false} by default
     */
    public boolean skipStochasticTests() {
        return false;
    }

    /**
     * Overridable limit on how many {@code onNext} calls may be nested on one thread, as enforced
     * by {@code spec303_mustNotAllowUnboundedRecursion}.
     * {@code validate_boundedDepthOfOnNextAndRequestRecursion} requires the value to be at least 1.
     *
     * @return {@code 1} by default
     */
    public long boundedDepthOfOnNextAndRequestRecursion() {
        return 1L;
    }

    /**
     * Clears the asynchronous errors held by the test environment before each test method.
     */
    @BeforeMethod
    public void setUp() throws Exception {
        env.clearAsyncErrors();
    }

    /**
     * Sanity check of {@code createPublisher(1)}: the stream must yield one element and then end.
     */
    @Test
    public void createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(1L, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                // Ask for the single element; an undefined Optional means the stream ended early.
                Optional<T> first = subscriber.requestNextElementOrEndOfStream(
                        String.format("Timeout while waiting for next element from Publisher %s", pub));
                Assert.assertTrue(first.isDefined(), String.format("Publisher %s produced no elements", pub));
                subscriber.requestEndOfStream();
            }
        });
    }

    /**
     * Sanity check of {@code createPublisher(3)}: the stream must yield three elements and then end.
     */
    @Test
    public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(3L, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);

                Optional<T> first = nextOrEnd(pub, subscriber);
                Assert.assertTrue(first.isDefined(), String.format("Publisher %s produced no elements", pub));

                Optional<T> second = nextOrEnd(pub, subscriber);
                Assert.assertTrue(second.isDefined(), String.format("Publisher %s produced only 1 element", pub));

                Optional<T> third = nextOrEnd(pub, subscriber);
                Assert.assertTrue(third.isDefined(), String.format("Publisher %s produced only 2 elements", pub));

                subscriber.requestEndOfStream();
            }

            // Requests one element and returns it, or an undefined Optional when the stream has ended.
            Optional<T> nextOrEnd(Publisher<T> pub, TestEnvironment.ManualSubscriber<T> subscriber) throws InterruptedException {
                String timeoutMessage = String.format("Timeout while waiting for next element from Publisher %s", pub);
                return subscriber.requestNextElementOrEndOfStream(timeoutMessage);
            }
        });
    }

    /**
     * Validates the verification's own configuration: {@code maxElementsFromPublisher()} must be positive.
     */
    @Test
    public void validate_maxElementsFromPublisher() throws Exception {
        boolean positive = maxElementsFromPublisher() > 0L;
        Assert.assertTrue(positive, "maxElementsFromPublisher MUST return a number > 0");
    }

    /**
     * Validates the verification's own configuration:
     * {@code boundedDepthOfOnNextAndRequestRecursion()} must be at least 1.
     */
    @Test
    public void validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
        boolean atLeastOne = boundedDepthOfOnNextAndRequestRecursion() >= 1L;
        Assert.assertTrue(atLeastOne, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
    }

    /**
     * Checks that the Publisher emits nothing before the first {@code request} and nothing beyond
     * what was requested: one element after {@code request(1)}, then three after
     * {@code request(1)} followed by {@code request(2)}.
     */
    @Test
    public void spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
        this.activePublisherTest(5L, false, new PublisherTestRun<T>(){

            @Override
            public void run(Publisher<T> pub) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> sub = PublisherVerification.this.env.newManualSubscriber(pub);
                sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
                sub.request(1L);
                sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
                sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
                sub.request(1L);
                sub.request(2L);
                sub.nextElements(3L, PublisherVerification.this.env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
                // Fixed: the format string lacked a space after %s, gluing the publisher name to "produced".
                sub.expectNone(String.format("Publisher %s produced unrequested ", pub));
            }
        });
    }

    /**
     * Requests more elements (10) than the Publisher holds (3) and expects exactly the available
     * elements followed by completion.
     */
    @Test
    public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        final long elements = 3L;
        final long requested = 10L;
        activePublisherTest(elements, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                subscriber.request(requested);
                subscriber.nextElements(elements);
                subscriber.expectCompletion();
            }
        });
    }

    /**
     * Runs 100 stochastic iterations against a 10-element Publisher and fails when a signal
     * ({@code onSubscribe}, {@code onNext}, {@code onError}, {@code onComplete}) is entered by one
     * thread while another thread is still inside a previous signal.
     */
    @Test
    public void spec103_mustSignalOnMethodsSequentially() throws Throwable {
        final int iterations = 100;
        final long elements = 10L;
        final TestEnvironment verificationEnv = this.env;
        this.stochasticTest(iterations, new Function<Integer, Void>(){

            @Override
            public Void apply(Integer runNumber) throws Throwable {
                PublisherVerification.this.activePublisherTest(elements, true, new PublisherTestRun<T>(){

                    @Override
                    public void run(Publisher<T> pub) throws Throwable {
                        final TestEnvironment.Latch completionLatch = new TestEnvironment.Latch(verificationEnv);
                        pub.subscribe(new Subscriber<T>(){
                            private Subscription subs;
                            private long gotElements = 0L;
                            private final ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();

                            @Override
                            public void onSubscribe(Subscription s) {
                                String signal = "onSubscribe()";
                                this.concurrentAccessBarrier.enterSignal(signal);
                                this.subs = s;
                                this.subs.request(1L);
                                this.concurrentAccessBarrier.leaveSignal(signal);
                            }

                            @Override
                            public void onNext(T ignore) {
                                String signal = String.format("onNext(%s)", ignore);
                                this.concurrentAccessBarrier.enterSignal(signal);
                                ++this.gotElements;
                                // Deliberately requests one element more than the stream is known to hold.
                                if (this.gotElements <= elements) {
                                    this.subs.request(1L);
                                }
                                this.concurrentAccessBarrier.leaveSignal(signal);
                            }

                            @Override
                            public void onError(Throwable t) {
                                String signal = String.format("onError(%s)", t.getMessage());
                                this.concurrentAccessBarrier.enterSignal(signal);
                                this.concurrentAccessBarrier.leaveSignal(signal);
                            }

                            @Override
                            public void onComplete() {
                                String signal = "onComplete()";
                                this.concurrentAccessBarrier.enterSignal(signal);
                                this.concurrentAccessBarrier.leaveSignal(signal);
                                completionLatch.close();
                            }

                            /** Tracks which thread is currently inside a signal and flops on overlap. */
                            final class ConcurrentAccessBarrier {
                                // Fixed: was constructed as AtomicReference<Object>, which is not
                                // assignable to AtomicReference<Thread> and does not compile.
                                private final AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
                                private volatile String previousSignal = null;

                                ConcurrentAccessBarrier() {
                                }

                                public void enterSignal(String signalName) {
                                    // A failed CAS is tolerated only for a nested signal on the same thread.
                                    if (!this.currentlySignallingThread.compareAndSet(null, Thread.currentThread()) && !this.isSynchronousSignal()) {
                                        verificationEnv.flop(String.format("Illegal concurrent access detected (entering critical section)! %s emited %s signal, before %s finished its %s signal.", Thread.currentThread(), signalName, this.currentlySignallingThread.get(), this.previousSignal));
                                    }
                                    this.previousSignal = signalName;
                                }

                                public void leaveSignal(String signalName) {
                                    this.currentlySignallingThread.set(null);
                                    this.previousSignal = signalName;
                                }

                                private boolean isSynchronousSignal() {
                                    return this.previousSignal != null && Thread.currentThread().equals(this.currentlySignallingThread.get());
                                }
                            }
                        });
                        completionLatch.expectClose(elements * verificationEnv.defaultTimeoutMillis(), "Expected 10 elements to be drained");
                    }
                });
                return null;
            }
        });
    }

    /**
     * Subscribes to the error-state Publisher and expects exactly one {@code onError} call.
     * A {@link SkipException} is passed through unchanged; any other throwable escaping the test
     * run is rethrown wrapped in a {@link RuntimeException} that keeps it as the cause.
     */
    @Test
    public void spec104_mustSignalOnErrorWhenFails() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        try {
            errorPublisherTest(new PublisherTestRun<T>() {

                @Override
                public void run(final Publisher<T> pub) throws InterruptedException {
                    final TestEnvironment.Latch onErrorLatch = new TestEnvironment.Latch(verificationEnv);
                    TestEnvironment.TestSubscriber<T> errorWatcher = new TestEnvironment.TestSubscriber<T>(verificationEnv) {

                        @Override
                        public void onError(Throwable cause) {
                            // The latch must still be open here, otherwise onError was already signalled once.
                            onErrorLatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
                            onErrorLatch.close();
                        }
                    };
                    pub.subscribe((Subscriber) errorWatcher);
                    onErrorLatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
                    verificationEnv.verifyNoAsyncErrors(verificationEnv.defaultTimeoutMillis());
                }
            });
        }
        catch (SkipException skipped) {
            throw skipped;
        }
        catch (Throwable thrown) {
            String message = String.format("Publisher threw exception (%s) instead of signalling error via onError!", thrown.getMessage());
            throw new RuntimeException(message, thrown);
        }
    }

    /**
     * Drains a 3-element Publisher one element at a time, then expects end-of-stream and no
     * further signals.
     */
    @Test
    public void spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(3L, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                for (int i = 0; i < 3; i++) {
                    subscriber.requestNextElement();
                }
                subscriber.requestEndOfStream();
                subscriber.expectNone();
            }
        });
    }

    /**
     * Placeholder for rule 1.6: no check is performed here, the test only delegates to
     * {@code notVerified()}.
     */
    @Test
    public void spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
        notVerified();
    }

    /**
     * After a 1-element Publisher has completed, requests more elements and expects that no
     * further signal arrives.
     */
    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(1L, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);

                // Drain the single element and observe completion.
                subscriber.request(10L);
                subscriber.nextElement();
                subscriber.expectCompletion();

                // Demand signalled after completion must not produce anything.
                subscriber.request(10L);
                subscriber.expectNone();
            }
        });
    }

    /**
     * Placeholder for the onError half of rule 1.7: no check is performed here, the test only
     * delegates to {@code notVerified()}.
     */
    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
        notVerified();
    }

    /**
     * Placeholder for rule 1.8: no check is performed here, the test only delegates to
     * {@code notVerified()}.
     */
    @Test
    public void spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
        notVerified();
    }

    /**
     * Placeholder for rule 1.9: no check is performed here, the test only delegates to
     * {@code notVerified()}.
     */
    @Test
    public void spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
        notVerified();
    }

    /**
     * Placeholder for rule 1.10: no check is performed here, the test only delegates to
     * {@code notVerified()}.
     */
    @Test
    public void spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
        notVerified();
    }

    /**
     * Optional test: subscribes two manual subscribers to the same Publisher and verifies that
     * no asynchronous error was recorded.
     */
    @Test
    public void spec111_maySupportMultiSubscribe() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        optionalActivePublisherTest(1L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                // Only the act of subscribing twice matters; the subscribers are not used afterwards.
                verificationEnv.newManualSubscriber(pub);
                verificationEnv.newManualSubscriber(pub);
                verificationEnv.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Subscribes to the error-state Publisher and expects it to reject the subscription by
     * calling {@code onError} exactly once, without ever calling {@code onSubscribe}.
     */
    @Test
    public void spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
        this.errorPublisherTest(new PublisherTestRun<T>(){

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                final TestEnvironment.Latch onErrorLatch = new TestEnvironment.Latch(PublisherVerification.this.env);
                TestEnvironment.ManualSubscriberWithSubscriptionSupport<T> sub = new TestEnvironment.ManualSubscriberWithSubscriptionSupport<T>(PublisherVerification.this.env){

                    @Override
                    public void onError(Throwable cause) {
                        onErrorLatch.assertOpen("Only one onError call expected");
                        onErrorLatch.close();
                    }

                    @Override
                    public void onSubscribe(Subscription subs) {
                        this.env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber");
                    }
                };
                pub.subscribe((Subscriber)sub);
                // Fixed: the latch was asserted closed immediately after subscribe(), which races with
                // a Publisher that signals onError asynchronously. Use expectClose, as spec104 does;
                // NOTE(review): presumably it waits up to the environment's default timeout — confirm.
                onErrorLatch.expectClose("Should have received onError");
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Optional test: three subscribers drain the same 5-element Publisher with interleaved,
     * differently sized requests; all three must observe the same element sequence.
     */
    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        optionalActivePublisherTest(5L, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> first = verificationEnv.newManualSubscriber(pub);
                TestEnvironment.ManualSubscriber<T> second = verificationEnv.newManualSubscriber(pub);
                TestEnvironment.ManualSubscriber<T> third = verificationEnv.newManualSubscriber(pub);

                first.request(1L);
                T x1 = first.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
                second.request(2L);
                List<T> y1 = second.nextElements(2L, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub));
                first.request(1L);
                T x2 = first.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));

                // The third subscriber runs to the end of the stream first.
                third.request(3L);
                List<T> z1 = third.nextElements(3L, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub));
                third.request(1L);
                T z2 = third.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
                third.request(1L);
                T z3 = third.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub));
                third.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub));

                // Then the second subscriber finishes.
                second.request(3L);
                List<T> y2 = second.nextElements(3L, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub));
                second.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub));

                // Finally the first subscriber finishes.
                first.request(2L);
                List<T> x3 = first.nextElements(2L, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub));
                first.request(1L);
                T x4 = first.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub));
                first.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub));

                List<T> seenByFirst = new ArrayList<T>(Arrays.asList(x1, x2));
                seenByFirst.addAll(x3);
                seenByFirst.add(x4);

                List<T> seenBySecond = new ArrayList<T>(y1);
                seenBySecond.addAll(y2);

                List<T> seenByThird = new ArrayList<T>(z1);
                seenByThird.add(z2);
                seenByThird.add(z3);

                Assert.assertEquals(seenByFirst, seenBySecond, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub));
                Assert.assertEquals(seenByFirst, seenByThird, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub));
            }
        });
    }

    /**
     * Optional test: three subscribers each request 4 elements upfront from a 3-element
     * Publisher; the 3 elements received by each must be equal and in the same order.
     */
    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        optionalActivePublisherTest(3L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> first = verificationEnv.newManualSubscriber(pub);
                TestEnvironment.ManualSubscriber<T> second = verificationEnv.newManualSubscriber(pub);
                TestEnvironment.ManualSubscriber<T> third = verificationEnv.newManualSubscriber(pub);

                // Signal more demand than the stream holds on every subscriber before reading anything.
                first.request(4L);
                second.request(4L);
                third.request(4L);

                List<T> seenByFirst = new ArrayList<T>(first.nextElements(3L));
                List<T> seenBySecond = new ArrayList<T>(second.nextElements(3L));
                List<T> seenByThird = new ArrayList<T>(third.nextElements(3L));

                Assert.assertEquals(seenByFirst, seenBySecond, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers");
                Assert.assertEquals(seenBySecond, seenByThird, "Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers");
            }
        });
    }

    /**
     * Optional test: like the "many upfront" variant, but additionally expects every one of the
     * three subscribers to observe completion before the received sequences are compared.
     */
    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        optionalActivePublisherTest(3L, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> first = verificationEnv.newManualSubscriber(pub);
                TestEnvironment.ManualSubscriber<T> second = verificationEnv.newManualSubscriber(pub);
                TestEnvironment.ManualSubscriber<T> third = verificationEnv.newManualSubscriber(pub);

                first.request(4L);
                second.request(4L);
                third.request(4L);

                List<T> seenByFirst = new ArrayList<T>(first.nextElements(3L));
                List<T> seenBySecond = new ArrayList<T>(second.nextElements(3L));
                List<T> seenByThird = new ArrayList<T>(third.nextElements(3L));

                first.expectCompletion();
                second.expectCompletion();
                third.expectCompletion();

                Assert.assertEquals(seenByFirst, seenBySecond, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers");
                Assert.assertEquals(seenBySecond, seenByThird, "Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers");
            }
        });
    }

    /**
     * Uses a subscriber that calls {@code request(1)} three times from within {@code onSubscribe}
     * and once from within every {@code onNext}, then verifies that no asynchronous error was
     * recorded within the default timeout.
     */
    @Test
    public void spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(6L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> reentrantSubscriber = new TestEnvironment.ManualSubscriber<T>(verificationEnv) {

                    @Override
                    public void onSubscribe(Subscription subs) {
                        this.subscription.completeImmediatly(subs);
                        for (int i = 0; i < 3; i++) {
                            subs.request(1L);
                        }
                    }

                    @Override
                    public void onNext(T element) {
                        ((Subscription) this.subscription.value()).request(1L);
                    }
                };
                verificationEnv.subscribe(pub, reentrantSubscriber);
                verificationEnv.verifyNoAsyncErrors(verificationEnv.defaultTimeoutMillis());
            }
        });
    }

    /**
     * Creates a Publisher with one element more than
     * {@code boundedDepthOfOnNextAndRequestRecursion()} and counts, per thread, how many
     * {@code onNext} calls are active at once. The subscriber requests the next element from
     * inside {@code onNext}; the test flops when the per-thread count exceeds the bound.
     */
    @Test
    public void spec303_mustNotAllowUnboundedRecursion() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        final long elementCount = boundedDepthOfOnNextAndRequestRecursion() + 1L;
        activePublisherTest(elementCount, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                final ThreadLocal<Long> depthOnThisThread = new ThreadLocal<Long>() {

                    @Override
                    protected Long initialValue() {
                        return 0L;
                    }
                };
                TestEnvironment.ManualSubscriberWithSubscriptionSupport<T> recursingSubscriber = new TestEnvironment.ManualSubscriberWithSubscriptionSupport<T>(verificationEnv) {

                    @Override
                    public void onNext(T element) {
                        // Entering onNext: one level deeper on this thread.
                        depthOnThisThread.set(depthOnThisThread.get() + 1L);
                        super.onNext(element);

                        final Long depth = depthOnThisThread.get();
                        if (depth > PublisherVerification.this.boundedDepthOfOnNextAndRequestRecursion()) {
                            this.env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", depth, Thread.currentThread(), PublisherVerification.this.boundedDepthOfOnNextAndRequestRecursion()));
                            return;
                        }

                        // A synchronous Publisher re-enters onNext from inside this request call.
                        ((Subscription) this.subscription.value()).request(1L);
                        depthOnThisThread.set(depthOnThisThread.get() - 1L);
                    }
                };
                verificationEnv.subscribe(pub, recursingSubscriber);
                recursingSubscriber.request(1L);
                recursingSubscriber.nextElementOrEndOfStream();
                verificationEnv.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Placeholder for rule 3.4: no check is performed here, the test only delegates to
     * {@code notVerified()}.
     */
    @Test
    public void spec304_requestShouldNotPerformHeavyComputations() throws Exception {
        notVerified();
    }

    /**
     * Placeholder for rule 3.5: no check is performed here, the test only delegates to
     * {@code notVerified()}.
     */
    @Test
    public void spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception {
        notVerified();
    }

    /**
     * Cancels the subscription, issues three further {@code request(1)} calls and expects that no
     * element is delivered and no asynchronous error is recorded.
     */
    @Test
    public void spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(3L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriberWithSubscriptionSupport<T> subscriber = new TestEnvironment.ManualSubscriberWithSubscriptionSupport<T>(verificationEnv) {

                    @Override
                    public void cancel() {
                        // Cancelling is only meaningful once the subscription has been received.
                        if (!this.subscription.isCompleted()) {
                            this.env.flop("Cannot cancel a subscription before having received it");
                            return;
                        }
                        ((Subscription) this.subscription.value()).cancel();
                    }
                };
                verificationEnv.subscribe(pub, subscriber);
                subscriber.cancel();
                for (int i = 0; i < 3; i++) {
                    subscriber.request(1L);
                }
                subscriber.expectNone();
                verificationEnv.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Cancels the same subscription three times in a row and expects no signal and no
     * asynchronous error afterwards.
     */
    @Test
    public void spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(1L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                Subscription upstream = (Subscription) subscriber.subscription.value();
                for (int i = 0; i < 3; i++) {
                    upstream.cancel();
                }
                subscriber.expectNone();
                verificationEnv.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Calls {@code request(0)} and expects an {@link IllegalArgumentException} whose message
     * mentions "3.9".
     */
    @Test
    public void spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        activePublisherTest(10L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                subscriber.request(0L);
                subscriber.expectErrorWithMessage(IllegalArgumentException.class, "3.9");
            }
        });
    }

    /**
     * Calls {@code request} with a random strictly negative amount and expects an
     * {@link IllegalArgumentException} whose message mentions "3.9".
     */
    @Test
    public void spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
        this.activePublisherTest(10L, false, new PublisherTestRun<T>(){

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> sub = PublisherVerification.this.env.newManualSubscriber(pub);
                Random r = new Random();
                // Fixed: nextInt(bound) may return 0, and -0 is not a negative request.
                // Subtracting 1 keeps the value in [-Integer.MAX_VALUE, -1].
                sub.request(-r.nextInt(Integer.MAX_VALUE) - 1);
                sub.expectErrorWithMessage(IllegalArgumentException.class, "3.9");
            }
        });
    }

    /**
     * Signals a total demand of 15 (10 + 5) on a 20-element Publisher, consumes one element and
     * cancels. Each asynchronous error dropped from the environment after an {@code expectNone()}
     * is counted as one more signalled element; the count must never exceed the total demand and
     * the loop ends once no further error is found.
     */
    @Test
    public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        final long publisherElements = 20L;
        final long firstDemand = 10L;
        final long secondDemand = 5L;
        final int totalDemand = 15;
        activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                subscriber.request(firstDemand);
                subscriber.request(secondDemand);
                subscriber.nextElement();
                subscriber.cancel();

                // One element was already consumed above.
                int signalledCount = 1;
                boolean sawLateSignal;
                do {
                    subscriber.expectNone();
                    sawLateSignal = verificationEnv.dropAsyncError() != null;
                    if (sawLateSignal) {
                        ++signalledCount;
                    }
                    Assert.assertTrue(signalledCount <= totalDemand, String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", signalledCount, totalDemand));
                } while (sawLateSignal);
            }
        });
        verificationEnv.verifyNoAsyncErrors();
    }

    /**
     * Subscribes, consumes one element and cancels inside a separate function so that only a
     * {@link WeakReference} to the subscriber is held here. After sleeping for
     * {@code publisherReferenceGCTimeoutMillis} and requesting a GC, that reference must show up
     * on the reference queue within 100 ms; otherwise the test flops.
     */
    @Test
    public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        final ReferenceQueue<TestEnvironment.ManualSubscriber<T>> collected = new ReferenceQueue<TestEnvironment.ManualSubscriber<T>>();
        final Function<Publisher<T>, WeakReference<TestEnvironment.ManualSubscriber<T>>> subscribeAndCancel = new Function<Publisher<T>, WeakReference<TestEnvironment.ManualSubscriber<T>>>() {

            @Override
            public WeakReference<TestEnvironment.ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                WeakReference<TestEnvironment.ManualSubscriber<T>> weakSubscriber = new WeakReference<TestEnvironment.ManualSubscriber<T>>(subscriber, collected);
                subscriber.request(1L);
                subscriber.nextElement();
                subscriber.cancel();
                return weakSubscriber;
            }
        };
        activePublisherTest(3L, false, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                WeakReference<TestEnvironment.ManualSubscriber<T>> weakSubscriber = subscribeAndCancel.apply(pub);
                Thread.sleep(PublisherVerification.this.publisherReferenceGCTimeoutMillis);
                System.gc();
                boolean dropped = weakSubscriber.equals(collected.remove(100L));
                if (!dropped) {
                    verificationEnv.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
                }
                verificationEnv.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Requests {@code Long.MAX_VALUE} elements in a single call from a 3-element Publisher and
     * expects the three elements, completion, and no asynchronous error.
     */
    @Test
    public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
        final TestEnvironment verificationEnv = this.env;
        final long totalElements = 3L;
        activePublisherTest(totalElements, true, new PublisherTestRun<T>() {

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> subscriber = verificationEnv.newManualSubscriber(pub);
                subscriber.request(Long.MAX_VALUE);
                subscriber.nextElements(totalElements);
                subscriber.expectCompletion();
                verificationEnv.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Checks that a publisher accepts several {@code request} calls whose sum is exactly
     * {@code Long.MAX_VALUE}: two requests of {@code Long.MAX_VALUE / 2} followed by a
     * request of 1. A publisher of {@code totalElements} elements must still deliver
     * exactly those elements and then complete.
     *
     * <p>Runs through {@link #activePublisherTest(long, boolean, PublisherTestRun)} with
     * {@code completionSignalRequired == true}.
     *
     * @throws Throwable whatever the test body or the publisher under test throws
     */
    @Test
    public void spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
        // Single source of truth for both the publisher size and the expected element count.
        final int totalElements = 3;
        this.activePublisherTest(totalElements, true, new PublisherTestRun<T>(){

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.ManualSubscriber<T> sub = PublisherVerification.this.env.newManualSubscriber(pub);
                // (Long.MAX_VALUE / 2) * 2 + 1 == Long.MAX_VALUE, so the total demand
                // reaches the maximum without exceeding it.
                sub.request(Long.MAX_VALUE / 2);
                sub.request(Long.MAX_VALUE / 2);
                sub.request(1L);
                sub.nextElements(totalElements);
                sub.expectCompletion();
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    /**
     * Drives the pending demand of a publisher past {@code Long.MAX_VALUE} and expects an
     * {@link IllegalStateException} error signal, passing {@code "3.17"} as the expected
     * message to {@code expectErrorWithMessage}.
     *
     * <p>The subscriber requests one element; from then on every {@code onNext} requests
     * another {@code Long.MAX_VALUE - 1} elements, for at most 10 such calls.
     *
     * @throws Throwable whatever the test body or the publisher under test throws
     */
    @Test
    public void spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
        final PublisherTestRun<T> overflowScenario = new PublisherTestRun<T>(){

            @Override
            public void run(Publisher<T> pub) throws Throwable {
                TestEnvironment.BlackholeSubscriberWithSubscriptionSupport sub = new TestEnvironment.BlackholeSubscriberWithSubscriptionSupport<T>(PublisherVerification.this.env){
                    // Upper bound on how many oversized request calls this subscriber issues.
                    int remainingOversizedRequests = 10;

                    @Override
                    public void onNext(T element) {
                        this.env.debug(String.format("%s::onNext(%s)", this, element));
                        if (!this.subscription.isCompleted()) {
                            // An element arrived although no subscription has been recorded yet.
                            this.env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
                            return;
                        }
                        if (this.remainingOversizedRequests <= 0) {
                            return;
                        }
                        ((Subscription)this.subscription.value()).request(Long.MAX_VALUE - 1L);
                        --this.remainingOversizedRequests;
                    }
                };
                PublisherVerification.this.env.subscribe(pub, sub, PublisherVerification.this.env.defaultTimeoutMillis());
                sub.request(1L);
                sub.expectErrorWithMessage(IllegalStateException.class, "3.17");
                PublisherVerification.this.env.verifyNoAsyncErrors(PublisherVerification.this.env.defaultTimeoutMillis());
            }
        };
        this.activePublisherTest(Integer.MAX_VALUE, false, overflowScenario);
    }

    /**
     * Runs {@code body} against a freshly created publisher of {@code elements} elements
     * and afterwards verifies that no asynchronous errors were recorded in the environment.
     *
     * @param elements number of elements passed to {@link #createPublisher(long)}
     * @param completionSignalRequired whether the test needs the publisher to complete
     * @param body the test logic to run against the created publisher
     * @throws SkipException if {@code elements} exceeds {@link #maxElementsFromPublisher()},
     *         or if completion is required while {@link #maxElementsFromPublisher()}
     *         returns {@code Long.MAX_VALUE}
     * @throws Throwable whatever {@code body} throws
     */
    public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
        if (elements > this.maxElementsFromPublisher()) {
            final String tooManyElements = String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, this.maxElementsFromPublisher());
            throw new SkipException(tooManyElements);
        }
        if (completionSignalRequired) {
            if (this.maxElementsFromPublisher() == Long.MAX_VALUE) {
                throw new SkipException("Unable to run this test, as it requires an onComplete signal, which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
            }
        }
        final Publisher<T> publisherUnderTest = this.createPublisher(elements);
        body.run(publisherUnderTest);
        this.env.verifyNoAsyncErrors();
    }

    /**
     * Runs {@code body} against a freshly created publisher of {@code elements} elements,
     * treating the checked behavior as optional: any {@link Exception} or
     * {@link AssertionError} raised while running the body is reported through
     * {@link #notVerified(String)} instead of being propagated.
     *
     * <p>The skip message carries the reason in both cases, so a skipped optional test
     * shows why it was skipped.
     *
     * @param elements number of elements passed to {@link #createPublisher(long)}
     * @param completionSignalRequired whether the test needs the publisher to complete
     * @param body the test logic to run against the created publisher
     * @throws SkipException if {@code elements} exceeds {@link #maxElementsFromPublisher()},
     *         or if completion is required while {@link #maxElementsFromPublisher()}
     *         returns {@code Long.MAX_VALUE}
     * @throws Throwable any throwable from {@code body} that is neither an
     *         {@link Exception} nor an {@link AssertionError}
     */
    public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable {
        if (elements > this.maxElementsFromPublisher()) {
            throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, this.maxElementsFromPublisher()));
        }
        if (completionSignalRequired && this.maxElementsFromPublisher() == Long.MAX_VALUE) {
            throw new SkipException("Unable to run this test, as it requires an onComplete signal, which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)");
        }
        final Publisher<T> pub = this.createPublisher(elements);
        final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement.";
        try {
            this.potentiallyPendingTest(pub, body);
        }
        catch (Exception ex) {
            // Use toString() rather than getMessage(): the message may be null, and the
            // exception type is part of the useful diagnostic.
            this.notVerified(skipMessage + " Reason for skipping was: " + ex);
        }
        catch (AssertionError ex) {
            this.notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage());
        }
    }

    /**
     * Runs {@code body} against the publisher returned by
     * {@link #createErrorStatePublisher()}, using
     * {@link #SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE} as the skip message.
     *
     * @param body the test logic to run against the error-state publisher
     * @throws Throwable whatever the delegate or {@code body} throws
     */
    public void errorPublisherTest(PublisherTestRun<T> body) throws Throwable {
        final Publisher<T> errorStatePublisher = this.createErrorStatePublisher();
        this.potentiallyPendingTest(errorStatePublisher, body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE);
    }

    /**
     * Convenience overload of
     * {@link #potentiallyPendingTest(Publisher, PublisherTestRun, String)} that uses
     * {@link #SKIPPING_OPTIONAL_TEST_FAILED} as the skip message.
     *
     * @param pub the publisher to test
     * @param body the test logic to run against {@code pub}
     * @throws Throwable whatever the delegate or {@code body} throws
     */
    public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable {
        final String defaultSkipMessage = SKIPPING_OPTIONAL_TEST_FAILED;
        this.potentiallyPendingTest(pub, body, defaultSkipMessage);
    }

    /**
     * Runs {@code body} against {@code pub}, or skips the test when no publisher is given.
     *
     * @param pub the publisher to test; {@code null} causes the test to be skipped
     * @param body the test logic to run against {@code pub}
     * @param message the skip message used when {@code pub} is {@code null}
     * @throws SkipException if {@code pub} is {@code null}
     * @throws Throwable whatever {@code body} throws
     */
    public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable {
        if (pub != null) {
            body.run(pub);
            return;
        }
        throw new SkipException(message);
    }

    /**
     * Applies {@code body} to each index from {@code 0} (inclusive) to {@code n}
     * (exclusive), in ascending order.
     *
     * <p>When {@link #skipStochasticTests()} returns {@code true},
     * {@link #notVerified(String)} is invoked first; as implemented in this class that
     * call throws a {@link SkipException}.
     *
     * @param n number of iterations
     * @param body the logic to run for each iteration index
     * @throws Throwable whatever {@code body} throws
     */
    public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable {
        final boolean stochasticTestsDisabled = this.skipStochasticTests();
        if (stochasticTestsDisabled) {
            this.notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
        }
        int iteration = 0;
        while (iteration < n) {
            body.apply(iteration);
            ++iteration;
        }
    }

    /**
     * Marks the current test as skipped with the message "Not verified by this TCK.".
     *
     * @throws SkipException always
     */
    public void notVerified() {
        final SkipException notVerifiedByTck = new SkipException("Not verified by this TCK.");
        throw notVerifiedByTck;
    }

    /**
     * Returns {@code Long.MAX_VALUE}, the value that the skip messages in this class
     * describe as meaning "this Publisher is unable to signal onComplete" when it is
     * returned from {@link #maxElementsFromPublisher()}.
     *
     * @return {@code Long.MAX_VALUE}
     */
    public long publisherUnableToSignalOnComplete() {
        final long unableToCompleteMarker = Long.MAX_VALUE;
        return unableToCompleteMarker;
    }

    /**
     * Marks the current test as skipped, using the given message.
     *
     * @param message the skip message
     * @throws SkipException always
     */
    public void notVerified(String message) {
        final SkipException skipped = new SkipException(message);
        throw skipped;
    }

    public static interface PublisherTestRun<T> {
        public void run(Publisher<T> var1) throws Throwable;
    }
}

