/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.util.ratelimit;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.util.Preconditions;

/**
 * A {@link RateLimiter} that hands out a fixed capacity per cycle, where a cycle ends when
 * {@link #notifyCheckpointComplete(long)} is called.
 *
 * <p>While capacity is left, {@link #acquire(int)} returns a stage that is already complete. Once
 * the capacity is used up, the returned stage stays pending until the next call to
 * {@link #notifyCheckpointComplete(long)}, which restores the full capacity and opens the gate.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <Split> the type of source split this limiter is used with
 */
@Internal
public class GatedRateLimiter<Split extends SourceSplit>
implements RateLimiter<Split> {
    /** Capacity restored at the start of every cycle; always positive. */
    private final int capacityPerCycle;

    /** Capacity remaining in the current cycle; may drop to zero or below. */
    private int capacityLeft;

    /**
     * The gate that callers of {@link #acquire(int)} are chained onto. It is {@code null} until
     * the first call to {@link #acquire(int)}.
     */
    transient CompletableFuture<Void> gatingFuture = null;

    /**
     * Creates a rate limiter that starts with a full cycle of capacity.
     *
     * @param capacityPerCycle the capacity available in each cycle; must be positive
     * @throws IllegalArgumentException if {@code capacityPerCycle} is not positive
     */
    public GatedRateLimiter(int capacityPerCycle) {
        Preconditions.checkArgument(capacityPerCycle > 0, "Capacity per cycle has to be a positive number.");
        this.capacityPerCycle = capacityPerCycle;
        this.capacityLeft = capacityPerCycle;
    }

    /**
     * Requests capacity for the given number of events.
     *
     * <p>The capacity is deducted only when the returned stage completes, i.e. immediately if the
     * gate is open, or when the gate is opened by {@link #notifyCheckpointComplete(long)}.
     *
     * @param numberOfEvents the amount of capacity to deduct once the gate is open
     * @return a stage that completes once the capacity has been deducted
     */
    @Override
    public CompletionStage<Void> acquire(int numberOfEvents) {
        if (this.gatingFuture == null) {
            this.gatingFuture = CompletableFuture.completedFuture(null);
        }
        // Close the gate only if it is currently open. Replacing a gate that is still pending
        // would orphan the stages already chained onto it: notifyCheckpointComplete() completes
        // only the current gate, so those earlier callers would never be released.
        if (this.capacityLeft <= 0 && this.gatingFuture.isDone()) {
            this.gatingFuture = new CompletableFuture<>();
        }
        return this.gatingFuture.thenRun(() -> {
            this.capacityLeft -= numberOfEvents;
        });
    }

    /**
     * Starts a new cycle: restores the full capacity and opens the gate for any pending callers.
     *
     * @param checkpointId the id of the completed checkpoint (not used by this implementation)
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Restore capacity before opening the gate so that the pending deductions, which run as
        // part of complete(), are taken from the refreshed capacity.
        this.capacityLeft = this.capacityPerCycle;
        // The gate is created lazily in acquire(); a checkpoint may complete before the first
        // acquire() call, in which case there is nothing to open.
        if (this.gatingFuture != null) {
            this.gatingFuture.complete(null);
        }
    }
}

