/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.CompactedChangelogPathResolver;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FileOperationThreadPool;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link InnerTableCommit} implementation that forwards commit operations to a {@link
 * FileStoreCommit} and, after each commit, schedules maintenance work (consumer expiration,
 * snapshot expiration, partition expiration, tag auto-creation and tag expiration) on a dedicated
 * executor.
 *
 * <p>An error raised by a maintenance task is logged and remembered; it is rethrown, wrapped in a
 * {@link RuntimeException}, the next time maintenance is scheduled.
 */
public class TableCommitImpl implements InnerTableCommit {

    private static final Logger LOG = LoggerFactory.getLogger(TableCommitImpl.class);

    /** Identifier used by the operations that do not receive one from the caller. */
    private static final long DEFAULT_COMMIT_IDENTIFIER = Long.MAX_VALUE;

    private final FileStoreCommit commit;
    @Nullable
    private final Runnable expireSnapshots;
    @Nullable
    private final PartitionExpire partitionExpire;
    @Nullable
    private final TagAutoManager tagAutoManager;
    @Nullable
    private final Duration consumerExpireTime;
    private final ConsumerManager consumerManager;

    /** Runs the maintenance task submitted after each commit. */
    private final ExecutorService maintainExecutor;

    /** First error thrown by a maintenance task, or {@code null} if none has failed so far. */
    private final AtomicReference<Throwable> maintainError;

    private final String tableName;
    private final boolean forceCreatingSnapshot;

    /** Executor used to check the existence of files before re-committing. */
    private final ThreadPoolExecutor fileCheckExecutor;

    /** Partition spec to overwrite; {@code null} means commits are not in overwrite mode. */
    @Nullable
    private Map<String, String> overwritePartition = null;

    /** Set once a one-time operation guarded by {@link #checkCommitted()} has been invoked. */
    private boolean batchCommitted = false;

    /** Whether expiration should still run when a commit produced no new snapshot. */
    private boolean expireForEmptyCommit = true;

    /**
     * Creates the table commit.
     *
     * @param commit underlying file store commit; receives {@code partitionExpire} if it is non-null
     * @param expireSnapshots snapshot expiration action, or {@code null} to skip it
     * @param partitionExpire partition expiration, or {@code null} to skip it
     * @param tagAutoManager source of tag auto-creation / tag expiration, or {@code null}
     * @param consumerExpireTime age after which consumers are expired, or {@code null} to skip it
     * @param consumerManager manager used to expire consumers
     * @param expireExecutionMode {@code SYNC} runs maintenance on a direct executor; any other
     *     mode uses a single-thread executor
     * @param tableName table name passed to the commit metrics
     * @param forceCreatingSnapshot value reported by {@link #forceCreatingSnapshot()} when true
     * @param threadNum argument passed to {@link FileOperationThreadPool#getExecutorService}
     */
    public TableCommitImpl(
            FileStoreCommit commit,
            @Nullable Runnable expireSnapshots,
            @Nullable PartitionExpire partitionExpire,
            @Nullable TagAutoManager tagAutoManager,
            @Nullable Duration consumerExpireTime,
            ConsumerManager consumerManager,
            CoreOptions.ExpireExecutionMode expireExecutionMode,
            String tableName,
            boolean forceCreatingSnapshot,
            int threadNum) {
        if (partitionExpire != null) {
            commit.withPartitionExpire(partitionExpire);
        }

        this.commit = commit;
        this.expireSnapshots = expireSnapshots;
        this.partitionExpire = partitionExpire;
        this.tagAutoManager = tagAutoManager;
        this.consumerExpireTime = consumerExpireTime;
        this.consumerManager = consumerManager;
        this.maintainExecutor =
                expireExecutionMode == CoreOptions.ExpireExecutionMode.SYNC
                        ? MoreExecutors.newDirectExecutorService()
                        : Executors.newSingleThreadExecutor(
                                new ExecutorThreadFactory(
                                        Thread.currentThread().getName() + "expire-main-thread"));
        // The element type must match the field's declared AtomicReference<Throwable>.
        this.maintainError = new AtomicReference<>(null);
        this.tableName = tableName;
        this.forceCreatingSnapshot = forceCreatingSnapshot;
        this.fileCheckExecutor = FileOperationThreadPool.getExecutorService(threadNum);
    }

