/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.fabric;

import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.neo4j.configuration.connectors.ConnectorPortRegister;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionContext;
import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.neo4j.fabric.DriverUtils;
import org.neo4j.test.extension.BoltDbmsExtension;
import org.neo4j.test.extension.Inject;
import org.reactivestreams.Publisher;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.publisher.Mono;

/**
 * Verifies that query results can be streamed through the {@link Driver} created in
 * {@link #beforeAll()}, using both the blocking session API and the reactive session API.
 */
@BoltDbmsExtension
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class BoltLocalResultStreamTest {
    /** Query shared by all tests; the tests expect it to return {@code r0} .. {@code r4} in column {@code A}. */
    private static final String QUERY = "UNWIND range(0, 4) AS i RETURN 'r' + i as A";

    @Inject
    private static ConnectorPortRegister connectorPortRegister;

    /** Driver shared by all tests; created in {@link #beforeAll()} and closed in {@link #tearDown()}. */
    private static Driver driver;

    @BeforeAll
    static void beforeAll() {
        driver = DriverUtils.createDriver(connectorPortRegister);
    }

    @AfterAll
    static void tearDown() {
        driver.close();
    }

    /** Reads the complete result through the blocking transaction API. */
    @Test
    void testBasicResultStream() {
        List<String> result = inTx(tx -> tx.run(QUERY).stream()
                .map(r -> r.get("A").asString())
                .collect(Collectors.toList()));

        Assertions.assertThat(result).isEqualTo(List.of("r0", "r1", "r2", "r3", "r4"));
    }

    /** Reads the complete result through the reactive API with {@code limitRate(1)} applied. */
    @Test
    void testRxResultStream() {
        // No raw-type casts here: the generic types must flow through the pipeline so the
        // lambda parameters are typed as the driver's result and record types.
        List<String> result = inRxTx(tx -> Mono.fromDirect(JdkFlowAdapter.flowPublisherToFlux(tx.run(QUERY)))
                .flatMapMany(reactiveResult -> JdkFlowAdapter.flowPublisherToFlux(reactiveResult.records()))
                .limitRate(1)
                .collectList()
                .block()
                .stream()
                .map(r -> r.get("A").asString())
                .collect(Collectors.toList()));

        Assertions.assertThat(result).isEqualTo(List.of("r0", "r1", "r2", "r3", "r4"));
    }

    /** Requests only the first two records through the reactive API and expects exactly those two. */
    @Test
    void testPartialStream() {
        List<String> result = inRxTx(tx -> Mono.fromDirect(JdkFlowAdapter.flowPublisherToFlux(tx.run(QUERY)))
                .flatMapMany(reactiveResult -> JdkFlowAdapter.flowPublisherToFlux(reactiveResult.records()))
                .limitRequest(2L)
                .collectList()
                .block()
                .stream()
                .map(r -> r.get("A").asString())
                .collect(Collectors.toList()));

        Assertions.assertThat(result).isEqualTo(List.of("r0", "r1"));
    }

    /**
     * Runs {@code workload} via {@code Session.executeWrite} on a fresh session and returns its result.
     * The session is closed when the call completes, whether normally or exceptionally.
     *
     * @param workload function applied to the transaction context
     * @param <T> type of the value produced by the workload
     * @return the value produced by {@code workload}
     */
    private static <T> T inTx(Function<TransactionContext, T> workload) {
        try (Session session = driver.session()) {
            return session.executeWrite(workload::apply);
        }
    }

    /**
     * Runs {@code workload} inside a reactive transaction on a fresh reactive session and returns
     * its result. The transaction is always rolled back afterwards, and the session is always
     * closed, regardless of whether the workload succeeded or threw.
     *
     * @param workload function applied to the reactive transaction
     * @param <T> type of the value produced by the workload
     * @return the value produced by {@code workload}
     */
    private static <T> T inRxTx(Function<ReactiveTransaction, T> workload) {
        ReactiveSession session = driver.reactiveSession();
        try {
            ReactiveTransaction tx = Mono.from(JdkFlowAdapter.flowPublisherToFlux(session.beginTransaction())).block();
            try {
                return workload.apply(tx);
            } finally {
                // Rollback happens on both the success and the failure path.
                Mono.from(JdkFlowAdapter.flowPublisherToFlux(tx.rollback())).block();
            }
        } finally {
            Mono.from(JdkFlowAdapter.flowPublisherToFlux(session.close())).block();
        }
    }
}

