/*
 * Decompiled with CFR 0.152.
 */
package org.reactivestreams.tck;

import java.util.HashSet;
import java.util.Set;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.support.Function;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public abstract class IdentityProcessorVerification<T> {
    private final TestEnvironment env;
    private final SubscriberWhiteboxVerification<T> subscriberVerification;
    private final PublisherVerification<T> publisherVerification;
    private final int processorBufferSize;

    public IdentityProcessorVerification(TestEnvironment testEnvironment, long l) {
        this(testEnvironment, l, 16);
    }

    public IdentityProcessorVerification(TestEnvironment testEnvironment, long l, int n) {
        this.env = testEnvironment;
        this.processorBufferSize = n;
        this.subscriberVerification = new SubscriberWhiteboxVerification<T>(testEnvironment){

            @Override
            public Subscriber<T> createSubscriber(SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> whiteboxSubscriberProbe) {
                return IdentityProcessorVerification.this.createSubscriber(whiteboxSubscriberProbe);
            }

            @Override
            public Publisher<T> createHelperPublisher(long l) {
                return IdentityProcessorVerification.this.createHelperPublisher(l);
            }
        };
        this.publisherVerification = new PublisherVerification<T>(testEnvironment, l){

            @Override
            public Publisher<T> createPublisher(long l) {
                return IdentityProcessorVerification.this.createPublisher(l);
            }

            @Override
            public Publisher<T> createErrorStatePublisher() {
                return IdentityProcessorVerification.this.createErrorStatePublisher();
            }

            @Override
            public long maxElementsFromPublisher() {
                return IdentityProcessorVerification.this.maxElementsFromPublisher();
            }

            @Override
            public long boundedDepthOfOnNextAndRequestRecursion() {
                return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion();
            }

            @Override
            public boolean skipStochasticTests() {
                return IdentityProcessorVerification.this.skipStochasticTests();
            }
        };
    }

    public abstract Processor<T, T> createIdentityProcessor(int var1);

    public abstract Publisher<T> createHelperPublisher(long var1);

    public abstract Publisher<T> createErrorStatePublisher();

    public long maxElementsFromPublisher() {
        return Long.MAX_VALUE;
    }

    public long boundedDepthOfOnNextAndRequestRecursion() {
        return 1L;
    }

    public boolean skipStochasticTests() {
        return false;
    }

    public long maxSupportedSubscribers() {
        return Long.MAX_VALUE;
    }

    @BeforeMethod
    public void setUp() throws Exception {
        this.publisherVerification.setUp();
        this.subscriberVerification.setUp();
    }

    public Publisher<T> createPublisher(long l) {
        Processor<T, T> processor = this.createIdentityProcessor(this.processorBufferSize);
        Publisher<T> publisher = this.createHelperPublisher(l);
        publisher.subscribe(processor);
        return processor;
    }

    @Test
    public void validate_maxElementsFromPublisher() throws Exception {
        this.publisherVerification.validate_maxElementsFromPublisher();
    }

    @Test
    public void validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
        this.publisherVerification.validate_boundedDepthOfOnNextAndRequestRecursion();
    }

    @Test
    public void createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
        this.publisherVerification.createPublisher1MustProduceAStreamOfExactly1Element();
    }

    @Test
    public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
        this.publisherVerification.createPublisher3MustProduceAStreamOfExactly3Elements();
    }

    @Test
    public void spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
        this.publisherVerification.spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
    }

    @Test
    public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
        this.publisherVerification.spec102_maySignalLessThanRequestedAndTerminateSubscription();
    }

    @Test
    public void spec103_mustSignalOnMethodsSequentially() throws Throwable {
        this.publisherVerification.spec103_mustSignalOnMethodsSequentially();
    }

    @Test
    public void spec104_mustSignalOnErrorWhenFails() throws Throwable {
        this.publisherVerification.spec104_mustSignalOnErrorWhenFails();
    }

    @Test
    public void spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
        this.publisherVerification.spec105_mustSignalOnCompleteWhenFiniteStreamTerminates();
    }

    @Test
    public void spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
        this.publisherVerification.spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled();
    }

    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
        this.publisherVerification.spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled();
    }

    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
        this.publisherVerification.spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled();
    }

    @Test
    public void spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
        this.publisherVerification.spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals();
    }

    @Test
    public void spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
        this.publisherVerification.spec109_subscribeShouldNotThrowNonFatalThrowable();
    }

    @Test
    public void spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
        this.publisherVerification.spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
    }

    @Test
    public void spec111_maySupportMultiSubscribe() throws Throwable {
        this.publisherVerification.spec111_maySupportMultiSubscribe();
    }

    @Test
    public void spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
        this.publisherVerification.spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
        this.publisherVerification.spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
        this.publisherVerification.spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
    }

    @Test
    public void spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
        this.publisherVerification.spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe();
    }

    @Test
    public void spec303_mustNotAllowUnboundedRecursion() throws Throwable {
        this.publisherVerification.spec303_mustNotAllowUnboundedRecursion();
    }

    @Test
    public void spec304_requestShouldNotPerformHeavyComputations() throws Exception {
        this.publisherVerification.spec304_requestShouldNotPerformHeavyComputations();
    }

    @Test
    public void spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception {
        this.publisherVerification.spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation();
    }

    @Test
    public void spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
        this.publisherVerification.spec306_afterSubscriptionIsCancelledRequestMustBeNops();
    }

    @Test
    public void spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
        this.publisherVerification.spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops();
    }

    @Test
    public void spec309_requestZeroMustThrowIllegalArgumentException() throws Throwable {
        this.publisherVerification.spec309_requestZeroMustThrowIllegalArgumentException();
    }

    @Test
    public void spec309_requestNegativeNumberMustThrowIllegalArgumentException() throws Throwable {
        this.publisherVerification.spec309_requestNegativeNumberMustThrowIllegalArgumentException();
    }

    @Test
    public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
        this.publisherVerification.spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber();
    }

    @Test
    public void spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
        this.publisherVerification.spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
    }

    @Test
    public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
        this.optionalMultipleSubscribersTest(2L, new Function<Long, TestSetup>(){

            @Override
            public TestSetup apply(Long l) throws Throwable {
                return new TestSetup(IdentityProcessorVerification.this.env, IdentityProcessorVerification.this.processorBufferSize){
                    {
                        ManualSubscriberWithErrorCollection manualSubscriberWithErrorCollection = new ManualSubscriberWithErrorCollection(this.env);
                        this.env.subscribe(this.processor, manualSubscriberWithErrorCollection);
                        ManualSubscriberWithErrorCollection manualSubscriberWithErrorCollection2 = new ManualSubscriberWithErrorCollection(this.env);
                        this.env.subscribe(this.processor, manualSubscriberWithErrorCollection2);
                        manualSubscriberWithErrorCollection.request(1L);
                        this.expectRequest();
                        Object t = this.sendNextTFromUpstream();
                        this.expectNextElement(manualSubscriberWithErrorCollection, t);
                        manualSubscriberWithErrorCollection.request(1L);
                        RuntimeException runtimeException = new RuntimeException("Test exception");
                        this.sendError(runtimeException);
                        manualSubscriberWithErrorCollection.expectError(runtimeException);
                        manualSubscriberWithErrorCollection2.expectError(runtimeException);
                        this.env.verifyNoAsyncErrors();
                    }
                };
            }
        });
    }

    public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> whiteboxSubscriberProbe) {
        Processor<T, T> processor = this.createIdentityProcessor(this.processorBufferSize);
        processor.subscribe(new Subscriber<T>(){
            private final TestEnvironment.Promise<Subscription> subs;
            {
                this.subs = new TestEnvironment.Promise(IdentityProcessorVerification.this.env);
            }

            public void onSubscribe(final Subscription subscription) {
                IdentityProcessorVerification.this.env.debug("whiteboxSubscriber::onSubscribe(" + subscription + ")");
                if (this.subs.isCompleted()) {
                    subscription.cancel();
                }
                whiteboxSubscriberProbe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet(){

                    public void triggerShutdown() {
                        subscription.cancel();
                    }

                    @Override
                    public void triggerRequest(long l) {
                        subscription.request(l);
                    }

                    @Override
                    public void signalCancel() {
                        subscription.cancel();
                    }
                });
            }

            public void onNext(T t) {
                IdentityProcessorVerification.this.env.debug("whiteboxSubscriber::onNext(" + t + ")");
                whiteboxSubscriberProbe.registerOnNext(t);
            }

            public void onComplete() {
                IdentityProcessorVerification.this.env.debug("whiteboxSubscriber::onComplete()");
                whiteboxSubscriberProbe.registerOnComplete();
            }

            public void onError(Throwable throwable) {
                IdentityProcessorVerification.this.env.debug("whiteboxSubscriber::onError(" + throwable + ")");
                whiteboxSubscriberProbe.registerOnError(throwable);
            }
        });
        return processor;
    }

    @Test
    public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled() throws Exception {
        new TestSetup(this.env, this.processorBufferSize){
            {
                TestEnvironment.ManualSubscriber manualSubscriber = this.newSubscriber();
                manualSubscriber.cancel();
                this.expectCancelling();
                this.env.verifyNoAsyncErrors();
            }
        };
    }

    @Test
    public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
        new TestSetup(this.env, this.processorBufferSize){
            {
                ManualSubscriberWithErrorCollection manualSubscriberWithErrorCollection = new ManualSubscriberWithErrorCollection(this.env);
                this.env.subscribe(this.processor, manualSubscriberWithErrorCollection);
                RuntimeException runtimeException = new RuntimeException("Test exception");
                this.sendError(runtimeException);
                manualSubscriberWithErrorCollection.expectError(runtimeException);
                this.env.verifyNoAsyncErrors();
            }
        };
    }

    @Test
    public void exerciseWhiteboxHappyPath() throws Throwable {
        this.subscriberVerification.exerciseWhiteboxHappyPath();
    }

    @Test
    public void spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
        this.subscriberVerification.spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
    }

    @Test
    public void spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable {
        this.subscriberVerification.spec201_mustSignalDemandViaSubscriptionRequest();
    }

    @Test
    public void spec202_shouldAsynchronouslyDispatch() throws Exception {
        this.subscriberVerification.spec202_shouldAsynchronouslyDispatch();
    }

    @Test
    public void spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
        this.subscriberVerification.spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete();
    }

    @Test
    public void spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
        this.subscriberVerification.spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError();
    }

    @Test
    public void spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
        this.subscriberVerification.spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError();
    }

    @Test
    public void spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
        this.subscriberVerification.spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();
    }

    @Test
    public void spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
        this.subscriberVerification.spec206_mustCallSubscriptionCancelIfItIsNoLongerValid();
    }

    @Test
    public void spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThread() throws Exception {
        this.subscriberVerification.spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThread();
    }

    @Test
    public void spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
        this.subscriberVerification.spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
    }

    @Test
    public void spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
        this.subscriberVerification.spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
    }

    @Test
    public void spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
        this.subscriberVerification.spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall();
    }

    @Test
    public void spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
        this.subscriberVerification.spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall();
    }

    @Test
    public void spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
        this.subscriberVerification.spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall();
    }

    @Test
    public void spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
        this.subscriberVerification.spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
    }

    @Test
    public void spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
        this.subscriberVerification.spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality();
    }

    @Test
    public void spec213_failingOnCompleteInvocation() throws Exception {
        this.subscriberVerification.spec213_failingOnCompleteInvocation();
    }

    @Test
    public void spec214_failingOnErrorInvocation() throws Exception {
        this.subscriberVerification.spec214_failingOnErrorInvocation();
    }

    @Test
    public void spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception {
        this.subscriberVerification.spec301_mustNotBeCalledOutsideSubscriberContext();
    }

    @Test
    public void spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
        this.subscriberVerification.spec308_requestMustRegisterGivenNumberElementsToBeProduced();
    }

    @Test
    public void spec309_callingRequestZeroMustThrow() throws Throwable {
        this.subscriberVerification.spec309_callingRequestZeroMustThrow();
    }

    @Test
    public void spec309_callingRequestWithNegativeNumberMustThrow() throws Throwable {
        this.subscriberVerification.spec309_callingRequestWithNegativeNumberMustThrow();
    }

    @Test
    public void spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
        this.subscriberVerification.spec310_requestMaySynchronouslyCallOnNextOnSubscriber();
    }

    @Test
    public void spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
        this.subscriberVerification.spec311_requestMaySynchronouslyCallOnCompleteOrOnError();
    }

    @Test
    public void spec312_cancelMustRequestThePublisherToEventuallyStopSignaling() throws Throwable {
        this.subscriberVerification.spec312_cancelMustRequestThePublisherToEventuallyStopSignaling();
    }

    @Test
    public void spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
        this.subscriberVerification.spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists();
    }

    @Test
    public void spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
        this.subscriberVerification.spec315_cancelMustNotThrowExceptionAndMustSignalOnError();
    }

    @Test
    public void spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
        this.subscriberVerification.spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber();
    }

    @Test
    public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
        this.subscriberVerification.spec317_mustSupportAPendingElementCountUpToLongMaxValue();
    }

    @Test
    public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
        this.optionalMultipleSubscribersTest(2L, new Function<Long, TestSetup>(){

            @Override
            public TestSetup apply(Long l) throws Throwable {
                return new TestSetup(IdentityProcessorVerification.this.env, IdentityProcessorVerification.this.processorBufferSize){
                    {
                        TestEnvironment.ManualSubscriber manualSubscriber = this.newSubscriber();
                        manualSubscriber.request(20L);
                        long l = this.expectRequest();
                        Object t = this.sendNextTFromUpstream();
                        this.expectNextElement(manualSubscriber, t);
                        if (l == 1L) {
                            l += this.expectRequest();
                        }
                        Object t2 = this.sendNextTFromUpstream();
                        this.expectNextElement(manualSubscriber, t2);
                        if (l == 2L) {
                            l += this.expectRequest();
                        }
                        TestEnvironment.ManualSubscriber manualSubscriber2 = this.newSubscriber();
                        Object t3 = this.sendNextTFromUpstream();
                        this.expectNextElement(manualSubscriber, t3);
                        manualSubscriber2.expectNone();
                        manualSubscriber2.request(1L);
                        this.expectNextElement(manualSubscriber2, t3);
                        if (l == 3L) {
                            this.expectRequest();
                        }
                        this.sendCompletion();
                        manualSubscriber.expectCompletion(this.env.defaultTimeoutMillis());
                        manualSubscriber2.expectCompletion(this.env.defaultTimeoutMillis());
                        this.env.verifyNoAsyncErrors();
                    }
                };
            }
        });
    }

    public void notVerified() {
        this.publisherVerification.notVerified();
    }

    public void notVerified(String string) {
        this.publisherVerification.notVerified(string);
    }

    public void optionalMultipleSubscribersTest(long l, Function<Long, TestSetup> function) throws Throwable {
        if (l > this.maxSupportedSubscribers()) {
            this.notVerified("The Publisher under test only supports " + this.maxSupportedSubscribers() + " subscribers, " + "while this test requires at least " + l + "to run.");
        } else {
            function.apply(l);
        }
    }

    public class ManualSubscriberWithErrorCollection<A>
    extends TestEnvironment.ManualSubscriberWithSubscriptionSupport<A> {
        TestEnvironment.Promise<Throwable> error;

        public ManualSubscriberWithErrorCollection(TestEnvironment testEnvironment) {
            super(testEnvironment);
            this.error = new TestEnvironment.Promise(testEnvironment);
        }

        @Override
        public void onError(Throwable throwable) {
            this.error.complete(throwable);
        }

        public void expectError(Throwable throwable) throws InterruptedException {
            this.expectError(throwable, this.env.defaultTimeoutMillis());
        }

        public void expectError(Throwable throwable, long l) throws InterruptedException {
            this.error.expectCompletion(l, "Did not receive expected error on downstream");
            if (!this.error.value().equals(throwable)) {
                this.env.flop("Expected error " + throwable + " but got " + this.error.value());
            }
        }
    }

    public abstract class TestSetup
    extends TestEnvironment.ManualPublisher<T> {
        private final TestEnvironment.ManualSubscriber<T> tees;
        private Set<T> seenTees;
        final Processor<T, T> processor;

        public TestSetup(TestEnvironment testEnvironment, int n) throws InterruptedException {
            super(testEnvironment);
            this.seenTees = new HashSet();
            this.tees = testEnvironment.newManualSubscriber(IdentityProcessorVerification.this.createHelperPublisher(Long.MAX_VALUE));
            this.processor = IdentityProcessorVerification.this.createIdentityProcessor(n);
            this.subscribe(this.processor);
        }

        public TestEnvironment.ManualSubscriber<T> newSubscriber() throws InterruptedException {
            return this.env.newManualSubscriber(this.processor);
        }

        public T nextT() throws InterruptedException {
            Object t = this.tees.requestNextElement();
            if (this.seenTees.contains(t)) {
                this.env.flop("Helper publisher illegally produced the same element " + t + " twice");
            }
            this.seenTees.add(t);
            return t;
        }

        public void expectNextElement(TestEnvironment.ManualSubscriber<T> manualSubscriber, T t) throws InterruptedException {
            Object t2 = manualSubscriber.nextElement("timeout while awaiting " + t);
            if (!t2.equals(t)) {
                this.env.flop("Received `onNext(" + t2 + ")` on downstream but expected `onNext(" + t + ")`");
            }
        }

        public T sendNextTFromUpstream() throws InterruptedException {
            Object t = this.nextT();
            this.sendNext(t);
            return t;
        }
    }
}

