/*
 * Decompiled with CFR 0.152.
 */
package com.atlassian.stash.internal.idx.impl;

import com.atlassian.plugin.ModuleDescriptor;
import com.atlassian.plugin.event.PluginEventListener;
import com.atlassian.plugin.event.events.PluginDisabledEvent;
import com.atlassian.plugin.event.events.PluginEnabledEvent;
import com.atlassian.stash.content.Changeset;
import com.atlassian.stash.content.ChangesetCallback;
import com.atlassian.stash.exception.ChangesetIndexingException;
import com.atlassian.stash.i18n.I18nService;
import com.atlassian.stash.i18n.KeyedMessage;
import com.atlassian.stash.idx.ChangesetIndex;
import com.atlassian.stash.idx.ChangesetIndexer;
import com.atlassian.stash.idx.IndexingContext;
import com.atlassian.stash.internal.annotation.Unsecured;
import com.atlassian.stash.internal.idx.ChangesetIndexerStateDao;
import com.atlassian.stash.internal.idx.ChangesetIndexingService;
import com.atlassian.stash.internal.idx.RepositorySnapshotService;
import com.atlassian.stash.internal.idx.impl.BlockingChangesetQueueCallback;
import com.atlassian.stash.internal.idx.impl.DefaultIndexingContext;
import com.atlassian.stash.internal.scm.ScmClientProvider;
import com.atlassian.stash.repository.Repository;
import com.atlassian.stash.repository.RepositoryService;
import com.atlassian.stash.scm.AsyncCommand;
import com.atlassian.stash.scm.ScmClient;
import com.atlassian.stash.user.Permission;
import com.atlassian.stash.user.SecurityService;
import com.atlassian.stash.util.Operation;
import com.atlassian.util.profiling.UtilTimerStack;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;

/**
 * {@link ChangesetIndexingService} implementation that feeds the changesets added to and removed from a
 * repository since the previous run to every registered {@link ChangesetIndexer}.
 * <p>
 * Indexers are collected from enabled plugins (keyed by plugin key) and from explicit calls to
 * {@link #register(ChangesetIndexer)}. Changesets are processed in batches of {@code indexBatchSize}; each
 * batch is committed in its own transaction.
 * <p>
 * All access to the indexer registry is guarded by this instance's monitor. The {@code active} flag is
 * volatile because it is cleared by {@link #onDestroy()} while indexing loops poll it.
 */