    /**
     * Tells whether a snapshot has to be created even for an empty commit.
     *
     * @return {@code true} if the constructor flag is set, an overwrite partition is configured,
     *     or the tag auto-creation reports that it forces snapshot creation
     */
    public boolean forceCreatingSnapshot() {
        if (forceCreatingSnapshot || overwritePartition != null) {
            return true;
        }
        if (tagAutoManager == null) {
            return false;
        }
        TagAutoCreation tagAutoCreation = tagAutoManager.getTagAutoCreation();
        return tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot();
    }

    /**
     * Sets the partition spec to overwrite.
     *
     * @param overwritePartitions partition spec to overwrite, or {@code null} for normal commits
     * @return this instance
     */
    @Override
    public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {
        this.overwritePartition = overwritePartitions;
        return this;
    }

    /**
     * Forwards the flag to the underlying {@link FileStoreCommit}.
     *
     * @return this instance
     */
    @Override
    public TableCommitImpl ignoreEmptyCommit(boolean ignoreEmptyCommit) {
        commit.ignoreEmptyCommit(ignoreEmptyCommit);
        return this;
    }

    /**
     * Configures whether expiration runs when a commit created no new snapshot.
     *
     * @return this instance
     */
    @Override
    public TableCommitImpl expireForEmptyCommit(boolean expireForEmptyCommit) {
        this.expireForEmptyCommit = expireForEmptyCommit;
        return this;
    }

