/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.client.filter;

import io.rsocket.Availability;
import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.stat.Ewma;
import io.rsocket.util.Clock;
import io.rsocket.util.RSocketProxy;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class RSocketSupplier
implements Availability,
Supplier<Mono<RSocket>>,
Closeable {
    private static final double EPSILON = 1.0E-4;
    private Supplier<Mono<RSocket>> rSocketSupplier;
    private final MonoProcessor<Void> onClose;
    private final long tau;
    private long stamp;
    private final Ewma errorPercentage;

    public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
        this.rSocketSupplier = rSocketSupplier;
        this.tau = Clock.unit().convert((long)((double)halfLife / Math.log(2.0)), unit);
        this.stamp = Clock.now();
        this.errorPercentage = new Ewma(halfLife, unit, 1.0);
        this.onClose = MonoProcessor.create();
    }

    public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
        this(rSocketSupplier, 5L, TimeUnit.SECONDS);
    }

    public double availability() {
        double e = this.errorPercentage.value();
        if (Clock.now() - this.stamp > this.tau) {
            double a = Math.min(1.0, e + 0.5);
            this.errorPercentage.reset(a);
        }
        if (e < 1.0E-4) {
            e = 0.0;
        } else if (0.9999 < e) {
            e = 1.0;
        }
        return e;
    }

    private synchronized void updateErrorPercentage(double value) {
        this.errorPercentage.insert(value);
        this.stamp = Clock.now();
    }

    @Override
    public Mono<RSocket> get() {
        return this.rSocketSupplier.get().doOnNext(o -> this.updateErrorPercentage(1.0)).doOnError(t -> this.updateErrorPercentage(0.0)).map(x$0 -> new AvailabilityAwareRSocketProxy((RSocket)x$0));
    }

    public void dispose() {
        this.onClose.onComplete();
    }

    public boolean isDisposed() {
        return this.onClose.isDisposed();
    }

    public Mono<Void> onClose() {
        return this.onClose;
    }

    private class AvailabilityAwareRSocketProxy
    extends RSocketProxy {
        public AvailabilityAwareRSocketProxy(RSocket source) {
            super(source);
            RSocketSupplier.this.onClose.doFinally(signalType -> source.dispose()).subscribe();
        }

        public Mono<Void> fireAndForget(Payload payload) {
            return this.source.fireAndForget(payload).doOnError(t -> RSocketSupplier.this.errorPercentage.insert(0.0)).doOnSuccess(v -> RSocketSupplier.this.updateErrorPercentage(1.0));
        }

        public Mono<Payload> requestResponse(Payload payload) {
            return this.source.requestResponse(payload).doOnError(t -> RSocketSupplier.this.errorPercentage.insert(0.0)).doOnSuccess(p -> RSocketSupplier.this.updateErrorPercentage(1.0));
        }

        public Flux<Payload> requestStream(Payload payload) {
            return this.source.requestStream(payload).doOnError(th -> RSocketSupplier.this.errorPercentage.insert(0.0)).doOnComplete(() -> RSocketSupplier.this.updateErrorPercentage(1.0));
        }

        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return this.source.requestChannel(payloads).doOnError(th -> RSocketSupplier.this.errorPercentage.insert(0.0)).doOnComplete(() -> RSocketSupplier.this.updateErrorPercentage(1.0));
        }

        public Mono<Void> metadataPush(Payload payload) {
            return this.source.metadataPush(payload).doOnError(t -> RSocketSupplier.this.errorPercentage.insert(0.0)).doOnSuccess(v -> RSocketSupplier.this.updateErrorPercentage(1.0));
        }

        public double availability() {
            if (Clock.now() - RSocketSupplier.this.stamp > RSocketSupplier.this.tau) {
                RSocketSupplier.this.updateErrorPercentage(1.0);
            }
            return this.source.availability() * RSocketSupplier.this.errorPercentage.value();
        }
    }
}

