package io.rsocket.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketErrorException;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.core.Resume;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.resume.InMemoryResumableFramesStore;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:io/rsocket/test/TransportTest.class */
public interface TransportTest {
    /** Data string of the small payloads built by {@code createTestPayload}. */
    public static final String MOCK_DATA = "test-data";
    /** Metadata string used by {@code createTestPayload} for its default case. */
    public static final String MOCK_METADATA = "metadata";
    /** Logger shared by the tests and the nested connection helpers. */
    public static final Logger logger = Loggers.getLogger(TransportTest.class);
    /** Lower-cased text loaded once from the gzip-compressed classpath resource via {@code read}. */
    public static final String LARGE_DATA = read("words.shakespeare.txt.gz");
    /** Shared large payload using {@link #LARGE_DATA} as both data and metadata; tests call {@code retain()} before each send. */
    public static final Payload LARGE_PAYLOAD = ByteBufPayload.create(LARGE_DATA, LARGE_DATA);

    /* loaded from: input_file:io/rsocket/test/TransportTest$TransportPair.class */
    public static class TransportPair<T, S extends Closeable> implements Disposable {
        /** Data string handed to the responder and returned by {@code expectedPayloadData()}. */
        private static final String data = "hello world";
        /** Metadata string constant; the same literal is passed to the responder. */
        private static final String metadata = "metadata";
        /** Copy of the constructor's resumability flag; tests read it to skip unsupported cases. */
        private final boolean withResumability;
        /** Leak-tracking allocator tagged "Client"; checked for leaks in {@code close()}. */
        private final LeaksTrackingByteBufAllocator byteBufAllocator1;
        /** Leak-tracking allocator tagged "Server"; checked for leaks in {@code close()}. */
        private final LeaksTrackingByteBufAllocator byteBufAllocator2;
        /** Responder RSocket returned by the server's acceptor for every connection. */
        private final TestRSocket responder;
        /** Connected client RSocket, established in the constructor. */
        private final RSocket client;
        /** Handle of the bound server, established in the constructor. */
        private final S server;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/rsocket/test/TransportTest$TransportPair$AsyncDuplexConnection.class */
        /**
         * {@link DuplexConnection} decorator that moves inbound frames onto the bounded-elastic
         * scheduler. Each frame is retained before the thread hop and released again by the
         * {@code ByteBufReleaserOperator} after it has been delivered downstream. All other
         * operations delegate directly to the wrapped connection.
         */
        public static class AsyncDuplexConnection implements DuplexConnection {
            private final DuplexConnection duplexConnection;
            private final ByteBufReleaserOperator bufReleaserOperator = new ByteBufReleaserOperator();

            public AsyncDuplexConnection(DuplexConnection duplexConnection) {
                this.duplexConnection = duplexConnection;
            }

            public void sendFrame(int i, ByteBuf byteBuf) {
                duplexConnection.sendFrame(i, byteBuf);
            }

            public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
                duplexConnection.sendErrorAndClose(rSocketErrorException);
            }

            /**
             * Returns the delegate's inbound frames, subscribed and published on the
             * bounded-elastic scheduler, with the releaser operator installed at the tail.
             */
            public Flux<ByteBuf> receive() {
                // Take an extra reference before frames cross the asynchronous boundary.
                Flux<ByteBuf> retainedFrames = duplexConnection
                        .receive()
                        .subscribeOn(Schedulers.boundedElastic())
                        .doOnNext(ByteBuf::retain);

                // Frames dropped inside the publishOn queue give their extra reference back.
                Flux<ByteBuf> asyncFrames = retainedFrames
                        .publishOn(Schedulers.boundedElastic(), Integer.MAX_VALUE)
                        .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::safeRelease);

                return asyncFrames.transform(Operators.lift((scannable, downstream) -> {
                    // The single operator instance is re-pointed at whichever subscriber arrives.
                    bufReleaserOperator.actual = downstream;
                    return bufReleaserOperator;
                }));
            }

            public ByteBufAllocator alloc() {
                return duplexConnection.alloc();
            }