    /**
     * Attaches commit metrics built from the given registry and this table's name.
     *
     * @return this instance
     */
    @Override
    public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
        commit.withMetrics(new CommitMetrics(registry, tableName));
        return this;
    }

    /**
     * One-time commit of the given messages using the default identifier.
     *
     * <p>Fails the state check in {@link #checkCommitted()} when a one-time operation has already
     * been performed on this instance.
     */
    @Override
    public void commit(List<CommitMessage> commitMessages) {
        checkCommitted();
        commit(DEFAULT_COMMIT_IDENTIFIER, commitMessages);
    }

    /** One-time truncation of the table; guarded by {@link #checkCommitted()}. */
    @Override
    public void truncateTable() {
        checkCommitted();
        commit.truncateTable(DEFAULT_COMMIT_IDENTIFIER);
    }

    /** Drops the given partitions. This call is not guarded by the one-time commit check. */
    @Override
    public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
        commit.dropPartitions(partitionSpecs, DEFAULT_COMMIT_IDENTIFIER);
    }

    /** Commits the given statistics. This call is not guarded by the one-time commit check. */
    @Override
    public void updateStatistics(Statistics statistics) {
        commit.commitStatistics(statistics, DEFAULT_COMMIT_IDENTIFIER);
    }

    /** Delegates manifest compaction to the underlying {@link FileStoreCommit}. */
    @Override
    public void compactManifests() {
        commit.compactManifest();
    }

    /** Verifies that no guarded one-time operation ran before, then marks this instance as used. */
    private void checkCommitted() {
        Preconditions.checkState(
                !batchCommitted, "BatchTableCommit only support one-time committing.");
        batchCommitted = true;
    }

    /** Wraps the messages in a {@link ManifestCommittable} with the identifier and commits it. */
    @Override
    public void commit(long identifier, List<CommitMessage> commitMessages) {
        commit(createManifestCommittable(identifier, commitMessages));
    }

    /**
     * Builds one committable per map entry and commits those not filtered out as already
     * committed.
     *
     * @return number of committables that were actually committed
     */
    @Override
    public int filterAndCommit(Map<Long, List<CommitMessage>> commitIdentifiersAndMessages) {
        List<ManifestCommittable> committables =
                commitIdentifiersAndMessages.entrySet().stream()
                        .map(entry -> createManifestCommittable(entry.getKey(), entry.getValue()))
                        .collect(Collectors.toList());
        return filterAndCommitMultiple(committables);
    }

    /** Creates a committable for {@code identifier} holding every given commit message. */
    private ManifestCommittable createManifestCommittable(
            long identifier, List<CommitMessage> commitMessages) {
        ManifestCommittable committable = new ManifestCommittable(identifier);
        commitMessages.forEach(committable::addFileCommittable);
        return committable;
    }

    /** Commits a single committable without the append-files check. */
    public void commit(ManifestCommittable committable) {
        commitMultiple(Collections.singletonList(committable), false);
    }

    /**
     * Commits the given committables in order and then schedules maintenance.
     *
     * <p>Without an overwrite partition every committable is committed individually and
     * maintenance is scheduled with the identifier of the last one (nothing is scheduled for an
     * empty list). In overwrite mode at most one committable is accepted; an empty list is
     * replaced by an empty committable carrying the default identifier.
     *
     * @param committables committables to commit
     * @param checkAppendFiles flag forwarded to {@link FileStoreCommit}
     * @throws RuntimeException if more than one committable is given in overwrite mode
     */
    public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
        if (overwritePartition != null) {
            if (committables.size() > 1) {
                throw new RuntimeException(
                        "Multiple committables appear in overwrite mode, this may be a bug, please report it: "
                                + committables);
            }
            ManifestCommittable committable =
                    committables.isEmpty()
                            ? new ManifestCommittable(DEFAULT_COMMIT_IDENTIFIER)
                            : committables.get(0);
            int newSnapshots =
                    commit.overwritePartition(
                            overwritePartition, committable, Collections.emptyMap());
            maintain(
                    committable.identifier(),
                    maintainExecutor,
                    newSnapshots > 0 || expireForEmptyCommit);
            return;
        }

        int newSnapshots = 0;
        for (ManifestCommittable committable : committables) {
            newSnapshots += commit.commit(committable, checkAppendFiles);
        }
        if (!committables.isEmpty()) {
            ManifestCommittable last = committables.get(committables.size() - 1);
            maintain(last.identifier(), maintainExecutor, newSnapshots > 0 || expireForEmptyCommit);
        }
    }

    /** Same as {@link #filterAndCommitMultiple(List, boolean)} with the append-files check on. */
    public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
        return filterAndCommitMultiple(committables, true);
    }

    /**
     * Sorts the committables by identifier, drops those the underlying commit reports as already
     * committed, verifies that the files of the remaining ones still exist and commits them.
     *
     * @return number of committables that were actually committed
     */
    public int filterAndCommitMultiple(
            List<ManifestCommittable> committables, boolean checkAppendFiles) {
        List<ManifestCommittable> sortedCommittables =
                committables.stream()
                        .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                        .collect(Collectors.toList());
        List<ManifestCommittable> retryCommittables = commit.filterCommitted(sortedCommittables);

        if (!retryCommittables.isEmpty()) {
            checkFilesExistence(retryCommittables);
            commitMultiple(retryCommittables, checkAppendFiles);
        }
        return retryCommittables.size();
    }

    /**
     * Collects the files referenced by the committables (new files, changelog files, compact-after
     * files and new index files) and fails if any of them no longer exists.
     *
     * @throws RuntimeException listing the missing files if at least one file does not exist
     * @throws UncheckedIOException if an existence check fails with an {@link IOException}
     */
    private void checkFilesExistence(List<ManifestCommittable> committables) {
        List<Path> files = new ArrayList<>();
        DataFilePathFactories factories = new DataFilePathFactories(commit.pathFactory());
        IndexFilePathFactories indexFactories = new IndexFilePathFactories(commit.pathFactory());
        for (ManifestCommittable committable : committables) {
            for (CommitMessage message : committable.fileCommittables()) {
                CommitMessageImpl msg = (CommitMessageImpl) message;
                DataFilePathFactory pathFactory =
                        factories.get(message.partition(), message.bucket());
                IndexPathFactory indexFileFactory =
                        indexFactories.get(message.partition(), message.bucket());
                Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory));
                msg.newFilesIncrement().newFiles().forEach(collector);
                msg.newFilesIncrement().changelogFiles().forEach(collector);
                msg.newFilesIncrement().newIndexFiles().stream()
                        .map(indexFileFactory::toPath)
                        .forEach(files::add);
                msg.compactIncrement().compactAfter().forEach(collector);
                msg.compactIncrement().newIndexFiles().stream()
                        .map(indexFileFactory::toPath)
                        .forEach(files::add);
            }
        }

        // Resolve compacted changelog paths first, then de-duplicate so that each file is
        // checked only once.
        List<Path> resolvedFiles =
                files.stream()
                        .map(
                                file ->
                                        CompactedChangelogPathResolver
                                                .resolveCompactedChangelogPath(file))
                        .distinct()
                        .collect(Collectors.toList());

        Predicate<Path> nonExists =
                p -> {
                    try {
                        return !commit.fileIO().exists(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                };

        List<Path> nonExistFiles =
                Lists.newArrayList(
                        ThreadPoolUtils.randomlyExecuteSequentialReturn(
                                fileCheckExecutor,
                                f ->
                                        nonExists.test(f)
                                                ? Collections.singletonList(f)
                                                : Collections.emptyList(),
                                resolvedFiles));

        if (!nonExistFiles.isEmpty()) {
            String message =
                    String.join(
                            "\n",
                            "Cannot recover from this checkpoint because some files in the snapshot that need to be resubmitted have been deleted:",
                            "    "
                                    + nonExistFiles.stream()
                                            .map(Object::toString)
                                            .collect(Collectors.joining(",")),
                            "    The most likely reason is because you are recovering from a very old savepoint that contains some uncommitted files that have already been deleted.");
            throw new RuntimeException(message);
        }
    }

    /**
     * Submits a maintenance run to {@code executor}.
     *
     * <p>An error from the submitted task is logged and stored (only the first one is kept), not
     * thrown from the task.
     *
     * @throws RuntimeException wrapping the stored error if an earlier maintenance run failed
     */
    private void maintain(long identifier, ExecutorService executor, boolean doExpire) {
        Throwable previousError = maintainError.get();
        if (previousError != null) {
            throw new RuntimeException(previousError);
        }

        executor.execute(
                () -> {
                    try {
                        maintain(identifier, doExpire);
                    } catch (Throwable t) {
                        LOG.error("Executing maintain encountered an error.", t);
                        maintainError.compareAndSet(null, t);
                    }
                });
    }

    /**
     * Runs the maintenance steps in order: consumer expiration, snapshot expiration, partition
     * expiration, tag auto-creation and tag expiration.
     *
     * <p>Every step except tag auto-creation is skipped when {@code doExpire} is false; each step
     * is also skipped when its component is {@code null}.
     */
    private void maintain(long identifier, boolean doExpire) {
        if (doExpire) {
            if (consumerExpireTime != null) {
                consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
            }
            if (expireSnapshots != null) {
                expireSnapshots.run();
            }
            if (partitionExpire != null) {
                partitionExpire.expire(identifier);
            }
        }

        if (tagAutoManager == null) {
            return;
        }
        // Tag auto-creation runs regardless of doExpire; only tag expiration depends on it.
        TagAutoCreation tagAutoCreation = tagAutoManager.getTagAutoCreation();
        if (tagAutoCreation != null) {
            tagAutoCreation.run();
        }
        TagTimeExpire tagTimeExpire = tagAutoManager.getTagTimeExpire();
        if (doExpire && tagTimeExpire != null) {
            tagTimeExpire.expire();
        }
    }

    /** Runs the snapshot expiration action in the calling thread, if one is configured. */
    public void expireSnapshots() {
        if (expireSnapshots != null) {
            expireSnapshots.run();
        }
    }

    /**
     * Closes the underlying commit and shuts down the maintenance executor.
     *
     * <p>The executor is shut down even if closing the commit throws, so its thread is not leaked.
     *
     * @throws Exception if closing the underlying commit fails
     */
    @Override
    public void close() throws Exception {
        try {
            commit.close();
        } finally {
            maintainExecutor.shutdownNow();
        }
    }

    /** Delegates aborting the given commit messages to the underlying {@link FileStoreCommit}. */
    @Override
    public void abort(List<CommitMessage> commitMessages) {
        commit.abort(commitMessages);
    }

    /** Exposes the maintenance executor for tests. */
    @VisibleForTesting
    public ExecutorService getMaintainExecutor() {
        return maintainExecutor;
    }
}

