package org.apache.flink.table.connector.source.lookup.cache.trigger;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.util.Preconditions;

/**
 * A {@link CacheReloadTrigger} that requests a reload repeatedly, separated by a fixed interval.
 *
 * <p>The first reload is scheduled without initial delay when {@link #open} is called. How the
 * following reloads are spaced depends on the configured {@link ScheduleMode}.
 *
 * <p>The scheduling executor is {@code transient}: it is not part of the serialized state, and a
 * single-threaded scheduled executor is created in {@link #open} when none is set.
 */
@PublicEvolving
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
    private static final long serialVersionUID = 1;

    /** Time between reloads; validated to be strictly positive. */
    private final Duration reloadInterval;

    /** Selects between fixed-rate and fixed-delay scheduling. */
    private final ScheduleMode scheduleMode;

    /** Executor running the periodic task; created lazily in {@link #open} if not injected. */
    private transient ScheduledExecutorService scheduledExecutor;

    /** Defines how consecutive reloads are scheduled by {@link PeriodicCacheReloadTrigger}. */
    @PublicEvolving
    public enum ScheduleMode {
        /**
         * Reloads are scheduled with {@code scheduleWithFixedDelay}; the scheduled task waits for
         * the triggered reload to finish, so the interval is counted from the end of one reload.
         */
        FIXED_DELAY,

        /**
         * Reloads are scheduled with {@code scheduleAtFixedRate}; the scheduled task only triggers
         * the reload and does not wait for its result.
         */
        FIXED_RATE
    }

    /**
     * Creates a trigger with the given interval and schedule mode.
     *
     * @param duration interval between reloads, must be greater than zero
     * @param scheduleMode how consecutive reloads are scheduled
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code duration} is zero or negative
     */
    public PeriodicCacheReloadTrigger(Duration duration, ScheduleMode scheduleMode) {
        Preconditions.checkNotNull(duration, "Reload interval must not be null.");
        // Fail here rather than with a bare NullPointerException from the switch in open().
        Preconditions.checkNotNull(scheduleMode, "Schedule mode must not be null.");
        Preconditions.checkArgument(
                !duration.isNegative() && !duration.isZero(),
                "Reload interval must be greater than zero.");
        this.reloadInterval = duration;
        this.scheduleMode = scheduleMode;
    }

    /**
     * Creates a trigger that uses the supplied executor instead of creating its own.
     *
     * @param duration interval between reloads, must be greater than zero
     * @param scheduleMode how consecutive reloads are scheduled
     * @param scheduledExecutorService executor to schedule the reloads on; if {@code null}, an
     *     executor is created in {@link #open}
     */
    @VisibleForTesting
    PeriodicCacheReloadTrigger(
            Duration duration,
            ScheduleMode scheduleMode,
            ScheduledExecutorService scheduledExecutorService) {
        this(duration, scheduleMode);
        this.scheduledExecutor = scheduledExecutorService;
    }

    /**
     * Starts the periodic schedule. The first reload is scheduled with zero initial delay.
     *
     * @param context context whose {@code triggerReload()} is invoked on every tick
     * @throws NullPointerException if {@code context} is {@code null}
     */
    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Checked before the executor is created so that both modes fail fast and identically;
        // otherwise FIXED_DELAY would only fail later, inside the scheduled task.
        Preconditions.checkNotNull(context, "Context must not be null.");
        if (scheduledExecutor == null) {
            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }
        final long intervalMillis = reloadInterval.toMillis();
        switch (scheduleMode) {
            case FIXED_RATE:
                scheduledExecutor.scheduleAtFixedRate(
                        context::triggerReload, 0L, intervalMillis, TimeUnit.MILLISECONDS);
                break;
            case FIXED_DELAY:
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> awaitReload(context), 0L, intervalMillis, TimeUnit.MILLISECONDS);
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("Unrecognized schedule mode \"%s\"", scheduleMode));
        }
    }

    /** Triggers one reload and waits for {@code get()} on its result to return. */
    private static void awaitReload(CacheReloadTrigger.Context context) {
        try {
            context.triggerReload().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // close() interrupts the task via shutdownNow(); keep the interrupt status
                // visible to the executor thread instead of silently clearing it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Uncaught exception during the reload", e);
        }
    }

    /** Stops the schedule by calling {@code shutdownNow()} on the executor, if one exists. */
    @Override
    public void close() throws Exception {
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }

    /**
     * Builds a trigger from the lookup options found in the given configuration.
     *
     * @param readableConfig configuration to read the lookup options from
     * @return a trigger using the configured reload interval and schedule mode
     * @throws IllegalArgumentException if the cache type is not {@code FULL}, the reload strategy
     *     is not {@code PERIODIC}, or the periodic reload interval is not set
     */
    public static PeriodicCacheReloadTrigger fromConfig(ReadableConfig readableConfig) {
        Preconditions.checkArgument(
                readableConfig.get(LookupOptions.CACHE_TYPE) == LookupOptions.LookupCacheType.FULL,
                "'%s' should be '%s' in order to build a Periodic cache reload trigger.",
                LookupOptions.CACHE_TYPE.key(),
                LookupOptions.LookupCacheType.FULL);
        Preconditions.checkArgument(
                readableConfig.get(LookupOptions.FULL_CACHE_RELOAD_STRATEGY)
                        == LookupOptions.ReloadStrategy.PERIODIC,
                "'%s' should be '%s' in order to build a Periodic cache reload trigger.",
                LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key(),
                LookupOptions.ReloadStrategy.PERIODIC);
        Preconditions.checkArgument(
                readableConfig
                        .getOptional(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL)
                        .isPresent(),
                "Missing '%s' in the configuration. This option is required to build Periodic cache reload trigger.",
                LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key());
        return new PeriodicCacheReloadTrigger(
                readableConfig.get(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL),
                readableConfig.get(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE));
    }
}
