/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.test.LeaksTrackingByteBufAllocator;
import io.rsocket.test.SlowTest;
import io.rsocket.test.TestRSocket;
import io.rsocket.test.TriFunction;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.logging.Logger;
import org.junit.platform.commons.logging.LoggerFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public interface TransportTest {
    public static final Logger logger = LoggerFactory.getLogger(TransportTest.class);
    public static final String MOCK_DATA = "test-data";
    public static final String MOCK_METADATA = "metadata";
    public static final String LARGE_DATA = TransportTest.read("words.shakespeare.txt.gz");
    public static final Payload LARGE_PAYLOAD = ByteBufPayload.create((String)LARGE_DATA, (String)LARGE_DATA);

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static String read(String resourceName) {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(TransportTest.class.getClassLoader().getResourceAsStream(resourceName))));){
            String string = br.lines().map(String::toLowerCase).collect(Collectors.joining("\n\r"));
            return string;
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    @BeforeEach
    default public void setUp() {
        Hooks.onOperatorDebug();
    }

    @AfterEach
    default public void close() {
        this.getTransportPair().responder.awaitAllInteractionTermination(this.getTimeout());
        this.getTransportPair().dispose();
        this.getTransportPair().byteBufAllocator.assertHasNoLeaks();
        Hooks.resetOnOperatorDebug();
    }

    default public Payload createTestPayload(int metadataPresent) {
        String metadata1;
        switch (metadataPresent % 5) {
            case 0: {
                metadata1 = null;
                break;
            }
            case 1: {
                metadata1 = "";
                break;
            }
            default: {
                metadata1 = MOCK_METADATA;
            }
        }
        String metadata = metadata1;
        return ByteBufPayload.create((String)MOCK_DATA, (String)metadata);
    }

    @DisplayName(value="makes 10 fireAndForget requests")
    @Test
    default public void fireAndForget10() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)10).flatMap(i -> this.getClient().fireAndForget(this.createTestPayload((int)i))).as(StepVerifier::create)).expectComplete().verify(this.getTimeout());
        this.getTransportPair().responder.awaitUntilObserved(10, this.getTimeout());
    }

    @DisplayName(value="makes 10 fireAndForget with Large Payload in Requests")
    @Test
    default public void largePayloadFireAndForget10() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)10).flatMap(i -> this.getClient().fireAndForget(LARGE_PAYLOAD.retain())).as(StepVerifier::create)).expectComplete().verify(this.getTimeout());
        this.getTransportPair().responder.awaitUntilObserved(10, this.getTimeout());
    }

    default public RSocket getClient() {
        return this.getTransportPair().getClient();
    }

    public Duration getTimeout();

    public TransportPair getTransportPair();

    @DisplayName(value="makes 10 metadataPush requests")
    @Test
    default public void metadataPush10() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)10).flatMap(i -> this.getClient().metadataPush(ByteBufPayload.create((String)"", (String)"test-metadata"))).as(StepVerifier::create)).expectComplete().verify(this.getTimeout());
        this.getTransportPair().responder.awaitUntilObserved(10, this.getTimeout());
    }

    @DisplayName(value="makes 10 metadataPush with Large Metadata in requests")
    @Test
    default public void largePayloadMetadataPush10() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)10).flatMap(i -> this.getClient().metadataPush(ByteBufPayload.create((String)"", (String)LARGE_DATA))).as(StepVerifier::create)).expectComplete().verify(this.getTimeout());
        this.getTransportPair().responder.awaitUntilObserved(10, this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 0 payloads")
    @Test
    default public void requestChannel0() {
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)Flux.empty()).as(StepVerifier::create)).expectErrorSatisfies(t -> ((AbstractThrowableAssert)Assertions.assertThat((Throwable)t).isInstanceOf(CancellationException.class)).hasMessage("Empty Source")).verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 1 payloads")
    @Test
    default public void requestChannel1() {
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)Mono.just((Object)this.createTestPayload(0))).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(1L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 200,000 payloads")
    @Test
    default public void requestChannel200_000() {
        Flux payloads = Flux.range((int)0, (int)200000).map(this::createTestPayload);
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)payloads).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(200000L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 50 large payloads")
    @Test
    default public void largePayloadRequestChannel50() {
        Flux payloads = Flux.range((int)0, (int)50).map(__ -> LARGE_PAYLOAD.retain());
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)payloads).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(50L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 20,000 payloads")
    @Test
    default public void requestChannel20_000() {
        Flux payloads = Flux.range((int)0, (int)20000).map(metadataPresent -> this.createTestPayload(7));
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)payloads).doOnNext(this::assertChannelPayload).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(20000L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 2,000,000 payloads")
    @SlowTest
    default public void requestChannel2_000_000() {
        Flux payloads = Flux.range((int)0, (int)2000000).map(this::createTestPayload);
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)payloads).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(2000000L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestChannel request with 3 payloads")
    @Test
    default public void requestChannel3() {
        AtomicLong requested = new AtomicLong();
        Flux payloads = Flux.range((int)0, (int)3).doOnRequest(requested::addAndGet).map(this::createTestPayload);
        ((StepVerifier.FirstStep)this.getClient().requestChannel((Publisher)payloads).doOnNext(ReferenceCounted::release).as(publisher -> StepVerifier.create((Publisher)publisher, (long)3L))).expectNextCount(3L).expectComplete().verify(this.getTimeout());
        Assertions.assertThat((long)requested.get()).isEqualTo(3L);
    }

    @DisplayName(value="makes 1 requestChannel request with 512 payloads")
    @Test
    default public void requestChannel512() {
        Flux payloads = Flux.range((int)0, (int)512).map(this::createTestPayload);
        Scheduler scheduler = Schedulers.fromExecutorService((ExecutorService)Executors.newFixedThreadPool(13));
        Flux.range((int)0, (int)1024).flatMap(v -> Mono.fromRunnable(() -> this.check((Flux<Payload>)payloads)).subscribeOn(scheduler), 12).blockLast();
    }

    default public void check(Flux<Payload> payloads) {
        ((StepVerifier.FirstStep)this.getClient().requestChannel(payloads).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(512L).as("expected 512 items").expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestResponse request")
    @Test
    default public void requestResponse1() {
        ((StepVerifier.FirstStep)this.getClient().requestResponse(this.createTestPayload(1)).doOnNext(this::assertPayload).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(1L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 10 requestResponse requests")
    @Test
    default public void requestResponse10() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)10).flatMap(i -> this.getClient().requestResponse(this.createTestPayload((int)i)).doOnNext(v -> this.assertPayload((Payload)v)).doOnNext(ReferenceCounted::release)).as(StepVerifier::create)).expectNextCount(10L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 100 requestResponse requests")
    @Test
    default public void requestResponse100() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)100).flatMap(i -> this.getClient().requestResponse(this.createTestPayload((int)i)).doOnNext(ReferenceCounted::release)).as(StepVerifier::create)).expectNextCount(100L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 50 requestResponse requests")
    @Test
    default public void largePayloadRequestResponse50() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)50).flatMap(i -> this.getClient().requestResponse(LARGE_PAYLOAD.retain()).doOnNext(ReferenceCounted::release)).as(StepVerifier::create)).expectNextCount(50L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 10,000 requestResponse requests")
    @Test
    default public void requestResponse10_000() {
        ((StepVerifier.FirstStep)Flux.range((int)1, (int)10000).flatMap(i -> this.getClient().requestResponse(this.createTestPayload((int)i)).doOnNext(ReferenceCounted::release)).as(StepVerifier::create)).expectNextCount(10000L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestStream request and receives 10,000 responses")
    @Test
    default public void requestStream10_000() {
        ((StepVerifier.FirstStep)this.getClient().requestStream(this.createTestPayload(3)).doOnNext(this::assertPayload).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).expectNextCount(10000L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestStream request and receives 5 responses")
    @Test
    default public void requestStream5() {
        ((StepVerifier.FirstStep)this.getClient().requestStream(this.createTestPayload(3)).doOnNext(this::assertPayload).doOnNext(ReferenceCounted::release).take(5L).as(StepVerifier::create)).expectNextCount(5L).expectComplete().verify(this.getTimeout());
    }

    @DisplayName(value="makes 1 requestStream request and consumes result incrementally")
    @Test
    default public void requestStreamDelayedRequestN() {
        ((StepVerifier.FirstStep)this.getClient().requestStream(this.createTestPayload(3)).take(10L).doOnNext(ReferenceCounted::release).as(StepVerifier::create)).thenRequest(5L).expectNextCount(5L).thenRequest(5L).expectNextCount(5L).expectComplete().verify(this.getTimeout());
    }

    default public void assertPayload(Payload p) {
        TransportPair transportPair = this.getTransportPair();
        if (!transportPair.expectedPayloadData().equals(p.getDataUtf8()) || !transportPair.expectedPayloadMetadata().equals(p.getMetadataUtf8())) {
            throw new IllegalStateException("Unexpected payload");
        }
    }

    default public void assertChannelPayload(Payload p) {
        if (!MOCK_DATA.equals(p.getDataUtf8()) || !MOCK_METADATA.equals(p.getMetadataUtf8())) {
            throw new IllegalStateException("Unexpected payload");
        }
    }

    public static class TransportPair<T, S extends Closeable>
    implements Disposable {
        private static final String data = "hello world";
        private static final String metadata = "metadata";
        private final LeaksTrackingByteBufAllocator byteBufAllocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1L));
        private final TestRSocket responder;
        private final RSocket client;
        private final S server;

        public TransportPair(Supplier<T> addressSupplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> clientTransportSupplier, BiFunction<T, ByteBufAllocator, ServerTransport<S>> serverTransportSupplier) {
            this(addressSupplier, clientTransportSupplier, serverTransportSupplier, false);
        }

        public TransportPair(Supplier<T> addressSupplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> clientTransportSupplier, BiFunction<T, ByteBufAllocator, ServerTransport<S>> serverTransportSupplier, boolean withRandomFragmentation) {
            ByteBufAllocator allocatorToSupply;
            T address = addressSupplier.get();
            boolean runClientWithAsyncInterceptors = ThreadLocalRandom.current().nextBoolean();
            boolean runServerWithAsyncInterceptors = ThreadLocalRandom.current().nextBoolean();
            if (ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.ADVANCED || ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.PARANOID) {
                logger.info(() -> "Using LeakTrackingByteBufAllocator");
                allocatorToSupply = this.byteBufAllocator;
            } else {
                allocatorToSupply = ByteBufAllocator.DEFAULT;
            }
            this.responder = new TestRSocket(data, "metadata");
            RSocketServer rSocketServer = RSocketServer.create((setup, sendingSocket) -> Mono.just((Object)this.responder)).payloadDecoder(PayloadDecoder.ZERO_COPY).interceptors(registry -> {
                if (runServerWithAsyncInterceptors) {
                    logger.info(() -> "Perform Integration Test with Async Interceptors Enabled For Server");
                    registry.forConnection((type, duplexConnection) -> new AsyncDuplexConnection((DuplexConnection)duplexConnection)).forSocketAcceptor(delegate -> (connectionSetupPayload, sendingSocket) -> delegate.accept(connectionSetupPayload, sendingSocket).subscribeOn(Schedulers.parallel()));
                }
            });
            if (withRandomFragmentation) {
                rSocketServer.fragment(ThreadLocalRandom.current().nextInt(256, 512));
            }
            this.server = (Closeable)rSocketServer.bind(serverTransportSupplier.apply(address, allocatorToSupply)).block();
            RSocketConnector rSocketConnector = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY).keepAlive(Duration.ofMillis(Integer.MAX_VALUE), Duration.ofMillis(Integer.MAX_VALUE)).interceptors(registry -> {
                if (runClientWithAsyncInterceptors) {
                    logger.info(() -> "Perform Integration Test with Async Interceptors Enabled For Client");
                    registry.forConnection((type, duplexConnection) -> new AsyncDuplexConnection((DuplexConnection)duplexConnection)).forSocketAcceptor(delegate -> (connectionSetupPayload, sendingSocket) -> delegate.accept(connectionSetupPayload, sendingSocket).subscribeOn(Schedulers.parallel()));
                }
            });
            if (withRandomFragmentation) {
                rSocketConnector.fragment(ThreadLocalRandom.current().nextInt(256, 512));
            }
            this.client = (RSocket)rSocketConnector.connect(clientTransportSupplier.apply(address, this.server, allocatorToSupply)).doOnError(Throwable::printStackTrace).block();
        }

        public void dispose() {
            this.server.dispose();
            this.client.dispose();
        }

        RSocket getClient() {
            return this.client;
        }

        public String expectedPayloadData() {
            return data;
        }

        public String expectedPayloadMetadata() {
            return "metadata";
        }

        private static class ByteBufReleaserOperator
        implements CoreSubscriber<ByteBuf>,
        Subscription,
        Fuseable.QueueSubscription<ByteBuf> {
            final CoreSubscriber<? super ByteBuf> actual;
            Subscription s;

            public ByteBufReleaserOperator(CoreSubscriber<? super ByteBuf> actual) {
                this.actual = actual;
            }

            public void onSubscribe(Subscription s) {
                if (Operators.validate((Subscription)this.s, (Subscription)s)) {
                    this.s = s;
                    this.actual.onSubscribe((Subscription)this);
                }
            }

            public void onNext(ByteBuf buf) {
                this.actual.onNext((Object)buf);
                buf.release();
            }

            public void onError(Throwable t) {
                this.actual.onError(t);
            }

            public void onComplete() {
                this.actual.onComplete();
            }

            public void request(long n) {
                this.s.request(n);
            }

            public void cancel() {
                this.s.cancel();
            }

            public int requestFusion(int requestedMode) {
                return 0;
            }

            public ByteBuf poll() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public int size() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public boolean isEmpty() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public void clear() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }
        }

        private static class AsyncDuplexConnection
        implements DuplexConnection {
            private final DuplexConnection duplexConnection;

            public AsyncDuplexConnection(DuplexConnection duplexConnection) {
                this.duplexConnection = duplexConnection;
            }

            public Mono<Void> send(Publisher<ByteBuf> frames) {
                return this.duplexConnection.send(frames);
            }

            public Flux<ByteBuf> receive() {
                return this.duplexConnection.receive().subscribeOn(Schedulers.parallel()).doOnNext(ByteBuf::retain).publishOn(Schedulers.parallel(), Integer.MAX_VALUE).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::safeRelease).transform(Operators.lift((__, actual) -> new ByteBufReleaserOperator((CoreSubscriber<? super ByteBuf>)actual)));
            }

            public ByteBufAllocator alloc() {
                return this.duplexConnection.alloc();
            }

            public Mono<Void> onClose() {
                return this.duplexConnection.onClose();
            }

            public void dispose() {
                this.duplexConnection.dispose();
            }
        }
    }
}

