/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.snapshot;

import java.util.LinkedHashMap;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBFullSnapshotResources;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksFullSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
    @Nonnull
    private final StreamCompressionDecorator keyGroupCompressionDecorator;
    private final LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;

    public RocksFullSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard rocksDBResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, @Nonnull LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
        super(DESCRIPTION, db, rocksDBResourceGuard, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig);
        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
        this.registeredPQStates = registeredPQStates;
    }

    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
        return RocksDBFullSnapshotResources.create(this.kvStateInformation, this.registeredPQStates, this.db, this.rocksDBResourceGuard, this.keyGroupRange, this.keySerializer, this.keyGroupPrefixBytes, this.keyGroupCompressionDecorator);
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(FullSnapshotResources<K> fullRocksDBSnapshotResources, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)timestamp);
            }
            return registry -> SnapshotResult.empty();
        }
        SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier = this.createCheckpointStreamSupplier(checkpointId, checkpointStreamFactory, checkpointOptions);
        return new FullSnapshotAsyncWriter(checkpointOptions.getCheckpointType(), checkpointStreamSupplier, fullRocksDBSnapshotResources);
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void notifyCheckpointAborted(long checkpointId) {
    }

    @Override
    public void close() {
    }

    private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(long checkpointId, CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) {
        return this.localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory, (LocalRecoveryDirectoryProvider)this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory);
    }
}

