/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.snapshot;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStIncrementalSnapshotStrategy<K>
extends ForStNativeFullSnapshotStrategy<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ForStIncrementalSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous incremental ForSt snapshot";
    @Nonnull
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedSstFiles;
    private long lastCompletedCheckpointId;

    public ForStIncrementalSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard forstResourceGuard, @Nonnull ForStResourceContainer resourceContainer, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull UUID backendUID, @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedStateHandles, @Nonnull ForStStateDataTransfer stateTransfer, long lastCompletedCheckpointId) {
        super(DESCRIPTION, db, forstResourceGuard, resourceContainer, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, backendUID, stateTransfer);
        this.uploadedSstFiles = new TreeMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>(uploadedStateHandles);
        this.lastCompletedCheckpointId = lastCompletedCheckpointId;
    }

    @Override
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous ForSt snapshot performed on empty keyed state at {}. Returning null.", (Object)timestamp);
            }
            return registry -> SnapshotResult.empty();
        }
        SnapshotType.SharingFilesStrategy sharingFilesStrategy = checkpointOptions.getCheckpointType().getSharingFilesStrategy();
        switch (sharingFilesStrategy) {
            case FORWARD_BACKWARD: {
                break;
            }
            case NO_SHARING: {
                snapshotResources.setPreviousSnapshot(EMPTY_PREVIOUS_SNAPSHOT);
                break;
            }
            case FORWARD: {
                if (snapshotResources.previousSnapshot.isEmpty()) break;
                throw new IllegalArgumentException("Triggering a full checkpoint for IncrementalSnapshotStrategy is not supported.");
            }
            default: {
                throw new IllegalArgumentException(String.format("Unsupported sharing files strategy for %s : %s", this.getClass().getName(), sharingFilesStrategy));
            }
        }
        return new ForStIncrementalSnapshotOperation(checkpointId, snapshotResources, checkpointStreamFactory, sharingFilesStrategy);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyCheckpointComplete(long completedCheckpointId) {
        SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = this.uploadedSstFiles;
        synchronized (sortedMap) {
            LOG.info("Backend:{} checkpoint:{} complete.", (Object)this.backendUID, (Object)completedCheckpointId);
            if (completedCheckpointId > this.lastCompletedCheckpointId && this.uploadedSstFiles.containsKey(completedCheckpointId)) {
                this.uploadedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
                this.lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyCheckpointAborted(long abortedCheckpointId) {
        SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = this.uploadedSstFiles;
        synchronized (sortedMap) {
            LOG.info("Backend:{} checkpoint:{} aborted.", (Object)this.backendUID, (Object)abortedCheckpointId);
            this.uploadedSstFiles.keySet().remove(abortedCheckpointId);
        }
    }

    @Override
    public void close() {
        this.stateTransfer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected ForStSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        Collection confirmedSstFiles;
        long lastCompletedCheckpoint;
        SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = this.uploadedSstFiles;
        synchronized (sortedMap) {
            lastCompletedCheckpoint = this.lastCompletedCheckpointId;
            confirmedSstFiles = (Collection)this.uploadedSstFiles.get(lastCompletedCheckpoint);
            LOG.trace("Use confirmed SST files for checkpoint {}: {}", (Object)checkpointId, (Object)confirmedSstFiles);
        }
        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) confirmed files as base: {}.", new Object[]{checkpointId, lastCompletedCheckpoint, confirmedSstFiles});
        for (Map.Entry entry : this.kvStateInformation.entrySet()) {
            stateMetaInfoSnapshots.add(((ForStOperationUtils.ForStKvStateInfo)entry.getValue()).metaInfo.snapshot());
        }
        return new ForStSnapshotStrategyBase.PreviousSnapshot(confirmedSstFiles);
    }

    private final class ForStIncrementalSnapshotOperation
    extends ForStSnapshotStrategyBase.ForStSnapshotOperation {
        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;
        @Nonnull
        private final ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources;

        private ForStIncrementalSnapshotOperation(@Nonnull long checkpointId, @Nonnull ForStSnapshotStrategyBase.ForStNativeSnapshotResources snapshotResources, @Nonnull CheckpointStreamFactory checkpointStreamFactory, SnapshotType.SharingFilesStrategy sharingFilesStrategy) {
            super(checkpointId, checkpointStreamFactory);
            this.sharingFilesStrategy = sharingFilesStrategy;
            this.snapshotResources = snapshotResources;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
            boolean completed = false;
            ArrayList<StreamStateHandle> reusedHandle = new ArrayList<StreamStateHandle>();
            try {
                SnapshotResult<StreamStateHandle> metaStateHandle = ForStIncrementalSnapshotStrategy.this.materializeMetaData(snapshotCloseableRegistry, this.tmpResourcesRegistry, this.snapshotResources.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
                ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFiles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
                long checkpointedSize = metaStateHandle.getStateSize();
                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(ForStIncrementalSnapshotStrategy.this.backendUID, ForStIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize += this.transferSnapshotFiles(sstFiles, miscFiles, snapshotCloseableRegistry, this.tmpResourcesRegistry, reusedHandle));
                completed = true;
                SnapshotResult snapshotResult = SnapshotResult.of((StateObject)jmIncrementalKeyedStateHandle);
                return snapshotResult;
            }
            finally {
                this.snapshotResources.release();
                if (!completed) {
                    try {
                        this.tmpResourcesRegistry.close();
                    }
                    catch (Exception e) {
                        LOG.warn("Could not properly clean tmp resources.", (Throwable)e);
                    }
                } else {
                    this.checkpointStreamFactory.reusePreviousStateHandle(reusedHandle);
                }
            }
        }

        private long transferSnapshotFiles(@Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstHandles, @Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> metaHandles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry, @Nonnull List<StreamStateHandle> reusedHandle) throws Exception {
            Preconditions.checkNotNull(this.snapshotResources.liveFiles, (String)"liveFiles were not properly created.");
            if (this.snapshotResources.liveFiles.isEmpty()) {
                return 0L;
            }
            Tuple4<List<IncrementalKeyedStateHandle.HandleAndLocalPath>, List<Path>, List<Path>, Path> classifiedFiles = this.classifyFiles();
            sstHandles.addAll((Collection)classifiedFiles.f0);
            sstHandles.stream().map(IncrementalKeyedStateHandle.HandleAndLocalPath::getHandle).forEach(reusedHandle::add);
            CheckpointedStateScope stateScope = this.sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED;
            long transferBytes = 0L;
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFilesTransferResult = ForStIncrementalSnapshotStrategy.this.stateTransfer.transferFilesToCheckpointFs(this.sharingFilesStrategy, (List)classifiedFiles.f1, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry, false);
            sstHandles.addAll(sstFilesTransferResult);
            transferBytes += sstFilesTransferResult.stream().mapToLong(IncrementalKeyedStateHandle.HandleAndLocalPath::getStateSize).sum();
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFilesTransferResult = ForStIncrementalSnapshotStrategy.this.stateTransfer.transferFilesToCheckpointFs(this.sharingFilesStrategy, (List)classifiedFiles.f2, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry, false);
            metaHandles.addAll(miscFilesTransferResult);
            transferBytes += miscFilesTransferResult.stream().mapToLong(IncrementalKeyedStateHandle.HandleAndLocalPath::getStateSize).sum();
            IncrementalKeyedStateHandle.HandleAndLocalPath manifestFileTransferResult = ForStIncrementalSnapshotStrategy.this.stateTransfer.transferFileToCheckpointFs(this.sharingFilesStrategy, (Path)classifiedFiles.f3, this.snapshotResources.manifestFileSize, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry, false);
            metaHandles.add(manifestFileTransferResult);
            transferBytes += manifestFileTransferResult.getStateSize();
            IncrementalKeyedStateHandle.HandleAndLocalPath currentFileWriteResult = ForStIncrementalSnapshotStrategy.this.stateTransfer.writeFileToCheckpointFs("CURRENT", this.snapshotResources.getCurrentFileContent(), this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpResourcesRegistry);
            metaHandles.add(currentFileWriteResult);
            this.recordReusableHandles(sstHandles);
            return transferBytes += currentFileWriteResult.getStateSize();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void recordReusableHandles(List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstHandles) {
            SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap = ForStIncrementalSnapshotStrategy.this.uploadedSstFiles;
            synchronized (sortedMap) {
                switch (this.sharingFilesStrategy) {
                    case FORWARD_BACKWARD: 
                    case FORWARD: {
                        ForStIncrementalSnapshotStrategy.this.uploadedSstFiles.put(this.checkpointId, Collections.unmodifiableList(sstHandles));
                        break;
                    }
                    case NO_SHARING: {
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unsupported sharing files strategy: " + String.valueOf(this.sharingFilesStrategy));
                    }
                }
            }
        }

        private Tuple4<List<IncrementalKeyedStateHandle.HandleAndLocalPath>, List<Path>, List<Path>, Path> classifyFiles() {
            int totalFileNum = this.snapshotResources.liveFiles.size();
            ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> transferredSstHandles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>(totalFileNum);
            ArrayList<Path> toTransferSstFiles = new ArrayList<Path>(totalFileNum);
            ArrayList<Path> toTransferMiscFiles = new ArrayList<Path>(totalFileNum);
            Path toTransferManifestFile = null;
            for (Path filePath : this.snapshotResources.liveFiles) {
                String fileName = filePath.getName();
                if (fileName.equals(this.snapshotResources.manifestFileName)) {
                    toTransferManifestFile = filePath;
                    continue;
                }
                if (fileName.endsWith(".sst")) {
                    Optional<StreamStateHandle> uploaded = this.snapshotResources.previousSnapshot.getUploaded(fileName);
                    if (uploaded.isPresent() && this.checkpointStreamFactory.couldReuseStateHandle(uploaded.get())) {
                        transferredSstHandles.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)uploaded.get(), (String)fileName));
                        continue;
                    }
                    toTransferSstFiles.add(filePath);
                    continue;
                }
                toTransferMiscFiles.add(filePath);
            }
            return Tuple4.of(transferredSstHandles, toTransferSstFiles, toTransferMiscFiles, toTransferManifestFile);
        }
    }
}

