/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.publisher;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.InnerOperator;
import reactor.core.publisher.Operators;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

final class FluxMetrics<T>
extends FluxOperator<T, T> {
    /** Logger of this operator; assigned in the static initializer. */
    private static final Logger LOGGER;
    /** Result of the Micrometer probe performed once in the static initializer. */
    private static final boolean isMicrometerAvailable;
    /** Sequence name used when the source has no distinct name or is not scannable. */
    static final String REACTOR_DEFAULT_NAME = "reactor";
    /** Name of the counter incremented when a signal arrives after termination. */
    static final String METER_MALFORMED = "reactor.malformed.source";
    /** Name of the counter incremented on each accepted subscription. */
    static final String METER_SUBSCRIBED = "reactor.subscribed";
    /** Name shared by the timers measuring subscription-to-termination/cancellation duration. */
    static final String METER_FLOW_DURATION = "reactor.flow.duration";
    /** Name of the timer measuring the delay between consecutive onNext signals. */
    static final String METER_ON_NEXT_DELAY = "reactor.onNext.delay";
    /** Name of the distribution summary recording requested amounts. */
    static final String METER_REQUESTED = "reactor.requested";
    /** Tag key carrying the termination status on the flow duration timers. */
    static final String TAG_STATUS = "status";
    /** Tag key carrying the exception class name on the error timer. */
    static final String TAG_EXCEPTION = "exception";
    /** Tag key carrying the sequence name. */
    static final String TAG_SEQUENCE_NAME = "flow";
    /** Tag key carrying the sequence type. */
    static final String TAG_SEQUENCE_TYPE = "type";
    /** {@link #TAG_STATUS} value for sequences terminated by onError. */
    static final String TAGVALUE_ON_ERROR = "error";
    /** {@link #TAG_STATUS} value for sequences terminated by onComplete. */
    static final String TAGVALUE_ON_COMPLETE = "completed";
    /** {@link #TAG_STATUS} value for cancelled sequences. */
    static final String TAGVALUE_CANCEL = "cancelled";
    /** {@link #TAG_SEQUENCE_TYPE} value used by the subscribers of this class. */
    static final String TAGVALUE_FLUX = "Flux";
    // NOTE(review): not referenced in this file; presumably used by a Mono counterpart - confirm.
    static final String TAGVALUE_MONO = "Mono";
    /** Sequence name resolved from the source at construction time. */
    final String name;
    /** Tags resolved from the source at construction time. */
    final List<Tag> tags;
    /** Registry to report to; when {@code null}, {@code subscribe} uses {@code Metrics.globalRegistry}. */
    @Nullable
    final MeterRegistry registryCandidate;

    /**
     * Reports the outcome of the Micrometer probe run when this class was initialized.
     *
     * @return {@code true} if the probe in the static initializer completed without throwing
     */
    static boolean isMicrometerAvailable() {
        return FluxMetrics.isMicrometerAvailable;
    }

    static Tuple2<String, List<Tag>> resolveNameAndTags(Publisher<?> source) {
        List tags;
        String name;
        Scannable scannable = Scannable.from(source);
        if (scannable.isScanAvailable()) {
            String nameOrDefault = scannable.name();
            name = scannable.stepName().equals(nameOrDefault) ? REACTOR_DEFAULT_NAME : nameOrDefault;
            tags = scannable.tags().map((? super T tuple) -> Tag.of((String)((String)tuple.getT1()), (String)((String)tuple.getT2()))).collect(Collectors.toList());
        } else {
            LOGGER.warn("Attempting to activate metrics but the upstream is not Scannable. You might want to use `name()` (and optionally `tags()`) right before `metrics()`");
            name = REACTOR_DEFAULT_NAME;
            tags = Collections.emptyList();
        }
        return Tuples.of(name, tags);
    }

    /**
     * Creates the operator without an explicit registry; delegates to the two-argument
     * constructor with a {@code null} registry candidate.
     *
     * @param flux the source sequence to instrument
     */
    FluxMetrics(Flux<? extends T> flux) {
        this(flux, null);
    }

    /**
     * Creates the operator, resolving the sequence name and tags from the source up front.
     *
     * @param flux the source sequence to instrument
     * @param registry the registry to report to, or {@code null} to leave the choice to
     * {@code subscribe}
     */
    FluxMetrics(Flux<? extends T> flux, @Nullable MeterRegistry registry) {
        super(flux);
        this.registryCandidate = registry;

        Tuple2<String, List<Tag>> resolved = resolveNameAndTags(flux);
        this.name = resolved.getT1();
        this.tags = resolved.getT2();
    }

    /**
     * Subscribes to the source through a metrics-recording subscriber.
     * <p>
     * The explicit registry candidate is used when present; otherwise meters are registered
     * against {@code Metrics.globalRegistry}. The system clock is always used.
     *
     * @param actual the downstream subscriber receiving the signals
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        // Declared as MeterRegistry so that either the global composite or the candidate fits.
        MeterRegistry registry = Metrics.globalRegistry;
        if (this.registryCandidate != null) {
            registry = this.registryCandidate;
        }
        this.source.subscribe(new MicrometerFluxMetricsSubscriber<T>(actual, registry, Clock.SYSTEM, this.name, this.tags));
    }

    static {
        boolean micrometer;
        LOGGER = Loggers.getLogger(FluxMetrics.class);
        try {
            Metrics.globalRegistry.getRegistries();
            micrometer = true;
        }
        catch (Throwable t) {
            micrometer = false;
        }
        isMicrometerAvailable = micrometer;
    }

    /**
     * Fusion-aware variant of {@link MicrometerFluxMetricsSubscriber}.
     * <p>
     * It exposes itself as a {@link Fuseable.QueueSubscription}, delegating queue operations to
     * the upstream queue subscription captured by the parent class (when there is one), and
     * records the onNext delay and termination timers from {@link #poll()}.
     *
     * @param <T> the element type
     */
    static final class MicrometerFluxMetricsFuseableSubscriber<T>
    extends MicrometerFluxMetricsSubscriber<T>
    implements Fuseable,
    Fuseable.QueueSubscription<T> {
        /** Fusion mode granted by upstream in {@link #requestFusion(int)}; stays 0 until then. */
        private int fusionMode;

        MicrometerFluxMetricsFuseableSubscriber(CoreSubscriber<? super T> actual, MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
            super(actual, registry, clock, sequenceName, sequenceTags);
        }

        /**
         * In ASYNC fusion mode forwards a {@code null} onNext downstream without recording
         * anything; otherwise behaves exactly like the parent implementation.
         */
        @Override
        public void onNext(T t) {
            if (this.fusionMode == Fuseable.ASYNC) {
                this.actual.onNext(null);
                return;
            }
            // Same done-check, delay recording and forwarding as the non-fused subscriber.
            super.onNext(t);
        }

        /**
         * Forwards the fusion request to the upstream queue subscription and remembers the
         * granted mode.
         *
         * @param mode the requested fusion mode
         * @return the mode granted by upstream, or {@link Fuseable#NONE} when upstream is not
         * a queue subscription
         */
        @Override
        public int requestFusion(int mode) {
            if (this.qs == null) {
                return Fuseable.NONE;
            }
            this.fusionMode = this.qs.requestFusion(mode);
            return this.fusionMode;
        }

        /**
         * Polls the upstream queue, recording metrics along the way.
         * <p>
         * A non-null value records the delay since the previous event. A {@code null} value in
         * SYNC mode stops the termination sample against the completion timer. Any throwable
         * stops the sample against an error timer tagged with the exception class name and is
         * then rethrown.
         *
         * @return the polled value, or {@code null} when upstream returned none or there is no
         * upstream queue
         */
        @Override
        @Nullable
        public T poll() {
            if (this.qs == null) {
                return null;
            }
            try {
                T v = this.qs.poll();
                if (v == null) {
                    if (this.fusionMode == Fuseable.SYNC) {
                        this.subscribeToTerminateSample.stop(this.subscribeToCompleteTimer);
                    }
                    return null;
                }
                long last = this.lastNextEventNanos;
                this.lastNextEventNanos = this.clock.monotonicTime();
                this.onNextIntervalTimer.record(this.lastNextEventNanos - last, TimeUnit.NANOSECONDS);
                return v;
            }
            catch (Throwable e) {
                Timer timer = this.subscribeToErrorTimerBuilder.tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName()).register(this.registry);
                this.subscribeToTerminateSample.stop(timer);
                throw e;
            }
        }

        /** Clears the upstream queue, if any. */
        @Override
        public void clear() {
            if (this.qs != null) {
                this.qs.clear();
            }
        }

        /** @return {@code true} when there is no upstream queue or it is empty */
        @Override
        public boolean isEmpty() {
            return this.qs == null || this.qs.isEmpty();
        }

        /** @return the size of the upstream queue, or 0 when there is none */
        @Override
        public int size() {
            return this.qs == null ? 0 : this.qs.size();
        }
    }

    /**
     * Subscriber that records Micrometer meters for a sequence while relaying every signal to
     * the downstream subscriber and every request/cancel to the upstream subscription.
     * <p>
     * Meters recorded: subscription count, delay between onNext signals, duration from
     * subscription to completion/error/cancellation, signals received after termination, and
     * (for sequences not using the default name) requested amounts.
     *
     * @param <T> the element type
     */
    static class MicrometerFluxMetricsSubscriber<T>
    implements InnerOperator<T, T> {
        final CoreSubscriber<? super T> actual;
        final MeterRegistry registry;
        final Clock clock;
        final Counter malformedSourceCounter;
        final Counter subscribedCounter;
        /** Only created when the sequence name differs from the default one; otherwise null. */
        @Nullable
        final DistributionSummary requestedCounter;
        /** Started in {@link #onSubscribe(Subscription)}, stopped on termination or cancel. */
        Timer.Sample subscribeToTerminateSample;
        /** Monotonic time of the latest onSubscribe/onNext event; -1 before subscription. */
        long lastNextEventNanos = -1L;
        /** Set once onError or onComplete has been processed. */
        boolean done;
        /** Upstream subscription viewed as a queue subscription, when it is one. */
        @Nullable
        Fuseable.QueueSubscription<T> qs;
        Subscription s;
        final Timer onNextIntervalTimer;
        final Timer subscribeToCompleteTimer;
        /** Kept as a builder because the exception tag is only known when the error occurs. */
        final Timer.Builder subscribeToErrorTimerBuilder;
        final Timer subscribeToCancelTimer;

        /**
         * Registers all meters that do not depend on runtime information.
         *
         * @param actual the downstream subscriber
         * @param registry the registry the meters are registered with
         * @param clock the clock used to measure onNext delays
         * @param sequenceName the value of the sequence name tag
         * @param sequenceTags additional tags applied to every meter
         */
        MicrometerFluxMetricsSubscriber(CoreSubscriber<? super T> actual, MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
            this.actual = actual;
            this.registry = registry;
            this.clock = clock;

            List<Tag> commonTags = new ArrayList<>();
            commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_NAME, sequenceName));
            commonTags.add(Tag.of(FluxMetrics.TAG_SEQUENCE_TYPE, FluxMetrics.TAGVALUE_FLUX));
            commonTags.addAll(sequenceTags);

            this.subscribeToCompleteTimer = Timer.builder(FluxMetrics.METER_FLOW_DURATION)
                    .tags(commonTags)
                    .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_COMPLETE)
                    .description("Times the duration elapsed between a subscription and the onComplete termination of the sequence")
                    .register(registry);
            this.subscribeToCancelTimer = Timer.builder(FluxMetrics.METER_FLOW_DURATION)
                    .tags(commonTags)
                    .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_CANCEL)
                    .description("Times the duration elapsed between a subscription and the cancellation of the sequence")
                    .register(registry);
            // Not registered here: the exception tag is added and the timer registered in onError/poll.
            this.subscribeToErrorTimerBuilder = Timer.builder(FluxMetrics.METER_FLOW_DURATION)
                    .tags(commonTags)
                    .tag(FluxMetrics.TAG_STATUS, FluxMetrics.TAGVALUE_ON_ERROR)
                    .description("Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag");
            this.onNextIntervalTimer = Timer.builder(FluxMetrics.METER_ON_NEXT_DELAY)
                    .tags(commonTags)
                    .description("Measures delays between onNext signals (or between onSubscribe and first onNext)")
                    .register(registry);
            this.subscribedCounter = Counter.builder(FluxMetrics.METER_SUBSCRIBED)
                    .tags(commonTags)
                    .baseUnit("subscribers")
                    .description("Counts how many Reactor sequences have been subscribed to")
                    .register(registry);
            this.malformedSourceCounter = registry.counter(FluxMetrics.METER_MALFORMED, commonTags);

            if (FluxMetrics.REACTOR_DEFAULT_NAME.equals(sequenceName)) {
                // Sequences using the default name do not get a requested-amount summary.
                this.requestedCounter = null;
            }
            else {
                this.requestedCounter = DistributionSummary.builder(FluxMetrics.METER_REQUESTED)
                        .tags(commonTags)
                        .description("Counts the amount requested to a named Flux by all subscribers, until at least one requests an unbounded amount")
                        .baseUnit("requested amount")
                        .register(registry);
            }
        }

        /** @return the downstream subscriber */
        @Override
        public CoreSubscriber<? super T> actual() {
            return this.actual;
        }

        /**
         * Records the delay since the previous event and forwards the value downstream. A value
         * arriving after termination is counted as malformed and dropped instead.
         */
        @Override
        public void onNext(T t) {
            if (this.done) {
                this.malformedSourceCounter.increment();
                Operators.onNextDropped(t, this.actual.currentContext());
                return;
            }
            long previous = this.lastNextEventNanos;
            this.lastNextEventNanos = this.clock.monotonicTime();
            this.onNextIntervalTimer.record(this.lastNextEventNanos - previous, TimeUnit.NANOSECONDS);
            this.actual.onNext(t);
        }

        /**
         * Stops the termination sample against an error timer tagged with the exception class
         * name, then forwards the error. An error arriving after termination is counted as
         * malformed and dropped instead.
         */
        @Override
        public void onError(Throwable e) {
            if (this.done) {
                this.malformedSourceCounter.increment();
                Operators.onErrorDropped(e, this.actual.currentContext());
                return;
            }
            this.done = true;
            Timer errorTimer = this.subscribeToErrorTimerBuilder
                    .tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
                    .register(this.registry);
            this.subscribeToTerminateSample.stop(errorTimer);
            this.actual.onError(e);
        }

        /**
         * Stops the termination sample against the completion timer, then forwards the
         * completion. A completion arriving after termination is only counted as malformed.
         */
        @Override
        public void onComplete() {
            if (this.done) {
                this.malformedSourceCounter.increment();
                return;
            }
            this.done = true;
            this.subscribeToTerminateSample.stop(this.subscribeToCompleteTimer);
            this.actual.onComplete();
        }

        /**
         * On a valid subscription: counts it, starts the termination sample, initializes the
         * onNext delay reference time, captures the upstream (also as a queue subscription when
         * applicable) and passes this instance downstream as the subscription.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!Operators.validate(this.s, s)) {
                return;
            }
            this.subscribedCounter.increment();
            this.subscribeToTerminateSample = Timer.start(this.registry);
            this.lastNextEventNanos = this.clock.monotonicTime();
            if (s instanceof Fuseable.QueueSubscription) {
                // The element type cannot be checked at runtime; the cast mirrors the declared type of qs.
                @SuppressWarnings("unchecked")
                Fuseable.QueueSubscription<T> queueSubscription = (Fuseable.QueueSubscription<T>) s;
                this.qs = queueSubscription;
            }
            this.s = s;
            this.actual.onSubscribe(this);
        }

        /**
         * Records a valid request amount (when the summary exists) and forwards it upstream.
         * Invalid amounts are neither recorded nor forwarded.
         */
        @Override
        public void request(long l) {
            if (!Operators.validate(l)) {
                return;
            }
            if (this.requestedCounter != null) {
                this.requestedCounter.record(l);
            }
            this.s.request(l);
        }

        /** Stops the termination sample against the cancel timer, then cancels upstream. */
        @Override
        public void cancel() {
            this.subscribeToTerminateSample.stop(this.subscribeToCancelTimer);
            this.s.cancel();
        }
    }
}

