/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.common.stream;

import io.opentelemetry.testing.internal.armeria.common.stream.StreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.SubscriptionOption;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A {@link StreamMessage} decorator that subscribes to the wrapped {@code upstream} using a fixed
 * {@code upstreamExecutor}, regardless of which executor the downstream subscriber asks for.
 *
 * <p>State queries ({@link #isOpen()}, {@link #isEmpty()}, {@link #demand()},
 * {@link #whenComplete()}) and {@code abort} calls are delegated to the upstream unchanged.
 *
 * @param <T> the type of element signaled by the stream
 */
final class SubscribeOnStreamMessage<T> implements StreamMessage<T> {
    private final StreamMessage<T> upstream;
    private final EventExecutor upstreamExecutor;

    /**
     * Creates a new instance.
     *
     * @param upstream the stream to delegate to
     * @param upstreamExecutor the executor passed to {@code upstream.subscribe(...)}
     */
    SubscribeOnStreamMessage(StreamMessage<T> upstream, EventExecutor upstreamExecutor) {
        this.upstream = upstream;
        this.upstreamExecutor = upstreamExecutor;
    }

    /** Returns {@code upstream.isOpen()}. */
    @Override
    public boolean isOpen() {
        return this.upstream.isOpen();
    }

    /** Returns {@code upstream.isEmpty()}. */
    @Override
    public boolean isEmpty() {
        return this.upstream.isEmpty();
    }

    /** Returns {@code upstream.demand()}. */
    @Override
    public long demand() {
        return this.upstream.demand();
    }

    /** Returns {@code upstream.whenComplete()}. */
    @Override
    public CompletableFuture<Void> whenComplete() {
        return this.upstream.whenComplete();
    }

    /** Returns the {@code upstreamExecutor} given at construction time. */
    @Override
    public EventExecutor defaultSubscriberExecutor() {
        return this.upstreamExecutor;
    }

    /**
     * Subscribes {@code subscriber} to the upstream, always handing the upstream
     * {@code upstreamExecutor} as its subscriber executor.
     *
     * <p>If {@code downstreamExecutor} is not the same instance as {@code upstreamExecutor}, the
     * subscriber is wrapped in a {@link SchedulingSubscriber} so that every signal is re-dispatched
     * to {@code downstreamExecutor}. If the calling thread is not in the upstream executor's event
     * loop, the {@code upstream.subscribe(...)} call itself is submitted to {@code upstreamExecutor}.
     *
     * @param subscriber the downstream subscriber
     * @param downstreamExecutor the executor on which {@code subscriber} should be signaled
     * @param options subscription options forwarded to the upstream unchanged
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber, EventExecutor downstreamExecutor, SubscriptionOption ... options) {
        final Subscriber<? super T> subscriber0;
        if (this.upstreamExecutor == downstreamExecutor) {
            // Same executor: no hop is needed, signal the subscriber directly.
            subscriber0 = subscriber;
        } else {
            subscriber0 = new SchedulingSubscriber<T>(downstreamExecutor, subscriber);
        }

        // Both branches must subscribe with upstreamExecutor. Passing downstreamExecutor here would
        // drive the upstream on the downstream executor whenever the caller already happens to be
        // on the upstream event loop, and SchedulingSubscriber would then re-dispatch every signal
        // onto that same executor a second time.
        if (this.upstreamExecutor.inEventLoop()) {
            this.upstream.subscribe(subscriber0, this.upstreamExecutor, options);
        } else {
            this.upstreamExecutor.execute(() -> this.upstream.subscribe(subscriber0, this.upstreamExecutor, options));
        }
    }

    /** Delegates to {@code upstream.abort()}. */
    @Override
    public void abort() {
        this.upstream.abort();
    }

    /**
     * Delegates to {@code upstream.abort(cause)}.
     *
     * @param cause the cause forwarded to the upstream
     */
    @Override
    public void abort(Throwable cause) {
        this.upstream.abort(cause);
    }

    /**
     * A {@link Subscriber} that forwards each signal it receives to {@code downstream} by
     * submitting a task to {@code downstreamExecutor}.
     *
     * @param <T> the type of element signaled
     */
    static class SchedulingSubscriber<T> implements Subscriber<T> {
        private final Subscriber<? super T> downstream;
        private final EventExecutor downstreamExecutor;

        /**
         * Creates a new instance.
         *
         * @param downstreamExecutor the executor on which {@code downstream} is invoked
         * @param downstream the subscriber that ultimately receives the signals
         */
        SchedulingSubscriber(EventExecutor downstreamExecutor, Subscriber<? super T> downstream) {
            this.downstream = downstream;
            this.downstreamExecutor = downstreamExecutor;
        }

        /**
         * Hands the {@link Subscription} to the downstream on {@code downstreamExecutor}.
         * The subscription object itself is passed through unwrapped.
         */
        @Override
        public void onSubscribe(Subscription s) {
            this.downstreamExecutor.execute(() -> this.downstream.onSubscribe(s));
        }

        /** Delivers {@code t} to the downstream on {@code downstreamExecutor}. */
        @Override
        public void onNext(T t) {
            this.downstreamExecutor.execute(() -> this.downstream.onNext(t));
        }

        /** Delivers the error {@code t} to the downstream on {@code downstreamExecutor}. */
        @Override
        public void onError(Throwable t) {
            this.downstreamExecutor.execute(() -> this.downstream.onError(t));
        }

        /** Delivers the completion signal to the downstream on {@code downstreamExecutor}. */
        @Override
        public void onComplete() {
            this.downstreamExecutor.execute(this.downstream::onComplete);
        }
    }
}

