/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer.strategy;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
import org.apache.flink.connector.base.sink.writer.strategy.ResultInfo;
import org.apache.flink.connector.base.sink.writer.strategy.ScalingStrategy;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public class CongestionControlRateLimitingStrategy
implements RateLimitingStrategy {
    private final int maxInFlightRequests;
    private final ScalingStrategy<Integer> scalingStrategy;
    private int maxInFlightMessages;
    private int currentInFlightRequests;
    private int currentInFlightMessages;

    private CongestionControlRateLimitingStrategy(int maxInFlightRequests, int initialMaxInFlightMessages, ScalingStrategy<Integer> scalingStrategy) {
        Preconditions.checkArgument((maxInFlightRequests > 0 ? 1 : 0) != 0, (Object)"maxInFlightRequests must be a positive integer.");
        Preconditions.checkArgument((initialMaxInFlightMessages > 0 ? 1 : 0) != 0, (Object)"initialMaxInFlightMessages must be a positive integer.");
        Preconditions.checkNotNull(scalingStrategy, (String)"scalingStrategy must be provided.");
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxInFlightMessages = initialMaxInFlightMessages;
        this.scalingStrategy = scalingStrategy;
    }

    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        ++this.currentInFlightRequests;
        this.currentInFlightMessages += requestInfo.getBatchSize();
    }

    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        this.currentInFlightRequests = Math.max(0, this.currentInFlightRequests - 1);
        this.currentInFlightMessages -= resultInfo.getBatchSize();
        this.maxInFlightMessages = resultInfo.getFailedMessages() > 0 ? this.scalingStrategy.scaleDown(this.maxInFlightMessages).intValue() : this.scalingStrategy.scaleUp(this.maxInFlightMessages).intValue();
    }

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return this.currentInFlightRequests >= this.maxInFlightRequests || this.currentInFlightMessages + requestInfo.getBatchSize() > this.maxInFlightMessages;
    }

    @Override
    public int getMaxBatchSize() {
        return this.maxInFlightMessages;
    }

    @PublicEvolving
    public static CongestionControlRateLimitingStrategyBuilder builder() {
        return new CongestionControlRateLimitingStrategyBuilder();
    }

    @PublicEvolving
    public static class CongestionControlRateLimitingStrategyBuilder {
        private int maxInFlightRequests;
        private int initialMaxInFlightMessages;
        private ScalingStrategy<Integer> scalingStrategy;

        public CongestionControlRateLimitingStrategyBuilder setMaxInFlightRequests(int maxInFlightRequests) {
            this.maxInFlightRequests = maxInFlightRequests;
            return this;
        }

        public CongestionControlRateLimitingStrategyBuilder setInitialMaxInFlightMessages(int initialMaxInFlightMessages) {
            this.initialMaxInFlightMessages = initialMaxInFlightMessages;
            return this;
        }

        public CongestionControlRateLimitingStrategyBuilder setScalingStrategy(ScalingStrategy<Integer> scalingStrategy) {
            this.scalingStrategy = scalingStrategy;
            return this;
        }

        public CongestionControlRateLimitingStrategy build() {
            return new CongestionControlRateLimitingStrategy(this.maxInFlightRequests, this.initialMaxInFlightMessages, this.scalingStrategy);
        }
    }
}

