/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import reactor.test.StepVerifier;

public abstract class AbstractMcpAsyncClientResiliencyTests {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMcpAsyncClientResiliencyTests.class);
    static Network network = Network.newNetwork();
    public static String host = "http://localhost:3001";
    static GenericContainer<?> container = new GenericContainer("docker.io/node:lts-alpine3.23").withCommand("npx -y @modelcontextprotocol/server-everything@2025.12.18 streamableHttp").withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())).withNetwork(network).withNetworkAliases(new String[]{"everything-server"}).withExposedPorts(new Integer[]{3001}).waitingFor((WaitStrategy)Wait.forHttp((String)"/").forStatusCode(404));
    static ToxiproxyContainer toxiproxy = (ToxiproxyContainer)((ToxiproxyContainer)new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0").withNetwork(network)).withExposedPorts(new Integer[]{8474, 3000});
    static Proxy proxy;

    static void disconnect() {
        long start = System.nanoTime();
        try {
            proxy.toxics().resetPeer("RESET_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0L);
            proxy.toxics().resetPeer("RESET_UPSTREAM", ToxicDirection.UPSTREAM, 0L);
            logger.info("Disconnect took {} ms", (Object)Duration.ofNanos(System.nanoTime() - start).toMillis());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to disconnect", e);
        }
    }

    static void reconnect() {
        long start = System.nanoTime();
        try {
            proxy.toxics().get("RESET_UPSTREAM").remove();
            proxy.toxics().get("RESET_DOWNSTREAM").remove();
            logger.info("Reconnect took {} ms", (Object)Duration.ofNanos(System.nanoTime() - start).toMillis());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to reconnect", e);
        }
    }

    static void restartMcpServer() {
        container.stop();
        container.start();
    }

    abstract McpClientTransport createMcpTransport();

    protected Duration getRequestTimeout() {
        return Duration.ofSeconds(14L);
    }

    protected Duration getInitializationTimeout() {
        return Duration.ofSeconds(2L);
    }

    McpAsyncClient client(McpClientTransport transport) {
        return this.client(transport, Function.identity());
    }

    McpAsyncClient client(McpClientTransport transport, Function<McpClient.AsyncSpec, McpClient.AsyncSpec> customizer) {
        AtomicReference client = new AtomicReference();
        Assertions.assertThatCode(() -> {
            McpClient.AsyncSpec builder = McpClient.async((McpClientTransport)transport).requestTimeout(this.getRequestTimeout()).initializationTimeout(this.getInitializationTimeout()).capabilities(McpSchema.ClientCapabilities.builder().build());
            builder = (McpClient.AsyncSpec)customizer.apply(builder);
            client.set(builder.build());
        }).doesNotThrowAnyException();
        return (McpAsyncClient)client.get();
    }

    void withClient(McpClientTransport transport, Consumer<McpAsyncClient> c) {
        this.withClient(transport, Function.identity(), c);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void withClient(McpClientTransport transport, Function<McpClient.AsyncSpec, McpClient.AsyncSpec> customizer, Consumer<McpAsyncClient> c) {
        McpAsyncClient client = this.client(transport, customizer);
        try {
            c.accept(client);
        }
        finally {
            StepVerifier.create((Publisher)client.closeGracefully()).expectComplete().verify(Duration.ofSeconds(10L));
        }
    }

    @Test
    void testPing() {
        this.withClient(this.createMcpTransport(), mcpAsyncClient -> {
            StepVerifier.create((Publisher)mcpAsyncClient.initialize()).expectNextCount(1L).verifyComplete();
            AbstractMcpAsyncClientResiliencyTests.disconnect();
            StepVerifier.create((Publisher)mcpAsyncClient.ping()).expectError().verify();
            AbstractMcpAsyncClientResiliencyTests.reconnect();
            StepVerifier.create((Publisher)mcpAsyncClient.ping()).expectNextCount(1L).verifyComplete();
        });
    }

    @Test
    void testSessionInvalidation() {
        this.withClient(this.createMcpTransport(), mcpAsyncClient -> {
            StepVerifier.create((Publisher)mcpAsyncClient.initialize()).expectNextCount(1L).verifyComplete();
            AbstractMcpAsyncClientResiliencyTests.restartMcpServer();
            StepVerifier.create((Publisher)mcpAsyncClient.ping().retry(1L)).expectNextCount(1L).verifyComplete();
        });
    }

    @Test
    void testCallTool() {
        this.withClient(this.createMcpTransport(), mcpAsyncClient -> {
            AtomicReference tools = new AtomicReference();
            StepVerifier.create((Publisher)mcpAsyncClient.initialize()).expectNextCount(1L).verifyComplete();
            StepVerifier.create((Publisher)mcpAsyncClient.listTools()).consumeNextWith(list -> tools.set(list.tools())).verifyComplete();
            AbstractMcpAsyncClientResiliencyTests.disconnect();
            String name = ((McpSchema.Tool)((List)tools.get()).get(0)).name();
            McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(name, Map.of("message", "hello"));
            StepVerifier.create((Publisher)mcpAsyncClient.callTool(request)).expectError().verify();
            AbstractMcpAsyncClientResiliencyTests.reconnect();
            StepVerifier.create((Publisher)mcpAsyncClient.callTool(request)).expectNextCount(1L).verifyComplete();
        });
    }

    @Test
    void testSessionClose() {
        this.withClient(this.createMcpTransport(), mcpAsyncClient -> {
            StepVerifier.create((Publisher)mcpAsyncClient.initialize()).expectNextCount(1L).verifyComplete();
            StepVerifier.create((Publisher)mcpAsyncClient.closeGracefully()).expectComplete().verify();
            StepVerifier.create((Publisher)mcpAsyncClient.ping()).expectErrorMatches(err -> err.getCause() instanceof McpTransportSessionClosedException).verify();
        });
    }

    static {
        container.start();
        toxiproxy.start();
        ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
        try {
            proxy = toxiproxyClient.createProxy("everything-server", "0.0.0.0:3000", "everything-server:3001");
        }
        catch (IOException e) {
            throw new RuntimeException("Can't create proxy!", e);
        }
        String ipAddressViaToxiproxy = toxiproxy.getHost();
        int portViaToxiproxy = toxiproxy.getMappedPort(3000);
        host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
    }
}

