/*
 * Decompiled with CFR 0.152.
 */
package org.reactivestreams.tck;

import java.util.HashSet;
import java.util.Set;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.support.Function;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Abstract TestNG harness that verifies an identity {@link Processor} (one whose
 * input and output element types are both {@code T}).
 *
 * <p>The harness wraps the processor returned by {@link #createIdentityProcessor(int)}
 * in two delegate verifications built in the constructor:
 * <ul>
 *   <li>a {@link PublisherVerification}, which sees the processor as a {@link Publisher}
 *       (see {@link #createPublisher(long)}), and</li>
 *   <li>a {@link SubscriberWhiteboxVerification}, which sees the processor as a
 *       {@link Subscriber} (see {@link #createSubscriber}).</li>
 * </ul>
 * Most {@code @Test} methods below are one-line delegations to the same-named test on
 * one of those two delegates; a handful of processor-specific tests are implemented
 * directly with the help of {@link TestSetup}.
 *
 * @param <T> element type flowing through the processor under test
 */
public abstract class IdentityProcessorVerification<T> {
    /** Buffer size used by the two-argument constructor. */
    private static final int DEFAULT_PROCESSOR_BUFFER_SIZE = 16;

    private final TestEnvironment env;
    private final SubscriberWhiteboxVerification<T> subscriberVerification;
    private final PublisherVerification<T> publisherVerification;
    private final int processorBufferSize;

    /**
     * Creates a verification that asks for processors with a buffer size of 16.
     *
     * @param env                            test environment shared by all delegated verifications
     * @param publisherShutdownTimeoutMillis timeout handed to the {@link PublisherVerification} delegate
     */
    public IdentityProcessorVerification(TestEnvironment env, long publisherShutdownTimeoutMillis) {
        this(env, publisherShutdownTimeoutMillis, DEFAULT_PROCESSOR_BUFFER_SIZE);
    }

    /**
     * Creates a verification with an explicit processor buffer size.
     *
     * @param env                            test environment shared by all delegated verifications
     * @param publisherShutdownTimeoutMillis timeout handed to the {@link PublisherVerification} delegate
     * @param processorBufferSize            value passed to {@link #createIdentityProcessor(int)}
     *                                       whenever this harness needs a processor
     */
    public IdentityProcessorVerification(TestEnvironment env, long publisherShutdownTimeoutMillis, int processorBufferSize) {
        this.env = env;
        this.processorBufferSize = processorBufferSize;

        // Subscriber-side delegate: every hook is routed back to this harness.
        // The calls must stay qualified with IdentityProcessorVerification.this,
        // otherwise they would recurse into the anonymous class's own overrides.
        this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) {

            @Override
            public Subscriber<T> createSubscriber(SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
                return IdentityProcessorVerification.this.createSubscriber(probe);
            }

            @Override
            public Publisher<T> createHelperPublisher(long elements) {
                return IdentityProcessorVerification.this.createHelperPublisher(elements);
            }
        };

        // Publisher-side delegate: same routing, including the tunable limits.
        this.publisherVerification = new PublisherVerification<T>(env, publisherShutdownTimeoutMillis) {

            @Override
            public Publisher<T> createPublisher(long elements) {
                return IdentityProcessorVerification.this.createPublisher(elements);
            }

            @Override
            public Publisher<T> createErrorStatePublisher() {
                return IdentityProcessorVerification.this.createErrorStatePublisher();
            }

            @Override
            public long maxElementsFromPublisher() {
                return IdentityProcessorVerification.this.maxElementsFromPublisher();
            }

            @Override
            public long boundedDepthOfOnNextAndRequestRecursion() {
                return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
            }

            @Override
            public boolean skipStochasticTests() {
                return IdentityProcessorVerification.this.skipStochasticTests();
            }
        };
    }

    /**
     * Supplies the processor under test.
     *
     * @param bufferSize buffer size requested by the harness (the value given to the constructor)
     * @return a new identity processor
     */
    public abstract Processor<T, T> createIdentityProcessor(int bufferSize);

    /**
     * Supplies an upstream publisher used to feed the processor under test.
     *
     * @param elements number of elements requested by the harness; {@link TestSetup}
     *                 passes {@link Long#MAX_VALUE}
     * @return a publisher to be used as upstream / element source
     */
    public abstract Publisher<T> createHelperPublisher(long elements);

    /**
     * Supplies the publisher handed to the {@link PublisherVerification} delegate's
     * {@code createErrorStatePublisher()} hook.
     */
    public abstract Publisher<T> createErrorStatePublisher();

    /**
     * Value forwarded to the {@link PublisherVerification} delegate's hook of the same name.
     *
     * @return {@code Long.MAX_VALUE - 1} unless overridden
     */
    public long maxElementsFromPublisher() {
        return Long.MAX_VALUE - 1;
    }

    /**
     * Value forwarded to the {@link PublisherVerification} delegate's hook of the same name.
     *
     * @return {@code 1} unless overridden
     */
    public long boundedDepthOfOnNextAndRequestRecursion() {
        return 1L;
    }

    /**
     * Value forwarded to the {@link PublisherVerification} delegate's hook of the same name.
     *
     * @return {@code false} unless overridden
     */
    public boolean skipStochasticTests() {
        return false;
    }

    /**
     * Upper bound on concurrent subscribers the processor supports; consulted by
     * {@link #optionalMultipleSubscribersTest(long, Function)}.
     *
     * @return {@link Long#MAX_VALUE} unless overridden
     */
    public long maxSupportedSubscribers() {
        return Long.MAX_VALUE;
    }

    /** Runs the {@code setUp()} of both delegates before each test method. */
    @BeforeMethod
    public void setUp() throws Exception {
        publisherVerification.setUp();
        subscriberVerification.setUp();
    }

    /**
     * Builds the publisher used by the {@link PublisherVerification} delegate: a fresh
     * processor subscribed to a helper publisher of {@code elements} elements.
     *
     * @param elements number of elements requested from {@link #createHelperPublisher(long)}
     * @return the processor, viewed as a {@link Publisher}
     */
    public Publisher<T> createPublisher(long elements) {
        final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
        final Publisher<T> upstream = createHelperPublisher(elements);
        upstream.subscribe(processor);
        return processor;
    }

    // ------------------------------------------------------------------
    // Publisher-side tests: each method delegates to the same-named test
    // on publisherVerification.
    // ------------------------------------------------------------------

    @Test
    public void validate_maxElementsFromPublisher() throws Exception {
        publisherVerification.validate_maxElementsFromPublisher();
    }

    @Test
    public void validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
        publisherVerification.validate_boundedDepthOfOnNextAndRequestRecursion();
    }

    @Test
    public void createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
        publisherVerification.createPublisher1MustProduceAStreamOfExactly1Element();
    }

    @Test
    public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
        publisherVerification.createPublisher3MustProduceAStreamOfExactly3Elements();
    }

    @Test
    public void spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
        publisherVerification.spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
    }

    @Test
    public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
        publisherVerification.spec102_maySignalLessThanRequestedAndTerminateSubscription();
    }

    @Test
    public void spec103_mustSignalOnMethodsSequentially() throws Throwable {
        publisherVerification.spec103_mustSignalOnMethodsSequentially();
    }

    @Test
    public void spec104_mustSignalOnErrorWhenFails() throws Throwable {
        publisherVerification.spec104_mustSignalOnErrorWhenFails();
    }

    @Test
    public void spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
        publisherVerification.spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
    }

    @Test
    public void spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
        publisherVerification.spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
    }

    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
        publisherVerification.spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
    }

    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
        publisherVerification.spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
    }

    @Test
    public void spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
        publisherVerification.spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
    }

    @Test
    public void spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
        publisherVerification.spec109_subscribeShouldNotThrowNonFatalThrowable();
    }

    @Test
    public void spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
        publisherVerification.spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
    }

    @Test
    public void spec111_maySupportMultiSubscribe() throws Throwable {
        publisherVerification.spec111_maySupportMultiSubscribe();
    }

    @Test
    public void spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
        publisherVerification.spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
        publisherVerification.spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
        publisherVerification.spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
        publisherVerification.spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
    }

    @Test
    public void spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
        publisherVerification.spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
    }

    @Test
    public void spec303_mustNotAllowUnboundedRecursion() throws Throwable {
        publisherVerification.spec303_mustNotAllowUnboundedRecursion();
    }

    @Test
    public void spec304_requestShouldNotPerformHeavyComputations() throws Exception {
        publisherVerification.spec304_requestShouldNotPerformHeavyComputations();
    }

    @Test
    public void spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception {
        publisherVerification.spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation();
    }

    @Test
    public void spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
        publisherVerification.spec306_afterSubscriptionIsCancelledRequestMustBeNops();
    }

    @Test
    public void spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
        publisherVerification.spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
    }

    @Test
    public void spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
        publisherVerification.spec309_requestZeroMustSignalIllegalArgumentException();
    }

    @Test
    public void spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
        publisherVerification.spec309_requestNegativeNumberMustSignalIllegalArgumentException();
    }

    @Test
    public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
        publisherVerification.spec312_cancelMustMakeThePublisherToEventuallyStopSignaling();
    }

    @Test
    public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
        publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
    }

    @Test
    public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
        publisherVerification.spec317_mustSupportAPendingElementCountUpToLongMaxValue();
    }

    @Test
    public void spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
        publisherVerification.spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
    }

    @Test
    public void spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
        publisherVerification.spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
    }

    /**
     * Processor-specific test: with two downstream subscribers attached, an error sent
     * from upstream must be observed by both of them.
     *
     * <p>Requires support for two subscribers; otherwise it is routed to
     * {@link #notVerified(String)} by {@link #optionalMultipleSubscribersTest}.
     */
    @Test
    public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
        optionalMultipleSubscribersTest(2L, new Function<Long, TestSetup>() {

            @Override
            public TestSetup apply(Long aLong) throws Throwable {
                // The scenario runs inside the instance initializer of the anonymous TestSetup.
                return new TestSetup(IdentityProcessorVerification.this.env, IdentityProcessorVerification.this.processorBufferSize) {
                    {
                        final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
                        env.subscribe(processor, sub1);
                        final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
                        env.subscribe(processor, sub2);

                        // One element flows through to sub1 before the failure is injected.
                        sub1.request(1L);
                        expectRequest();
                        final T x = sendNextTFromUpstream();
                        expectNextElement(sub1, x);
                        sub1.request(1L);

                        final RuntimeException ex = new RuntimeException("Test exception");
                        sendError(ex);
                        sub1.expectError(ex);
                        sub2.expectError(ex);
                        env.verifyNoAsyncErrors();
                    }
                };
            }
        });
    }

    /**
     * Builds the subscriber used by the {@link SubscriberWhiteboxVerification} delegate:
     * a fresh processor with a downstream subscriber attached that forwards every signal
     * it receives to {@code probe}.
     *
     * @param probe whitebox probe that is told about downstream signals and is given a
     *              puppet through which it can {@code request} / {@code cancel} on the
     *              processor's downstream subscription
     * @return the processor, viewed as a {@link Subscriber}
     */
    public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) {
        final Processor<T, T> processor = createIdentityProcessor(processorBufferSize);
        processor.subscribe(new Subscriber<T>() {
            private final TestEnvironment.Promise<Subscription> subs =
                    new TestEnvironment.Promise<Subscription>(env);

            @Override
            public void onSubscribe(final Subscription subscription) {
                env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription));
                // NOTE(review): nothing in this file ever completes `subs`, so this branch looks
                // unreachable and a duplicate onSubscribe would not be cancelled. Confirm against
                // TestEnvironment.Promise before changing behaviour here.
                if (subs.isCompleted()) {
                    subscription.cancel();
                }
                probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {

                    @Override
                    public void triggerRequest(long elements) {
                        subscription.request(elements);
                    }

                    @Override
                    public void signalCancel() {
                        subscription.cancel();
                    }
                });
            }

            @Override
            public void onNext(T element) {
                env.debug(String.format("whiteboxSubscriber::onNext(%s)", element));
                probe.registerOnNext(element);
            }

            @Override
            public void onComplete() {
                env.debug("whiteboxSubscriber::onComplete()");
                probe.registerOnComplete();
            }

            @Override
            public void onError(Throwable cause) {
                env.debug(String.format("whiteboxSubscriber::onError(%s)", cause));
                probe.registerOnError(cause);
            }
        });
        return processor;
    }

    /**
     * Processor-specific test: an error sent from upstream, with no demand signalled by
     * the downstream subscriber, must still reach that subscriber.
     */
    @Test
    public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
        // The scenario runs inside the instance initializer of the anonymous TestSetup.
        new TestSetup(this.env, this.processorBufferSize) {
            {
                final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
                env.subscribe(processor, sub);

                final RuntimeException ex = new RuntimeException("Test exception");
                sendError(ex);
                sub.expectError(ex);
                env.verifyNoAsyncErrors();
            }
        };
    }

    // ------------------------------------------------------------------
    // Subscriber-side tests: each method delegates to the same-named test
    // on subscriberVerification.
    // ------------------------------------------------------------------

    @Test
    public void exerciseWhiteboxHappyPath() throws Throwable {
        subscriberVerification.exerciseWhiteboxHappyPath();
    }

    @Test
    public void spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
        subscriberVerification.spec201_mustSignalDemandViaSubscriptionRequest();
    }

    @Test
    public void spec202_shouldAsynchronouslyDispatch() throws Exception {
        subscriberVerification.spec202_shouldAsynchronouslyDispatch();
    }

    @Test
    public void spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
        subscriberVerification.spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
    }

    @Test
    public void spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
        subscriberVerification.spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
    }

    @Test
    public void spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
        subscriberVerification.spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
    }

    @Test
    public void spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
        subscriberVerification.spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
    }

    @Test
    public void spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
        subscriberVerification.spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
    }

    @Test
    public void spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
        subscriberVerification.spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization();
    }

    @Test
    public void spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
        subscriberVerification.spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
    }

    @Test
    public void spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
        subscriberVerification.spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
    }

    @Test
    public void spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
        subscriberVerification.spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
    }

    @Test
    public void spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
        subscriberVerification.spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
    }

    @Test
    public void spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
        subscriberVerification.spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
    }

    @Test
    public void spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
        subscriberVerification.spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
    }

    @Test
    public void spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable {
        subscriberVerification.spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation();
    }

    @Test
    public void spec213_failingOnSignalInvocation() throws Exception {
        subscriberVerification.spec213_failingOnSignalInvocation();
    }

    @Test
    public void spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
        subscriberVerification.spec301_mustNotBeCalledOutsideSubscriberContext();
    }

    @Test
    public void spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
        subscriberVerification.spec308_requestMustRegisterGivenNumberElementsToBeProduced();
    }

    @Test
    public void spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
        subscriberVerification.spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
    }

    @Test
    public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
        subscriberVerification.spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
    }

    @Test
    public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
        subscriberVerification.spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
    }

    @Test
    public void spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
        subscriberVerification.spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
    }

    @Test
    public void spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
        subscriberVerification.spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
    }

    /**
     * Processor-specific test: demand signalled early by one subscriber (20 elements)
     * must keep being honoured as elements arrive one at a time, and a second
     * subscriber that joins later only receives an element once it has requested it.
     *
     * <p>The processor may request from upstream in one batch or one element at a time,
     * so further upstream requests are awaited only while the running total shows that
     * the processor has not yet asked for the next element.
     *
     * <p>Requires support for two subscribers; otherwise it is routed to
     * {@link #notVerified(String)} by {@link #optionalMultipleSubscribersTest}.
     */
    @Test
    public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
        optionalMultipleSubscribersTest(2L, new Function<Long, TestSetup>() {

            @Override
            public TestSetup apply(Long subscribers) throws Throwable {
                // The scenario runs inside the instance initializer of the anonymous TestSetup.
                return new TestSetup(IdentityProcessorVerification.this.env, IdentityProcessorVerification.this.processorBufferSize) {
                    {
                        final TestEnvironment.ManualSubscriber<T> sub1 = newSubscriber();
                        sub1.request(20L);

                        long totalRequests = expectRequest();
                        final T x = sendNextTFromUpstream();
                        expectNextElement(sub1, x);

                        if (totalRequests == 1L) {
                            totalRequests += expectRequest();
                        }
                        final T y = sendNextTFromUpstream();
                        expectNextElement(sub1, y);

                        if (totalRequests == 2L) {
                            totalRequests += expectRequest();
                        }

                        // The late subscriber has no demand yet, so only sub1 sees z at first.
                        final TestEnvironment.ManualSubscriber<T> sub2 = newSubscriber();
                        final T z = sendNextTFromUpstream();
                        expectNextElement(sub1, z);
                        sub2.expectNone();
                        sub2.request(1L);
                        expectNextElement(sub2, z);

                        if (totalRequests == 3L) {
                            expectRequest();
                        }

                        sendCompletion();
                        sub1.expectCompletion(env.defaultTimeoutMillis());
                        sub2.expectCompletion(env.defaultTimeoutMillis());
                        env.verifyNoAsyncErrors();
                    }
                };
            }
        });
    }

    /**
     * Delegates to {@code publisherVerification.notVerified()}.
     * Presumably marks the current test as not verified / skipped; confirm in
     * {@link PublisherVerification}.
     */
    public void notVerified() {
        publisherVerification.notVerified();
    }

    /**
     * Delegates to {@code publisherVerification.notVerified(message)}.
     *
     * @param message explanation forwarded to the delegate
     */
    public void notVerified(String message) {
        publisherVerification.notVerified(message);
    }

    /**
     * Runs {@code body} only when the processor supports enough subscribers.
     *
     * @param requiredSubscribersSupport number of subscribers the scenario needs
     * @param body                       scenario factory; invoked with
     *                                   {@code requiredSubscribersSupport} when the requirement
     *                                   is met, its result is ignored
     * @throws Throwable whatever {@code body} throws
     */
    public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
        if (requiredSubscribersSupport > maxSupportedSubscribers()) {
            notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.", maxSupportedSubscribers(), requiredSubscribersSupport));
        } else {
            body.apply(requiredSubscribersSupport);
        }
    }

    /**
     * Manual subscriber that records the {@code onError} signal it receives in a
     * promise, so that a test can later assert on it.
     *
     * @param <A> element type of the subscriber
     */
    public class ManualSubscriberWithErrorCollection<A>
    extends TestEnvironment.ManualSubscriberWithSubscriptionSupport<A> {
        /** Completed with the cause passed to {@link #onError(Throwable)}. */
        TestEnvironment.Promise<Throwable> error;

        public ManualSubscriberWithErrorCollection(TestEnvironment env) {
            super(env);
            this.error = new TestEnvironment.Promise<Throwable>(env);
        }

        /** Records {@code cause} in {@link #error}. */
        @Override
        public void onError(Throwable cause) {
            this.error.complete(cause);
        }

        /**
         * Same as {@link #expectError(Throwable, long)} using the environment's default timeout.
         */
        public void expectError(Throwable cause) throws InterruptedException {
            expectError(cause, this.env.defaultTimeoutMillis());
        }

        /**
         * Waits for an error to be recorded and checks it against {@code cause}.
         *
         * @param cause         the expected error, compared with {@code equals}
         * @param timeoutMillis how long to wait for the error promise to complete
         */
        public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
            this.error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
            if (!cause.equals(this.error.value())) {
                this.env.flop(String.format("Expected error %s but got %s", cause, this.error.value()));
            }
        }
    }

    /**
     * Test fixture: a manual upstream publisher with a fresh processor already
     * subscribed to it, plus a helper-publisher-backed source of distinct elements.
     *
     * <p>Scenarios are written as instance initializers of anonymous subclasses, so the
     * whole test runs while the object is being constructed.
     */
    public abstract class TestSetup
    extends TestEnvironment.ManualPublisher<T> {
        /** Subscriber on the helper publisher; the source of elements for {@link #nextT()}. */
        private final TestEnvironment.ManualSubscriber<T> tees;
        /** Elements already handed out by {@link #nextT()}, used to detect duplicates. */
        private final Set<T> seenTees = new HashSet<T>();
        /** The processor under test, subscribed to this manual publisher. */
        final Processor<T, T> processor;

        /**
         * @param env            test environment
         * @param testBufferSize buffer size passed to {@link #createIdentityProcessor(int)}
         */
        public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
            super(env);
            this.tees = env.newManualSubscriber(IdentityProcessorVerification.this.createHelperPublisher(Long.MAX_VALUE));
            this.processor = IdentityProcessorVerification.this.createIdentityProcessor(testBufferSize);
            subscribe(this.processor);
        }

        /** Attaches and returns a new manual subscriber downstream of the processor. */
        public TestEnvironment.ManualSubscriber<T> newSubscriber() throws InterruptedException {
            return this.env.newManualSubscriber(this.processor);
        }

        /**
         * Pulls the next element from the helper publisher.
         *
         * <p>Reports a failure through {@code env.flop} if the helper publisher hands out
         * an element that was already seen.
         *
         * @return the next element
         */
        public T nextT() throws InterruptedException {
            final T t = this.tees.requestNextElement();
            if (this.seenTees.contains(t)) {
                this.env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
            }
            this.seenTees.add(t);
            return t;
        }

        /**
         * Awaits the next element on {@code sub} and reports a failure through
         * {@code env.flop} if it does not equal {@code expected}.
         *
         * @param sub      downstream subscriber to read from
         * @param expected element that should arrive next
         */
        public void expectNextElement(TestEnvironment.ManualSubscriber<T> sub, T expected) throws InterruptedException {
            final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
            if (!elem.equals(expected)) {
                this.env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
            }
        }

        /**
         * Takes the next element from {@link #nextT()} and sends it to the processor via
         * {@code sendNext}.
         *
         * @return the element that was sent
         */
        public T sendNextTFromUpstream() throws InterruptedException {
            final T x = nextT();
            sendNext(x);
            return x;
        }
    }
}

