/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.sleuth.instrument.reactor;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.BDDAssertions;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@ContextConfiguration(classes={TestConfig.class})
public abstract class ScopePassingSpanSubscriberSpringBootTests {
    @Autowired
    CurrentTraceContext currentTraceContext;
    private Scheduler subscribeScheduler;
    private Scheduler secondScheduler;

    protected abstract TraceContext context();

    protected abstract TraceContext context2();

    @BeforeEach
    void setUp() {
        this.subscribeScheduler = Schedulers.newSingle((String)"subscribeThread2");
        this.secondScheduler = Schedulers.newSingle((String)"secondThread");
    }

    @AfterEach
    void tearDown() {
        this.subscribeScheduler.dispose();
        this.secondScheduler.dispose();
    }

    @Test
    public void should_pass_tracing_info_when_using_reactor() {
        AtomicReference spanInOperation = new AtomicReference();
        Flux traced = Flux.just((Object[])new Integer[]{1, 2, 3});
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Flux.from((Publisher)traced).map(d -> d + 1).map(d -> d + 1).map(d -> {
                spanInOperation.set(this.currentTraceContext.context());
                return d + 1;
            }).map(d -> d + 1).subscribe(d -> {});
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
        BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context());
    }

    @Test
    public void should_support_reactor_fusion_optimization() {
        AtomicReference spanInOperation = new AtomicReference();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Mono.just((Object)1).flatMap(d -> Flux.just((Object)(d + 1)).collectList().map(p -> (Integer)p.get(0))).map(d -> d + 1).map(d -> {
                spanInOperation.set(this.currentTraceContext.context());
                return d + 1;
            }).map(d -> d + 1).subscribe(d -> {});
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
        BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context());
    }

    @Test
    public void should_pass_tracing_info_when_using_reactor_async() {
        AtomicReference spanInOperation = new AtomicReference();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Flux.just((Object[])new Integer[]{1, 2, 3}).publishOn(Schedulers.single()).log("reactor.1").map(d -> d + 1).map(d -> d + 1).publishOn(this.secondScheduler).log("reactor.2").map(d -> {
                spanInOperation.set(this.currentTraceContext.context());
                return d + 1;
            }).map(d -> d + 1).blockLast();
            Awaitility.await().untilAsserted(() -> {
                ObjectAssert cfr_ignored_0 = (ObjectAssert)BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context());
            });
            BDDAssertions.then((Object)this.currentTraceContext.context()).isEqualTo((Object)this.context());
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
        ws = this.currentTraceContext.newScope(this.context2());
        var3_3 = null;
        try {
            Flux.just((Object[])new Integer[]{1, 2, 3}).publishOn(Schedulers.single()).log("reactor.").map(d -> d + 1).map(d -> d + 1).map(d -> {
                spanInOperation.set(this.currentTraceContext.context());
                return d + 1;
            }).map(d -> d + 1).blockLast();
            BDDAssertions.then((Object)this.currentTraceContext.context()).isEqualTo((Object)this.context2());
            BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context2());
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (ws != null) {
                if (var3_3 != null) {
                    try {
                        ws.close();
                    }
                    catch (Throwable throwable) {
                        var3_3.addSuppressed(throwable);
                    }
                } else {
                    ws.close();
                }
            }
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
    }

    @Test
    public void should_pass_tracing_info_into_sources_when_using_reactor_async() {
        AtomicReference spanInOperation = new AtomicReference();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Mono.fromSupplier(() -> {
                spanInOperation.set(this.currentTraceContext.context());
                return 1;
            }).publishOn(Schedulers.single()).log("reactor.1").map(d -> d + 1).map(d -> d + 1).publishOn(this.secondScheduler).log("reactor.2").map(d -> d + 1).map(d -> d + 1).subscribeOn(this.subscribeScheduler).block();
            Awaitility.await().untilAsserted(() -> {
                ObjectAssert cfr_ignored_0 = (ObjectAssert)BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context());
            });
            BDDAssertions.then((Object)this.currentTraceContext.context()).isEqualTo((Object)this.context());
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
        ws = this.currentTraceContext.newScope(this.context2());
        var3_3 = null;
        try {
            Mono.fromCallable(() -> {
                spanInOperation.set(this.currentTraceContext.context());
                return 1;
            }).log("reactor.").map(d -> d + 1).map(d -> d + 1).map(d -> d + 1).subscribeOn(this.subscribeScheduler).block();
            BDDAssertions.then((Object)this.currentTraceContext.context()).isEqualTo((Object)this.context2());
            BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context2());
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (ws != null) {
                if (var3_3 != null) {
                    try {
                        ws.close();
                    }
                    catch (Throwable throwable) {
                        var3_3.addSuppressed(throwable);
                    }
                } else {
                    ws.close();
                }
            }
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
    }

    @Test
    @Disabled(value="Will work only for on each - by accident")
    public void should_pass_tracing_info_when_using_reactor_async_processor() {
        AtomicReference spanInOperation = new AtomicReference();
        Sinks.One one = Sinks.one();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            one.asMono().map(d -> d + 1).map(d -> {
                spanInOperation.set(this.currentTraceContext.context());
                return d + 1;
            }).map(d -> d + 1).doOnSubscribe(subscription -> {
                Thread thread = new Thread(() -> {
                    try {
                        Thread.sleep(300L);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    one.tryEmitValue((Object)0);
                });
                thread.setName("async_processor_source");
                thread.setDaemon(true);
                thread.start();
            }).subscribeOn(Schedulers.boundedElastic()).block();
            Awaitility.await().untilAsserted(() -> {
                ObjectAssert cfr_ignored_0 = (ObjectAssert)BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context());
            });
            BDDAssertions.then((Object)this.currentTraceContext.context()).isEqualTo((Object)this.context());
        }
        BDDAssertions.then((Object)this.currentTraceContext.context()).isNull();
    }

    @Test
    public void onlyConsidersContextDuringSubscribe() {
        Mono fromMono = Mono.fromCallable(() -> ((CurrentTraceContext)this.currentTraceContext).context());
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            BDDAssertions.then((Object)fromMono.map(context -> context).block()).isNotNull();
        }
    }

    @Test
    public void checkTraceIdDuringZipOperation() {
        AtomicReference spanInOperation = new AtomicReference();
        AtomicReference spanInZipOperation = new AtomicReference();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Mono.fromCallable(() -> ((CurrentTraceContext)this.currentTraceContext).context()).map(span -> span).doOnNext(spanInOperation::set).zipWith(Mono.fromCallable(() -> ((CurrentTraceContext)this.currentTraceContext).context()).map(span -> span).doOnNext(spanInZipOperation::set)).block();
        }
        BDDAssertions.then(spanInZipOperation).hasValue((Object)this.context());
        BDDAssertions.then(spanInOperation).hasValue((Object)this.context());
    }

    @Test
    public void should_work_for_mono_just_with_flat_map() {
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Mono.just((Object)"value1").flatMap(request -> Mono.just((Object)"value2").then(Mono.just((Object)"foo"))).map(a -> "qwe").block();
        }
    }

    @Test
    public void checkTraceIdFromSubscriberContext() {
        AtomicReference spanInSubscriberContext = new AtomicReference();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Mono.subscriberContext().map(context -> this.currentTraceContext.context()).doOnNext(spanInSubscriberContext::set).block();
        }
        BDDAssertions.then(spanInSubscriberContext).hasValue((Object)this.context());
    }

    @Test
    public void should_pass_tracing_info_into_inner_publishers() {
        AtomicReference spanInOperation = new AtomicReference();
        try (CurrentTraceContext.Scope ws = this.currentTraceContext.newScope(this.context());){
            Flux.range((int)0, (int)5).flatMap(it -> Mono.delay((Duration)Duration.ofMillis(1L)).map(context -> this.currentTraceContext.context()).doOnNext(spanInOperation::set)).blockFirst();
        }
        BDDAssertions.then(spanInOperation.get()).isEqualTo((Object)this.context());
    }

    @EnableAutoConfiguration
    @Configuration(proxyBeanMethods=false)
    static class TestConfig {
        TestConfig() {
        }
    }
}

