/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.core.StarterStopper;
import io.atleon.kafka.KafkaConfig;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.ReactiveAdmin;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class KafkaLagThresholdStarterStopper
implements StarterStopper {
    private static final Duration DEFAULT_SAMPLE_INTERVAL = Duration.ofSeconds(5L);
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLagThresholdStarterStopper.class);
    private final KafkaConfigSource configSource;
    private final Collection<String> consumerGroupIds;
    private final Duration sampleDelay;
    private final long highTide;
    private final long lowTide;

    private KafkaLagThresholdStarterStopper(KafkaConfigSource configSource, Collection<String> consumerGroupIds, Duration sampleDelay, long highTide, long lowTide) {
        this.configSource = configSource;
        this.consumerGroupIds = consumerGroupIds;
        this.sampleDelay = sampleDelay;
        this.highTide = highTide;
        this.lowTide = lowTide;
    }

    public static KafkaLagThresholdStarterStopper create(KafkaConfigSource configSource, String consumerGroupId) {
        List<String> consumerGroupIds = Collections.singletonList(consumerGroupId);
        return new KafkaLagThresholdStarterStopper(configSource, consumerGroupIds, DEFAULT_SAMPLE_INTERVAL, 0L, 0L);
    }

    public Flux<Boolean> startStop() {
        return ((Mono)this.configSource.create()).flatMapMany(this::startStop);
    }

    public KafkaLagThresholdStarterStopper withSampleDelay(Duration sampleDelay) {
        return new KafkaLagThresholdStarterStopper(this.configSource, this.consumerGroupIds, sampleDelay, this.highTide, this.lowTide);
    }

    public KafkaLagThresholdStarterStopper withThreshold(long lagThreshold) {
        return this.withThresholds(lagThreshold, lagThreshold);
    }

    public KafkaLagThresholdStarterStopper withThresholds(long highTide, long lowTide) {
        if (highTide < lowTide) {
            throw new IllegalArgumentException("highTide must be greater-than-or-equal-to lowTide");
        }
        return new KafkaLagThresholdStarterStopper(this.configSource, this.consumerGroupIds, this.sampleDelay, highTide, lowTide);
    }

    private Flux<Boolean> startStop(KafkaConfig config) {
        return Flux.using(() -> ReactiveAdmin.create(config.nativeProperties()), this::startStop, ReactiveAdmin::close);
    }

    private Flux<Boolean> startStop(ReactiveAdmin admin) {
        return admin.listTopicPartitionGroupOffsets(this.consumerGroupIds).reduce((Object)0L, (sum, offsets) -> sum + offsets.estimateLag()).doOnNext(this::logCalculatedLag).retryWhen((Retry)Retry.fixedDelay((long)Long.MAX_VALUE, (Duration)this.sampleDelay).doBeforeRetry(this::logCalculationFailure)).repeatWhen(it -> it.delayElements(this.sampleDelay)).scan((Object)false, (started, totalLag) -> started.booleanValue() ? totalLag <= this.highTide : totalLag <= this.lowTide).skip(1L).distinctUntilChanged().doOnNext(this::logStartStopSignal);
    }

    private void logCalculatedLag(long lag) {
        LOGGER.debug("Calculated lag for consumerGroupsIds={} is {} where highTide={} and lowTide={}", new Object[]{this.consumerGroupIds, lag, this.highTide, this.lowTide});
    }

    private void logCalculationFailure(Retry.RetrySignal signal) {
        LOGGER.error("Failed to calculate total lag for consumerGroupsIds={} where signal={}. This may cause stream hanging.", this.consumerGroupIds, (Object)signal);
    }

    private void logStartStopSignal(boolean start) {
        if (start) {
            LOGGER.info("Start: Lag for consumerGroupsIds={} is at-or-below lowTide={}", this.consumerGroupIds, (Object)this.lowTide);
        } else {
            LOGGER.warn("Stop: Lag for consumerGroupsIds={} is above highTide={}", this.consumerGroupIds, (Object)this.highTide);
        }
    }
}