@Service
public class ChangesetIndexingServiceImpl
implements ChangesetIndexingService {
    private static final Logger log = LoggerFactory.getLogger(ChangesetIndexingServiceImpl.class);
    /** Registry key under which indexers added through {@link #register(ChangesetIndexer)} are stored. */
    private static final String MANUALLY_REGISTERED = "manual-indexers";
    /** Registered indexers keyed by owning plugin key; guarded by {@code this}. */
    private final Multimap<String, ChangesetIndexer> indexers = HashMultimap.create(2, 1);
    private final ChangesetIndex changesetIndex;
    private final ChangesetIndexerStateDao indexerStateDao;
    private final PlatformTransactionManager transactionManager;
    private final ScmClientProvider clientProvider;
    private final RepositorySnapshotService snapshotService;
    private final SecurityService securityService;
    private final RepositoryService repositoryService;
    private final I18nService i18nService;
    /** Repositories that currently have an indexing run in progress. */
    private final Set<Repository> runningIndexingJobs;
    /** Cleared on shutdown; polled by the indexing loops, hence volatile. */
    private volatile boolean active = true;
    @Value(value="${indexing.job.batch.size}")
    private int indexBatchSize;
    @Value(value="${indexing.process.timeout.execution}")
    private long processExecutionTimeoutSeconds;

    @Autowired
    public ChangesetIndexingServiceImpl(ChangesetIndex changesetIndex, ChangesetIndexerStateDao indexerStateDao, ScmClientProvider clientProvider, RepositorySnapshotService snapshotService, SecurityService securityService, RepositoryService repositoryService, PlatformTransactionManager transactionManager, I18nService i18nService) {
        this.changesetIndex = changesetIndex;
        this.indexerStateDao = indexerStateDao;
        this.clientProvider = clientProvider;
        this.snapshotService = snapshotService;
        this.securityService = securityService;
        this.repositoryService = repositoryService;
        this.transactionManager = transactionManager;
        this.i18nService = i18nService;
        this.runningIndexingJobs = Collections.synchronizedSet(Sets.<Repository>newHashSet());
    }

    /**
     * Marks the service inactive so that running indexing loops stop at their next iteration.
     */
    @PreDestroy
    public void onDestroy() {
        this.active = false;
        log.debug("IndexingService is shutting down. Running indexing jobs will be aborted");
    }

    /**
     * Marks the service active.
     */
    @PostConstruct
    public void onCreate() {
        this.active = true;
    }

    /**
     * Registers every {@link ChangesetIndexer} module of the plugin that was just enabled.
     * Modules that yield no instance, or whose indexer has no id, are skipped with a warning.
     *
     * @param event the plugin-enabled event
     */
    @PluginEventListener
    public synchronized void onPluginEnabled(PluginEnabledEvent event) {
        String pluginKey = event.getPlugin().getKey();
        for (ModuleDescriptor module : event.getPlugin().getModuleDescriptorsByModuleClass(ChangesetIndexer.class)) {
            // Resolve the module once so the instance that was validated is the instance that is stored.
            ChangesetIndexer indexer = (ChangesetIndexer)module.getModule();
            if (indexer == null) {
                log.warn("Ignoring indexer module from {} because it did not provide an indexer", pluginKey);
                continue;
            }
            if (indexer.getId() == null) {
                log.warn("Ignoring indexer from {} because it does not define an id", pluginKey);
                continue;
            }
            this.indexers.put(pluginKey, indexer);
        }
    }

    /**
     * Drops every indexer that was registered for the plugin that was just disabled.
     *
     * @param event the plugin-disabled event
     */
    @PluginEventListener
    public synchronized void onPluginDisabled(PluginDisabledEvent event) {
        this.indexers.removeAll(event.getPlugin().getKey());
    }

    /**
     * Registers an indexer that does not come from a plugin module.
     *
     * @param indexer the indexer to add
     * @throws IllegalStateException if the indexer does not define an id
     */
    public synchronized void register(ChangesetIndexer indexer) {
        if (indexer.getId() == null) {
            throw new IllegalStateException("Ignoring indexer because it does not define an id");
        }
        this.indexers.put(MANUALLY_REGISTERED, indexer);
    }

    /**
     * Removes an indexer previously added through {@link #register(ChangesetIndexer)}.
     *
     * @param indexer the indexer to remove
     */
    public synchronized void unregister(ChangesetIndexer indexer) {
        this.indexers.remove(MANUALLY_REGISTERED, indexer);
    }

    /**
     * @return an unmodifiable snapshot of all currently registered indexers; later registrations are not
     *         reflected in the returned collection
     */
    public synchronized Collection<ChangesetIndexer> getChangesetIndexers() {
        List<ChangesetIndexer> snapshot = Lists.newArrayList(this.indexers.values());
        return Collections.unmodifiableCollection(snapshot);
    }

    @Override
    @Unsecured(value="anyone can ask for the status")
    public boolean isActive() {
        return this.active;
    }

    /**
     * Sets the number of changesets processed per transaction.
     *
     * @param indexBatchSize the batch size; must be positive because it is used as a modulus
     * @throws IllegalArgumentException if {@code indexBatchSize} is zero or negative
     */
    @Override
    @PreAuthorize(value="hasGlobalPermission('ADMIN')")
    public void setIndexBatchSize(int indexBatchSize) {
        if (indexBatchSize <= 0) {
            throw new IllegalArgumentException("indexBatchSize must be positive, got " + indexBatchSize);
        }
        this.indexBatchSize = indexBatchSize;
    }

    /**
     * Indexes the given repository with {@link Permission#REPO_READ} granted for the duration of the run.
     *
     * @param repository the repository to index
     */
    @Override
    @Unsecured(value="anyone can fire an index - it doesn't leak information")
    public void indexRepository(final Repository repository) {
        log.debug("Request for indexing of {} received", repository.getSlug());
        this.securityService.doWithPermission("changeset-indexing", Permission.REPO_READ, new Operation<Void, RuntimeException>(){

            public Void perform() throws RuntimeException {
                ChangesetIndexingServiceImpl.this.internalIndexRepository(repository);
                return null;
            }
        });
    }

    /** @return the wall-clock time, in milliseconds, at which an scm process started now is considered timed out */
    private long getExecutionTimeoutTimestampFromNow() {
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(this.processExecutionTimeoutSeconds);
    }

    /** @return {@code true} if a positive process execution timeout has been configured */
    private boolean isProcessExecutionTimeoutConfigured() {
        return this.processExecutionTimeoutSeconds > 0L;
    }

    /**
     * Fails the indexing run when the scm process has exceeded the configured execution timeout and is
     * still running. Does nothing when no timeout is configured or the deadline has not passed.
     *
     * @param repoIdentifier   log prefix identifying the repository
     * @param future           handle of the running scm command
     * @param timeoutTimestamp deadline in milliseconds since the epoch
     * @throws ChangesetIndexingException if the deadline has passed and the command has not completed
     * @throws InterruptedException       if the thread is interrupted while checking the command
     */
    private void handleProcessTimeout(String repoIdentifier, Future<Void> future, long timeoutTimestamp) throws InterruptedException {
        if (!this.isProcessExecutionTimeoutConfigured() || System.currentTimeMillis() <= timeoutTimestamp) {
            return;
        }
        log.info("{}Indexing scm process has timed out, wrapping up the process.", repoIdentifier);
        try {
            // Give the process a brief chance to report completion before declaring a timeout.
            future.get(10L, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            // The command already finished (with a failure); keep draining what it produced.
            log.debug(repoIdentifier + "Scm process finished with an error before the timeout was handled", e);
        }
        catch (TimeoutException e) {
            KeyedMessage message = this.i18nService.getKeyedText("stash.service.changesetindexing.timeout", "A scm process has timed out during the indexing of {0} (timeout = {1}s)", new Object[]{repoIdentifier, this.processExecutionTimeoutSeconds});
            throw new ChangesetIndexingException(message);
        }
    }

    /**
     * Runs one indexing pass for the repository unless one is already running for it or the service has
     * been shut down.
     *
     * @param repository the repository to index
     */
    private void internalIndexRepository(Repository repository) {
        String repoIdentifier = "[" + repository.getProject().getKey() + "/" + repository.getSlug() + "] ";
        // add() both tests and claims the slot, so two concurrent requests cannot both proceed.
        if (!this.runningIndexingJobs.add(repository)) {
            log.info("{} is already indexing, ignoring second indexing request", repoIdentifier);
            return;
        }
        try {
            if (!this.active) {
                log.info("Skipping indexing of {}; indexer has been shut down.", repoIdentifier);
                return;
            }
            this.runIndexing(repository, repoIdentifier);
        }
        finally {
            this.runningIndexingJobs.remove(repository);
        }
    }

    /**
     * Performs the indexing pass: notifies indexers, processes changesets added since the last run, then
     * changesets removed since the last run, and finally records the run date.
     * <p>
     * A transaction is committed after every {@code indexBatchSize} changesets. If the service is shut
     * down or the thread is interrupted, the run is abandoned without recording the run date and the open
     * transaction is rolled back; the thread's interrupt status is restored before returning.
     *
     * @param repository     the repository to index
     * @param repoIdentifier log prefix identifying the repository
     */
    private void runIndexing(Repository repository, String repoIdentifier) {
        log.debug("{}Starting indexing", repoIdentifier);
        List<ChangesetIndexer> indexers = this.getEnabledIndexers(repository);
        if (indexers.isEmpty()) {
            return;
        }
        int changesetCount = 0;
        boolean interrupted = false;
        TransactionStatus tx = this.beginTx(repository, changesetCount);
        try {
            IndexingContext ctx = this.createContext(repository);
            Repository repo = this.repositoryService.findRepositoryById(repository.getId());
            this.notifyOnBefore(repoIdentifier, indexers, ctx);
            Date lastIndexingRunDate = this.indexerStateDao.getOldestLastRunDate(repo, indexers);
            Date thisIndexingRunDate = new Date();
            if (lastIndexingRunDate == null && this.isEmpty(repo)) {
                log.debug("{} skipping indexing of empty repository", repoIdentifier);
                this.transactionManager.commit(tx);
                return;
            }
            BlockingChangesetQueueCallback changesetQueue = new BlockingChangesetQueueCallback(1024);

            // Phase 1: changesets added since the last run.
            AsyncCommand<Void> command = this.getChangesetsAddedSinceLastRunCommand(repo, lastIndexingRunDate, thisIndexingRunDate, changesetQueue);
            Future<Void> result = command.start();
            long timeoutTimestamp = this.getExecutionTimeoutTimestampFromNow();
            this.logStartBatch(repoIdentifier, changesetCount);
            try {
                Changeset changeset;
                while (this.active && (changeset = changesetQueue.poll(5L, TimeUnit.SECONDS)) != null) {
                    this.handleProcessTimeout(repoIdentifier, result, timeoutTimestamp);
                    this.changesetIndex.addChangeset(changeset, repo);
                    for (ChangesetIndexer indexer : indexers) {
                        indexer.onChangesetAdded(changeset, ctx);
                    }
                    if (++changesetCount % this.indexBatchSize == 0) {
                        // Batch boundary: commit, then continue in a fresh transaction with a reloaded repository.
                        this.transactionManager.commit(tx);
                        this.logEndBatch(repoIdentifier, changesetCount - this.indexBatchSize);
                        this.logStartBatch(repoIdentifier, changesetCount);
                        tx = this.beginTx(repository, changesetCount);
                        repo = this.repositoryService.findRepositoryById(repository.getId());
                    }
                    if (log.isTraceEnabled()) {
                        log.trace(repoIdentifier + "processed " + changeset.getId() + " - " + changesetCount);
                    }
                }
            }
            catch (InterruptedException e) {
                interrupted = true;
                log.info("{}Indexing was interrupted", repoIdentifier);
            }
            finally {
                interrupted |= this.wrapUpCommand(repoIdentifier, result);
            }
            changesetQueue.clear();
            if (interrupted || !this.active) {
                // Incomplete run: leave the last-run date untouched so the next run picks up from here.
                return;
            }

            // Phase 2: changesets removed since the last run.
            AsyncCommand<Void> delCommand = this.getChangesetsRemovedSinceLastRunCommand(repo, lastIndexingRunDate, thisIndexingRunDate, changesetQueue);
            Future<Void> delResult = delCommand.start();
            timeoutTimestamp = this.getExecutionTimeoutTimestampFromNow();
            log.debug("{}Scanning for deleted changesets...", repoIdentifier);
            try {
                Changeset changeset;
                while (this.active && (changeset = changesetQueue.poll(5L, TimeUnit.SECONDS)) != null) {
                    this.handleProcessTimeout(repoIdentifier, delResult, timeoutTimestamp);
                    this.changesetIndex.removeChangeset(changeset.getId(), repo);
                    log.debug("{}Changeset {} has been removed", repoIdentifier, changeset.getId());
                    for (ChangesetIndexer indexer : indexers) {
                        indexer.onChangesetRemoved(changeset, ctx);
                    }
                    if (++changesetCount % this.indexBatchSize == 0) {
                        this.transactionManager.commit(tx);
                        this.logEndBatch(repoIdentifier, changesetCount - this.indexBatchSize);
                        this.logStartBatch(repoIdentifier, changesetCount);
                        tx = this.beginTx(repository, changesetCount);
                        repo = this.repositoryService.findRepositoryById(repository.getId());
                    }
                    if (log.isTraceEnabled()) {
                        log.trace(repoIdentifier + "processed " + changeset.getId() + " - " + changesetCount);
                    }
                }
            }
            catch (InterruptedException e) {
                interrupted = true;
                log.info("{}Indexing was interrupted", repoIdentifier);
            }
            finally {
                interrupted |= this.wrapUpCommand(repoIdentifier, delResult);
            }
            if (interrupted || !this.active) {
                return;
            }
            this.indexerStateDao.setLastRunDate(repo, indexers, thisIndexingRunDate);
            this.transactionManager.commit(tx);
            tx = null;
            this.notifyOnAfter(repoIdentifier, indexers, ctx);
        }
        finally {
            try {
                if (tx != null && !tx.isCompleted()) {
                    log.info("{}Rolling back indexing transaction", repoIdentifier);
                    this.transactionManager.rollback(tx);
                }
                // NOTE(review): this pop also runs on paths where logStartBatch was never reached
                // (empty repository, failure in onBeforeIndexing) - confirm UtilTimerStack tolerates that.
                this.logFinalBatch(repoIdentifier, changesetCount - changesetCount % this.indexBatchSize, changesetCount);
            }
            finally {
                if (interrupted) {
                    // Restore the interrupt status only after cleanup so the rollback itself is not disturbed.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Waits briefly for the scm command to finish and cancels it if it has not, or if waiting fails.
     *
     * @param repoIdentifier log prefix identifying the repository
     * @param result         handle of the scm command
     * @return {@code true} if the thread was interrupted while waiting; the caller is responsible for
     *         restoring the interrupt status
     */
    private boolean wrapUpCommand(String repoIdentifier, Future<Void> result) {
        log.debug("{}Waiting for scm command to wrap up", repoIdentifier);
        try {
            result.get(10L, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            log.debug("{}Scm command did not finish on its own, canceling process", repoIdentifier);
            result.cancel(true);
        }
        catch (InterruptedException e) {
            log.debug(repoIdentifier + "Interrupted while waiting for the process to complete", e);
            result.cancel(true);
            return true;
        }
        catch (ExecutionException e) {
            log.debug(repoIdentifier + "Exception happened while waiting for the process to complete", e);
            result.cancel(true);
        }
        return false;
    }

    /** Builds the profiling frame name for the batch starting at {@code fromBatch}. */
    private String getBatchId(String repoIdentifier, int fromBatch) {
        return repoIdentifier + "indexChangesetBatch(" + fromBatch + ".." + (fromBatch + this.indexBatchSize) + ")";
    }

    /** Pushes the profiling frame for the batch starting at {@code fromBatch}. */
    private void logStartBatch(String repoIdentifier, int fromBatch) {
        UtilTimerStack.push(this.getBatchId(repoIdentifier, fromBatch));
    }

    /** Pops the profiling frame for a completed full batch and logs the running total. */
    private void logEndBatch(String repoIdentifier, int fromBatch) {
        UtilTimerStack.pop(this.getBatchId(repoIdentifier, fromBatch));
        log.debug("{}Completed batch, Indexed {} so far", repoIdentifier, fromBatch + this.indexBatchSize);
    }

    /** Pops the profiling frame for the last (possibly partial) batch and logs the total processed. */
    private void logFinalBatch(String repoIdentifier, int fromBatch, int totalProcessed) {
        UtilTimerStack.pop(this.getBatchId(repoIdentifier, fromBatch));
        log.debug("{}>Indexed {} changesets", repoIdentifier, totalProcessed);
    }

    /** Calls {@code onBeforeIndexing} on every indexer inside an "onBeforeIndexing" profiling frame. */
    private void notifyOnBefore(String repoIdentifier, List<ChangesetIndexer> indexers, IndexingContext ctx) {
        String frame = repoIdentifier + "onBeforeIndexing";
        UtilTimerStack.push(frame);
        for (ChangesetIndexer indexer : indexers) {
            indexer.onBeforeIndexing(ctx);
        }
        UtilTimerStack.pop(frame);
    }

    /** Calls {@code onAfterIndexing} on every indexer inside an "onAfterIndexing" profiling frame. */
    private void notifyOnAfter(String repoIdentifier, List<ChangesetIndexer> indexers, IndexingContext ctx) {
        String frame = repoIdentifier + "onAfterIndexing";
        UtilTimerStack.push(frame);
        for (ChangesetIndexer indexer : indexers) {
            indexer.onAfterIndexing(ctx);
        }
        UtilTimerStack.pop(frame);
    }

    /**
     * @param repository the repository about to be indexed
     * @return the registered indexers that report themselves enabled for the repository; empty if none
     */
    private List<ChangesetIndexer> getEnabledIndexers(Repository repository) {
        List<ChangesetIndexer> registered;
        synchronized (this) {
            if (this.indexers.isEmpty()) {
                return Collections.emptyList();
            }
            // Copy under the lock; indexer callbacks are invoked outside it.
            registered = Lists.newArrayList(this.indexers.values());
        }
        List<ChangesetIndexer> enabled = Lists.newArrayList();
        for (ChangesetIndexer indexer : registered) {
            if (indexer.isEnabledForRepository(repository)) {
                enabled.add(indexer);
            }
        }
        return enabled;
    }

    /** Asks the repository's scm client whether the repository is empty. */
    private boolean isEmpty(Repository repository) {
        return this.clientProvider.get(repository.getScmType()).isEmpty(repository);
    }

    /**
     * Creates a snapshot of the repository for this run and builds the command that streams the
     * descendants of the previous run's snapshot into {@code callback}. A failure to write the snapshot
     * is logged and does not abort the run.
     */
    private AsyncCommand<Void> getChangesetsAddedSinceLastRunCommand(Repository repository, Date lastIndexingRun, Date thisIndexingRun, ChangesetCallback callback) {
        ScmClient client = this.clientProvider.get(repository.getScmType());
        try {
            this.snapshotService.createRepositorySnapshot(repository, client, thisIndexingRun);
        }
        catch (IOException e) {
            log.warn("Problem writing the repository snapshot to disk.", e);
        }
        Iterable<String> lastIndexingHeads = this.snapshotService.getRepositorySnapshot(repository, lastIndexingRun);
        return client.getChangesetDescendantsCommand(repository, lastIndexingHeads, callback);
    }

    /**
     * Builds the command that streams the changesets between the previous run's snapshot and this run's
     * snapshot into {@code callback}.
     */
    private AsyncCommand<Void> getChangesetsRemovedSinceLastRunCommand(Repository repository, Date lastIndexingRun, Date thisIndexingRun, ChangesetCallback callback) {
        ScmClient client = this.clientProvider.get(repository.getScmType());
        Iterable<String> lastIndexingHeads = this.snapshotService.getRepositorySnapshot(repository, lastIndexingRun);
        Iterable<String> currentIndexingHeads = this.snapshotService.getRepositorySnapshot(repository, thisIndexingRun);
        return client.getChangesetsBetweenCommand(repository, lastIndexingHeads, currentIndexingHeads, callback);
    }

    /** Creates the context object handed to indexers for this run. */
    private IndexingContext createContext(Repository repository) {
        return new DefaultIndexingContext(repository);
    }

    /**
     * Starts a new, independent transaction named after the repository slug and the number of
     * changesets processed so far.
     */
    private TransactionStatus beginTx(Repository repository, int numberProcessed) {
        DefaultTransactionAttribute def = new DefaultTransactionAttribute();
        def.setName("ChangesetIndex:" + repository.getSlug() + ":" + numberProcessed);
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        return this.transactionManager.getTransaction(def);
    }
}