            public SocketAddress remoteAddress() {
                return duplexConnection.remoteAddress();
            }

            /** Completes once both the delegate and the releaser operator have terminated. */
            public Mono<Void> onClose() {
                Mono<Void> operatorClosed = bufReleaserOperator.onClose();
                return duplexConnection.onClose().and(operatorClosed);
            }

            public void dispose() {
                duplexConnection.dispose();
            }
        }

        /* loaded from: input_file:io/rsocket/test/TransportTest$TransportPair$ByteBufReleaserOperator.class */
        /**
         * Pass-through operator that releases each {@link ByteBuf} after it has been handed to
         * the downstream subscriber, and exposes a {@link Mono} that terminates when the stream
         * completes, errors or is cancelled.
         *
         * <p>It declares {@code Fuseable.QueueSubscription} but always answers
         * {@code requestFusion} with 0, and the queue methods throw.
         */
        private static class ByteBufReleaserOperator implements CoreSubscriber<ByteBuf>, Subscription, Fuseable.QueueSubscription<ByteBuf> {
            /** Message shared by the unsupported queue operations. */
            private static final String QUEUE_UNSUPPORTED = "Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.";

            /** Downstream subscriber; assigned by the lift function before subscription. */
            CoreSubscriber<? super ByteBuf> actual;
            /** Terminates together with this operator; returned by {@link #onClose()}. */
            final MonoProcessor<Void> closeableMono = MonoProcessor.create();
            /** Upstream subscription. */
            Subscription s;

            public void onSubscribe(Subscription subscription) {
                if (Operators.validate(this.s, subscription)) {
                    this.s = subscription;
                    this.actual.onSubscribe(this);
                }
            }

            /**
             * Delivers the frame downstream and then drops one reference. The release sits in
             * {@code finally} so the reference is also returned when the downstream throws.
             */
            public void onNext(ByteBuf byteBuf) {
                try {
                    this.actual.onNext(byteBuf);
                } finally {
                    byteBuf.release();
                }
            }

            /** Returns a {@link Mono} that terminates when this operator terminates or is cancelled. */
            Mono<Void> onClose() {
                return this.closeableMono;
            }

            public void onError(Throwable th) {
                this.actual.onError(th);
                this.closeableMono.onError(th);
            }

            public void onComplete() {
                this.actual.onComplete();
                this.closeableMono.onComplete();
            }

            public void request(long j) {
                this.s.request(j);
            }

            public void cancel() {
                this.s.cancel();
                this.closeableMono.onComplete();
            }

            /** Always returns 0, i.e. no fusion mode is granted. */
            public int requestFusion(int i) {
                return 0;
            }

            /** Unsupported. Must be named {@code poll} to satisfy the {@code Queue} contract. */
            public ByteBuf poll() {
                throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
            }

            /** Unsupported. */
            public int size() {
                throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
            }

            /** Unsupported. */
            public boolean isEmpty() {
                throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
            }

            /** Unsupported. */
            public void clear() {
                throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/rsocket/test/TransportTest$TransportPair$DisconnectingDuplexConnection.class */
        /**
         * {@link DuplexConnection} decorator that disposes the wrapped connection a fixed delay
         * after the first inbound frame has been seen. Everything else is delegated unchanged.
         */
        public static class DisconnectingDuplexConnection implements DuplexConnection {
            private final String tag;
            final DuplexConnection source;
            final Duration delay;
            /** Set once the first inbound frame has been observed. */
            boolean receivedFirst;

            DisconnectingDuplexConnection(String str, DuplexConnection duplexConnection, Duration duration) {
                this.tag = str;
                this.source = duplexConnection;
                this.delay = duration;
            }

            public void dispose() {
                source.dispose();
            }

            public Mono<Void> onClose() {
                return source.onClose();
            }

            public void sendFrame(int i, ByteBuf byteBuf) {
                source.sendFrame(i, byteBuf);
            }

            public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
                source.sendErrorAndClose(rSocketErrorException);
            }

            /** Returns the delegate's frames; the first one arms the delayed disconnect. */
            public Flux<ByteBuf> receive() {
                return source.receive().doOnNext(frame -> {
                    if (!receivedFirst) {
                        receivedFirst = true;
                        scheduleDisconnect();
                    }
                });
            }

            /** Disposes the delegate after {@code delay}, unless it closes on its own first. */
            private void scheduleDisconnect() {
                Mono<Long> timer = Mono.delay(delay).takeUntilOther(source.onClose());
                timer.subscribe(tick -> {
                    TransportTest.logger.warn("Tag {}. Disposing Connection[{}]", tag, source.hashCode());
                    source.dispose();
                });
            }

            public ByteBufAllocator alloc() {
                return source.alloc();
            }

            public SocketAddress remoteAddress() {
                return source.remoteAddress();
            }
        }

        /**
         * Creates a pair with the first boolean option off; delegates to the four-argument
         * constructor with {@code false}.
         *
         * @param supplier produces the value handed to both transport factories
         * @param triFunction builds the client transport from that value, the bound server and an allocator
         * @param biFunction builds the server transport from that value and an allocator
         */
        public TransportPair(Supplier<T> supplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> triFunction, BiFunction<T, ByteBufAllocator, ServerTransport<S>> biFunction) {
            this(supplier, triFunction, biFunction, false);
        }

        /**
         * Creates a pair with the second boolean option (resumability) off; delegates to the
         * five-argument constructor with {@code false}.
         *
         * @param supplier produces the value handed to both transport factories
         * @param triFunction builds the client transport from that value, the bound server and an allocator
         * @param biFunction builds the server transport from that value and an allocator
         * @param z when {@code true}, both sides are configured with {@code fragment(...)}
         */
        public TransportPair(Supplier<T> supplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> triFunction, BiFunction<T, ByteBufAllocator, ServerTransport<S>> biFunction, boolean z) {
            this(supplier, triFunction, biFunction, z, false);
        }

        /**
         * Binds a server and connects a client over the supplied transports, blocking until both
         * are ready.
         *
         * @param supplier produces the value handed to both transport factories
         * @param triFunction builds the client transport from that value, the bound server and an allocator
         * @param biFunction builds the server transport from that value and an allocator
         * @param z when {@code true}, both sides call {@code fragment(...)} with a random value in [256, 512)
         * @param z2 when {@code true}, both sides enable resume and wrap SOURCE connections in a
         *     {@code DisconnectingDuplexConnection}; the async interceptors are then never installed
         */
        public TransportPair(Supplier<T> supplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> triFunction, BiFunction<T, ByteBufAllocator, ServerTransport<S>> biFunction, boolean z, boolean z2) {
            ByteBufAllocator byteBufAllocator;
            ByteBufAllocator byteBufAllocator2;
            // Both tracking allocators are always created, because close() asserts on them unconditionally.
            this.byteBufAllocator1 = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1L), "Client");
            this.byteBufAllocator2 = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1L), "Server");
            this.withResumability = z2;
            T t = supplier.get();
            // Random switches: nextBoolean gates the client async interceptors, nextBoolean2 the server ones.
            boolean nextBoolean = ThreadLocalRandom.current().nextBoolean();
            boolean nextBoolean2 = ThreadLocalRandom.current().nextBoolean();
            // The tracking allocators are only handed to the transports at ADVANCED or PARANOID leak-detection level.
            if (ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.ADVANCED || ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.PARANOID) {
                TransportTest.logger.info("Using LeakTrackingByteBufAllocator");
                byteBufAllocator = this.byteBufAllocator1;
                byteBufAllocator2 = this.byteBufAllocator2;
            } else {
                byteBufAllocator = ByteBufAllocator.DEFAULT;
                byteBufAllocator2 = ByteBufAllocator.DEFAULT;
            }
            this.responder = new TestRSocket(data, "metadata");
            // Server side: every accepted connection is answered with the shared responder.
            RSocketServer interceptors = RSocketServer.create((connectionSetupPayload, rSocket) -> {
                return Mono.just(this.responder);
            }).payloadDecoder(PayloadDecoder.ZERO_COPY).interceptors(interceptorRegistry -> {
                if (nextBoolean2 && !z2) {
                    TransportTest.logger.info("Perform Integration Test with Async Interceptors Enabled For Server");
                    interceptorRegistry.forConnection((type, duplexConnection) -> {
                        return new AsyncDuplexConnection(duplexConnection);
                    }).forSocketAcceptor(socketAcceptor -> {
                        return (connectionSetupPayload2, rSocket2) -> {
                            return socketAcceptor.accept(connectionSetupPayload2, rSocket2).subscribeOn(Schedulers.parallel());
                        };
                    });
                }
                if (z2) {
                    // Only SOURCE-type connections are wrapped; the disconnect delay is random in [100, 1000) ms.
                    interceptorRegistry.forConnection((type2, duplexConnection2) -> {
                        return type2 == DuplexConnectionInterceptor.Type.SOURCE ? new DisconnectingDuplexConnection("Server", duplexConnection2, Duration.ofMillis(ThreadLocalRandom.current().nextInt(100, 1000))) : duplexConnection2;
                    });
                }
            });
            if (z2) {
                interceptors.resume(new Resume().storeFactory(byteBuf -> {
                    return new InMemoryResumableFramesStore("server", byteBuf, Integer.MAX_VALUE);
                }));
            }
            if (z) {
                interceptors.fragment(ThreadLocalRandom.current().nextInt(256, 512));
            }
            // The server is bound (blocking) before the client transport is built, because the client factory receives it.
            this.server = (S) interceptors.bind(biFunction.apply(t, byteBufAllocator2)).block();
            // Client side: mirrors the server configuration, plus keepAlive with 10 ms and 1 hour durations.
            RSocketConnector interceptors2 = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY).keepAlive(Duration.ofMillis(10L), Duration.ofHours(1L)).interceptors(interceptorRegistry2 -> {
                if (nextBoolean && !z2) {
                    TransportTest.logger.info("Perform Integration Test with Async Interceptors Enabled For Client");
                    interceptorRegistry2.forConnection((type, duplexConnection) -> {
                        return new AsyncDuplexConnection(duplexConnection);
                    }).forSocketAcceptor(socketAcceptor -> {
                        return (connectionSetupPayload2, rSocket2) -> {
                            return socketAcceptor.accept(connectionSetupPayload2, rSocket2).subscribeOn(Schedulers.parallel());
                        };
                    });
                }
                if (z2) {
                    // The client-side disconnect delay is random in [10, 1500) ms.
                    interceptorRegistry2.forConnection((type2, duplexConnection2) -> {
                        return type2 == DuplexConnectionInterceptor.Type.SOURCE ? new DisconnectingDuplexConnection("Client", duplexConnection2, Duration.ofMillis(ThreadLocalRandom.current().nextInt(10, 1500))) : duplexConnection2;
                    });
                }
            });
            if (z2) {
                interceptors2.resume(new Resume().storeFactory(byteBuf2 -> {
                    return new InMemoryResumableFramesStore("client", byteBuf2, Integer.MAX_VALUE);
                }));
            }
            if (z) {
                interceptors2.fragment(ThreadLocalRandom.current().nextInt(256, 512));
            }
            // Connect (blocking); a connection failure is printed to stderr and then rethrown by block().
            this.client = (RSocket) interceptors2.connect(triFunction.apply(t, this.server, byteBufAllocator)).doOnError((v0) -> {
                v0.printStackTrace();
            }).block();
        }

        /** Disposes the server first, then the client. */
        public void dispose() {
            this.server.dispose();
            this.client.dispose();
        }

        /** Returns the client RSocket connected in the constructor. */
        RSocket getClient() {
            return this.client;
        }

        /** Returns the data string the responder was constructed with ({@code "hello world"}). */
        public String expectedPayloadData() {
            return data;
        }

        /** Returns the metadata string the responder was constructed with ({@code "metadata"}). */
        public String expectedPayloadMetadata() {
            return "metadata";
        }

        /** Blocks for up to one minute until both the server and the client report closure. */
        public void awaitClosed() {
            Mono<Void> bothClosed = server.onClose().and(client.onClose());
            bothClosed.block(Duration.ofMinutes(1L));
        }
    }

    /**
     * Loads a gzip-compressed text resource from the classpath and returns its lines,
     * lower-cased and joined with {@code "\n\r"}.
     *
     * <p>The text is decoded as UTF-8 so the result does not depend on the platform default
     * charset.
     *
     * @param str classpath-relative name of the gzip resource
     * @return the decompressed, lower-cased content
     * @throws RuntimeException wrapping any failure, including a missing resource
     */
    static String read(String str) {
        try (InputStream raw = TransportTest.class.getClassLoader().getResourceAsStream(str)) {
            if (raw == null) {
                // Report the missing resource by name instead of failing with an opaque NullPointerException.
                throw new IllegalArgumentException("Resource not found on classpath: " + str);
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(raw), StandardCharsets.UTF_8))) {
                return reader.lines().map(String::toLowerCase).collect(Collectors.joining("\n\r"));
            }
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    /**
     * Per-test teardown: waits for the responder's interactions to terminate, disposes the
     * transport pair, waits for closure, then checks both tracking allocators for leaks.
     * Leak failures from either allocator are collected as suppressed exceptions and thrown
     * together.
     */
    @AfterEach
    default void close() {
        getTransportPair().responder.awaitAllInteractionTermination(getTimeout());
        getTransportPair().dispose();
        getTransportPair().awaitClosed();

        RuntimeException failures = new RuntimeException();

        // Server-side allocator is checked first.
        try {
            getTransportPair().byteBufAllocator2.assertHasNoLeaks();
        } catch (Throwable serverLeak) {
            failures = Exceptions.addSuppressed(failures, serverLeak);
        }

        // Then the client-side allocator, even if the server check already failed.
        try {
            getTransportPair().byteBufAllocator1.assertHasNoLeaks();
        } catch (Throwable clientLeak) {
            failures = Exceptions.addSuppressed(failures, clientLeak);
        }

        boolean anyLeak = failures.getSuppressed().length != 0;
        if (anyLeak) {
            throw failures;
        }
    }

    /**
     * Builds a small payload whose data is {@link #MOCK_DATA} and whose metadata depends on
     * {@code i % 5}: {@code null} for 0, the empty string for 1, and {@link #MOCK_METADATA}
     * for every other remainder.
     *
     * @param i selector for the metadata variant
     * @return a newly created payload
     */
    default Payload createTestPayload(int i) {
        int remainder = i % 5;
        String payloadMetadata;
        if (remainder == 0) {
            payloadMetadata = null;
        } else if (remainder == 1) {
            payloadMetadata = "";
        } else {
            payloadMetadata = MOCK_METADATA;
        }
        return ByteBufPayload.create(MOCK_DATA, payloadMetadata);
    }

    /** Sends ten fire-and-forget requests merged through {@code flatMap} and expects plain completion. */
    @DisplayName("makes 10 fireAndForget requests")
    @Test
    default void fireAndForget10() {
        Flux<Void> requests = Flux.range(1, 10)
                .flatMap(index -> getClient().fireAndForget(createTestPayload(index)));

        StepVerifier.create(requests)
                .expectComplete()
                .verify(getTimeout());

        // Then wait on the responder via awaitUntilObserved(10, timeout).
        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /** Sends ten fire-and-forget requests carrying the shared large payload and expects plain completion. */
    @DisplayName("makes 10 fireAndForget with Large Payload in Requests")
    @Test
    default void largePayloadFireAndForget10() {
        // Each send gets its own retained reference to the shared payload.
        Flux<Void> requests = Flux.range(1, 10)
                .flatMap(index -> getClient().fireAndForget(LARGE_PAYLOAD.retain()));

        StepVerifier.create(requests)
                .expectComplete()
                .verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /** Returns the client RSocket of the current transport pair. */
    default RSocket getClient() {
        return getTransportPair().getClient();
    }

    /** Returns the timeout that the tests pass to {@code verify(...)} and to the responder's await calls. */
    Duration getTimeout();

    /** Returns the client/server pair under test; implementations supply the concrete transport. */
    TransportPair getTransportPair();

    /** Sends ten metadata-push requests and expects plain completion; skipped when resumability is on. */
    @DisplayName("makes 10 metadataPush requests")
    @Test
    default void metadataPush10() {
        Assumptions.assumeThat(getTransportPair().withResumability).isFalse();

        Flux<Void> pushes = Flux.range(1, 10)
                .flatMap(index -> getClient().metadataPush(ByteBufPayload.create("", "test-metadata")));

        StepVerifier.create(pushes)
                .expectComplete()
                .verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /** Sends ten metadata-push requests with the large text as metadata; skipped when resumability is on. */
    @DisplayName("makes 10 metadataPush with Large Metadata in requests")
    @Test
    default void largePayloadMetadataPush10() {
        Assumptions.assumeThat(getTransportPair().withResumability).isFalse();

        Flux<Void> pushes = Flux.range(1, 10)
                .flatMap(index -> getClient().metadataPush(ByteBufPayload.create("", LARGE_DATA)));

        StepVerifier.create(pushes)
                .expectComplete()
                .verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /** An empty request stream must fail with a {@link CancellationException} carrying "Empty Source". */
    @DisplayName("makes 1 requestChannel request with 0 payloads")
    @Test
    default void requestChannel0() {
        Flux<Payload> responses = getClient().requestChannel(Flux.empty());

        StepVerifier.create(responses)
                .expectErrorSatisfies(error ->
                        Assertions.assertThat(error)
                                .isInstanceOf(CancellationException.class)
                                .hasMessage("Empty Source"))
                .verify(getTimeout());
    }

    /** Sends a single payload through a channel and expects exactly one response, which is released. */
    @DisplayName("makes 1 requestChannel request with 1 payloads")
    @Test
    default void requestChannel1() {
        RSocket rsocket = getClient();
        Flux<Payload> responses = rsocket
                .requestChannel(Mono.just(createTestPayload(0)))
                .doOnNext(Payload::release);

        StepVerifier.create(responses)
                .expectNextCount(1L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Streams 200,000 payloads through one channel, consuming responses at a rate limit of 8. */
    @DisplayName("makes 1 requestChannel request with 200,000 payloads")
    @Test
    default void requestChannel200_000() {
        RSocket rsocket = getClient();
        Flux<Payload> outbound = Flux.range(0, 200000).map(this::createTestPayload);
        Flux<Payload> responses = rsocket
                .requestChannel(outbound)
                .doOnNext(Payload::release)
                .limitRate(8);

        StepVerifier.create(responses)
                .expectNextCount(200000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Streams 50 retained references of the shared large payload through one channel. */
    @DisplayName("makes 1 requestChannel request with 50 large payloads")
    @Test
    default void largePayloadRequestChannel50() {
        RSocket rsocket = getClient();
        Flux<Payload> outbound = Flux.range(0, 50).map(index -> LARGE_PAYLOAD.retain());
        Flux<Payload> responses = rsocket
                .requestChannel(outbound)
                .doOnNext(Payload::release);

        StepVerifier.create(responses)
                .expectNextCount(50L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Streams 20,000 identical payloads through one channel and validates every response. */
    @DisplayName("makes 1 requestChannel request with 20,000 payloads")
    @Test
    default void requestChannel20_000() {
        RSocket rsocket = getClient();
        // Selector 7 always takes the default branch, i.e. MOCK_METADATA.
        Flux<Payload> outbound = Flux.range(0, 20000).map(index -> createTestPayload(7));
        Flux<Payload> responses = rsocket
                .requestChannel(outbound)
                .doOnNext(this::assertChannelPayload)
                .doOnNext(Payload::release);

        StepVerifier.create(responses)
                .expectNextCount(20000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Slow variant: streams 2,000,000 payloads through one channel at a rate limit of 8. */
    @DisplayName("makes 1 requestChannel request with 2,000,000 payloads")
    @SlowTest
    default void requestChannel2_000_000() {
        RSocket rsocket = getClient();
        Flux<Payload> outbound = Flux.range(0, 2000000).map(this::createTestPayload);
        Flux<Payload> responses = rsocket
                .requestChannel(outbound)
                .doOnNext(Payload::release)
                .limitRate(8);

        StepVerifier.create(responses)
                .expectNextCount(2000000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Sends three payloads through a channel with an initial demand of 3, and checks that the
     * total demand seen by the request source equals 3.
     */
    @DisplayName("makes 1 requestChannel request with 3 payloads")
    @Test
    default void requestChannel3() {
        AtomicLong requested = new AtomicLong();
        Flux<Payload> outbound = Flux.range(0, 3)
                .doOnRequest(requested::addAndGet)
                .map(this::createTestPayload);

        Flux<Payload> responses = getClient()
                .requestChannel(outbound)
                .doOnNext(Payload::release);

        StepVerifier.create(responses, 3L)
                .expectNextCount(3L)
                .expectComplete()
                .verify(getTimeout());

        Assertions.assertThat(requested.get()).isEqualTo(3L);
    }

    /**
     * Runs {@code check} 1024 times, at most 12 at a time, on a dedicated 12-thread scheduler.
     * Each run opens a channel and sends 256 payloads labelled with a per-subscription counter.
     *
     * <p>The scheduler wraps a freshly created thread pool, so it is disposed in
     * {@code finally}; otherwise every execution of this test would leave 12 threads behind.
     */
    @DisplayName("makes 1 requestChannel request with 256 payloads")
    @Test
    default void requestChannel256() {
        AtomicInteger subscriptionCounter = new AtomicInteger();
        // Deferred so that every subscription gets its own sequence number in the payload text.
        Flux<Payload> payloads = Flux.defer(() -> {
            int subscription = subscriptionCounter.getAndIncrement();
            return Flux.range(0, 256).map(num -> {
                return "S{" + subscription + "}: Data{" + num + "}";
            }).map(str -> {
                return ByteBufPayload.create(str);
            });
        });
        Scheduler scheduler = Schedulers.fromExecutorService(Executors.newFixedThreadPool(12));
        try {
            Flux.range(0, 1024).flatMap(num -> {
                return Mono.fromRunnable(() -> {
                    check(payloads);
                }).subscribeOn(scheduler);
            }, 12).blockLast();
        } finally {
            // Release the worker threads whether the run passed or failed.
            scheduler.dispose();
        }
    }

    /**
     * Opens one channel with the given request stream and verifies that exactly 256 responses
     * arrive before completion, consuming them at a rate limit of 8.
     *
     * @param flux request payloads to send
     */
    default void check(Flux<Payload> flux) {
        Flux<Payload> responses = getClient()
                .requestChannel(flux)
                .doOnNext(Payload::release)
                .limitRate(8);

        StepVerifier.create(responses)
                .expectNextCount(256L)
                .as("expected 256 items")
                .expectComplete()
                .verify(getTimeout());
    }

    /** Performs one request-response call and validates the single response. */
    @DisplayName("makes 1 requestResponse request")
    @Test
    default void requestResponse1() {
        RSocket rsocket = getClient();
        Mono<Payload> response = rsocket
                .requestResponse(createTestPayload(1))
                .doOnNext(this::assertPayload)
                .doOnNext(Payload::release);

        StepVerifier.create(response)
                .expectNextCount(1L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Performs ten request-response calls merged through {@code flatMap}, validating each response. */
    @DisplayName("makes 10 requestResponse requests")
    @Test
    default void requestResponse10() {
        Flux<Payload> responses = Flux.range(1, 10)
                .flatMap(index -> getClient()
                        .requestResponse(createTestPayload(index))
                        .doOnNext(this::assertPayload)
                        .doOnNext(Payload::release));

        StepVerifier.create(responses)
                .expectNextCount(10L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Performs 100 request-response calls merged through {@code flatMap} and counts the responses. */
    @DisplayName("makes 100 requestResponse requests")
    @Test
    default void requestResponse100() {
        Flux<Payload> responses = Flux.range(1, 100)
                .flatMap(index -> getClient()
                        .requestResponse(createTestPayload(index))
                        .doOnNext(Payload::release));

        StepVerifier.create(responses)
                .expectNextCount(100L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Performs 50 request-response calls, each sending a retained reference of the shared large payload. */
    @DisplayName("makes 50 requestResponse requests")
    @Test
    default void largePayloadRequestResponse50() {
        Flux<Payload> responses = Flux.range(1, 50)
                .flatMap(index -> getClient()
                        .requestResponse(LARGE_PAYLOAD.retain())
                        .doOnNext(Payload::release));

        StepVerifier.create(responses)
                .expectNextCount(50L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Performs 10,000 request-response calls merged through {@code flatMap} and counts the responses. */
    @DisplayName("makes 10,000 requestResponse requests")
    @Test
    default void requestResponse10_000() {
        Flux<Payload> responses = Flux.range(1, 10000)
                .flatMap(index -> getClient()
                        .requestResponse(createTestPayload(index))
                        .doOnNext(Payload::release));

        StepVerifier.create(responses)
                .expectNextCount(10000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Opens one stream and expects 10,000 validated responses followed by completion. */
    @DisplayName("makes 1 requestStream request and receives 10,000 responses")
    @Test
    default void requestStream10_000() {
        RSocket rsocket = getClient();
        Flux<Payload> responses = rsocket
                .requestStream(createTestPayload(3))
                .doOnNext(this::assertPayload)
                .doOnNext(Payload::release);

        StepVerifier.create(responses)
                .expectNextCount(10000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Opens one stream and takes only the first five validated responses. */
    @DisplayName("makes 1 requestStream request and receives 5 responses")
    @Test
    default void requestStream5() {
        RSocket rsocket = getClient();
        Flux<Payload> firstFive = rsocket
                .requestStream(createTestPayload(3))
                .doOnNext(this::assertPayload)
                .doOnNext(Payload::release)
                .take(5L);

        StepVerifier.create(firstFive)
                .expectNextCount(5L)
                .expectComplete()
                .verify(getTimeout());
    }

    /** Opens one stream limited to ten items and consumes them in two scripted steps of five. */
    @DisplayName("makes 1 requestStream request and consumes result incrementally")
    @Test
    default void requestStreamDelayedRequestN() {
        RSocket rsocket = getClient();
        Flux<Payload> firstTen = rsocket
                .requestStream(createTestPayload(3))
                .take(10L)
                .doOnNext(Payload::release);

        StepVerifier.create(firstTen)
                .thenRequest(5L)
                .expectNextCount(5L)
                .thenRequest(5L)
                .expectNextCount(5L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Verifies that a response carries the data and metadata expected from the transport
     * pair's responder.
     *
     * @param payload response payload to check
     * @throws IllegalStateException if either the data or the metadata differs
     */
    default void assertPayload(Payload payload) {
        TransportPair pair = getTransportPair();
        // Metadata is only inspected when the data already matches.
        boolean asExpected = pair.expectedPayloadData().equals(payload.getDataUtf8())
                && pair.expectedPayloadMetadata().equals(payload.getMetadataUtf8());
        if (asExpected) {
            return;
        }
        throw new IllegalStateException("Unexpected payload");
    }

    /**
     * Verifies that a channel response carries {@link #MOCK_DATA} and {@link #MOCK_METADATA}.
     *
     * @param payload response payload to check
     * @throws IllegalStateException if either the data or the metadata differs
     */
    default void assertChannelPayload(Payload payload) {
        // Metadata is only inspected when the data already matches.
        boolean asExpected = MOCK_DATA.equals(payload.getDataUtf8())
                && MOCK_METADATA.equals(payload.getMetadataUtf8());
        if (asExpected) {
            return;
        }
        throw new IllegalStateException("Unexpected payload");
    }
}
