package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.changelog.fs.RetryingExecutor;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@ThreadSafe
/* loaded from: input_file:org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.class */
public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeUploadScheduler.class);
    private final RetryingExecutor retryingExecutor;
    private final RetryPolicy retryPolicy;
    private final StateChangeUploader delegate;
    private final ScheduledExecutorService scheduler;
    private final long scheduleDelayMs;
    private final long sizeThresholdBytes;
    private final Object lock;

    @GuardedBy("lock")
    private final Queue<StateChangeUploadScheduler.UploadTask> scheduled;

    @GuardedBy("lock")
    private long scheduledBytesCounter;
    private final AvailabilityProvider.AvailabilityHelper availabilityHelper;

    @GuardedBy("lock")
    @Nullable
    private ScheduledFuture<?> scheduledFuture;

    @GuardedBy("this")
    @Nullable
    private Throwable errorUnsafe;

    @GuardedBy("lock")
    private final UploadThrottle uploadThrottle;
    private final Histogram uploadBatchSizes;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchingStateChangeUploadScheduler(long j, long j2, RetryPolicy retryPolicy, StateChangeUploader stateChangeUploader, int i, long j3, ChangelogStorageMetricGroup changelogStorageMetricGroup) {
        this(j, j2, j3, retryPolicy, stateChangeUploader, SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG), new RetryingExecutor(i, changelogStorageMetricGroup.getAttemptsPerUpload(), changelogStorageMetricGroup.getTotalAttemptsPerUpload()), changelogStorageMetricGroup);
    }

    BatchingStateChangeUploadScheduler(long j, long j2, long j3, RetryPolicy retryPolicy, StateChangeUploader stateChangeUploader, ScheduledExecutorService scheduledExecutorService, RetryingExecutor retryingExecutor, ChangelogStorageMetricGroup changelogStorageMetricGroup) {
        this.lock = new Object();
        Preconditions.checkArgument(j2 <= j3, "sizeThresholdBytes (%s) must not exceed maxBytesInFlight (%s)", new Object[]{Long.valueOf(j2), Long.valueOf(j3)});
        this.scheduleDelayMs = j;
        this.scheduled = new LinkedList();
        this.scheduler = scheduledExecutorService;
        this.retryPolicy = retryPolicy;
        this.retryingExecutor = retryingExecutor;
        this.sizeThresholdBytes = j2;
        this.delegate = stateChangeUploader;
        this.uploadThrottle = new UploadThrottle(j3);
        this.availabilityHelper = new AvailabilityProvider.AvailabilityHelper();
        this.availabilityHelper.resetAvailable();
        this.uploadBatchSizes = changelogStorageMetricGroup.getUploadBatchSizes();
        changelogStorageMetricGroup.registerUploadQueueSizeGauge(() -> {
            Integer valueOf;
            synchronized (this.scheduled) {
                valueOf = Integer.valueOf(this.scheduled.size());
            }
            return valueOf;
        });
    }

    @Override // org.apache.flink.changelog.fs.StateChangeUploadScheduler
    public void upload(StateChangeUploadScheduler.UploadTask uploadTask) throws IOException {
        Throwable errorSafe = getErrorSafe();
        if (errorSafe != null) {
            LOG.debug("don't persist {} changesets, already failed", Integer.valueOf(uploadTask.changeSets.size()));
            uploadTask.fail(errorSafe);
            return;
        }
        LOG.debug("persist {} changeSets", Integer.valueOf(uploadTask.changeSets.size()));
        try {
            long size = uploadTask.getSize();
            synchronized (this.lock) {
                while (!this.uploadThrottle.hasCapacity()) {
                    this.lock.wait();
                }
                this.uploadThrottle.seizeCapacity(size);
                if (!this.uploadThrottle.hasCapacity()) {
                    this.availabilityHelper.resetUnavailable();
                }
                this.scheduledBytesCounter += size;
                this.scheduled.add(wrapWithSizeUpdate(uploadTask, size));
                scheduleUploadIfNeeded();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            uploadTask.fail(e);
            throw new IOException(e);
        } catch (Exception e2) {
            uploadTask.fail(e2);
            throw e2;
        }
    }

    private void releaseCapacity(long j) {
        CompletableFuture completableFuture = null;
        synchronized (this.lock) {
            boolean hasCapacity = this.uploadThrottle.hasCapacity();
            this.uploadThrottle.releaseCapacity(j);
            this.lock.notifyAll();
            if (!hasCapacity && this.uploadThrottle.hasCapacity()) {
                completableFuture = this.availabilityHelper.getUnavailableToResetAvailable();
            }
        }
        if (completableFuture != null) {
            completableFuture.complete(null);
        }
    }

    private void scheduleUploadIfNeeded() {
        Preconditions.checkState(Thread.holdsLock(this.lock));
        if (this.scheduleDelayMs != 0 && this.scheduledBytesCounter < this.sizeThresholdBytes) {
            if (this.scheduledFuture == null) {
                this.scheduledFuture = this.scheduler.schedule(this::drainAndSave, this.scheduleDelayMs, TimeUnit.MILLISECONDS);
            }
        } else {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduledFuture = null;
            }
            drainAndSave();
        }
    }

    private void drainAndSave() {
        ArrayList arrayList;
        synchronized (this.lock) {
            arrayList = new ArrayList(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
            this.scheduledFuture = null;
        }
        if (arrayList.size() == 0) {
            return;
        }
        try {
            Throwable errorSafe = getErrorSafe();
            if (errorSafe != null) {
                arrayList.forEach(uploadTask -> {
                    uploadTask.fail(errorSafe);
                });
            } else {
                this.uploadBatchSizes.update(arrayList.size());
                this.retryingExecutor.execute(this.retryPolicy, asRetriableAction(arrayList));
            }
        } catch (Throwable th) {
            arrayList.forEach(uploadTask2 -> {
                uploadTask2.fail(th);
            });
            if (ExceptionUtils.findThrowable(th, IOException.class).isPresent()) {
                LOG.warn("Caught IO exception while uploading", th);
            } else {
                setErrorSafe(th);
                throw th;
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        ArrayList arrayList;
        LOG.debug("close");
        this.scheduler.shutdownNow();
        if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown scheduler in 5s");
        }
        synchronized (this.lock) {
            arrayList = new ArrayList(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
        }
        CancellationException cancellationException = new CancellationException();
        arrayList.forEach(uploadTask -> {
            uploadTask.fail(cancellationException);
        });
        this.retryingExecutor.close();
        this.delegate.close();
    }

    private synchronized Throwable getErrorSafe() {
        return this.errorUnsafe;
    }

    private synchronized void setErrorSafe(Throwable th) {
        this.errorUnsafe = th;
    }

    private StateChangeUploadScheduler.UploadTask wrapWithSizeUpdate(StateChangeUploadScheduler.UploadTask uploadTask, long j) {
        return new StateChangeUploadScheduler.UploadTask(uploadTask.changeSets, list -> {
            try {
                releaseCapacity(j);
                uploadTask.complete(list);
            } catch (Throwable th) {
                uploadTask.complete(list);
                throw th;
            }
        }, (list2, th) -> {
            try {
                releaseCapacity(j);
                uploadTask.fail(th);
            } catch (Throwable th) {
                uploadTask.fail(th);
                throw th;
            }
        });
    }

    @Override // org.apache.flink.changelog.fs.StateChangeUploadScheduler
    public AvailabilityProvider getAvailabilityProvider() {
        return this.availabilityHelper;
    }

    private RetryingExecutor.RetriableAction<StateChangeUploader.UploadTasksResult> asRetriableAction(final Collection<StateChangeUploadScheduler.UploadTask> collection) {
        return new RetryingExecutor.RetriableAction<StateChangeUploader.UploadTasksResult>() { // from class: org.apache.flink.changelog.fs.BatchingStateChangeUploadScheduler.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.flink.changelog.fs.RetryingExecutor.RetriableAction
            public StateChangeUploader.UploadTasksResult tryExecute() throws Exception {
                return BatchingStateChangeUploadScheduler.this.delegate.upload(collection);
            }

            @Override // org.apache.flink.changelog.fs.RetryingExecutor.RetriableAction
            public void completeWithResult(StateChangeUploader.UploadTasksResult uploadTasksResult) {
                uploadTasksResult.complete();
            }

            @Override // org.apache.flink.changelog.fs.RetryingExecutor.RetriableAction
            public void discardResult(StateChangeUploader.UploadTasksResult uploadTasksResult) throws Exception {
                uploadTasksResult.discard();
            }

            @Override // org.apache.flink.changelog.fs.RetryingExecutor.RetriableAction
            public void handleFailure(Throwable th) {
                collection.forEach(uploadTask -> {
                    uploadTask.fail(th);
                });
            }
        };
    }
}
