package com.atlassian.confluence.api.impl.service.audit.migration;

import com.atlassian.audit.api.AuditConsumer;
import com.atlassian.audit.entity.CoverageArea;
import com.atlassian.audit.entity.CoverageLevel;
import com.atlassian.audit.spi.migration.LegacyAuditEntityMigrator;
import com.atlassian.confluence.impl.audit.AuditRecordEntity;
import com.atlassian.confluence.internal.audit.AuditFormatConverter;
import com.atlassian.confluence.internal.audit.persistence.dao.AuditRecordDao;
import com.atlassian.confluence.util.LoggingUncaughtExceptionHandler;
import com.google.common.collect.Lists;
import io.atlassian.util.concurrent.ThreadFactories;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

/* loaded from: input_file:com/atlassian/confluence/api/impl/service/audit/migration/ConfluenceAuditEntityMigrator.class */
/**
 * Migrates legacy Confluence audit records to the Atlassian Audit format.
 *
 * <p>{@link #migrate(AuditConsumer)} reads the ids of all legacy records, splits them into batches of
 * {@link #BATCH_SIZE} ids and hands the batches to {@link #NUM_THREADS} worker threads through a bounded
 * queue. Each worker converts one batch per transaction, passes the converted entities to the
 * {@link AuditConsumer} and deletes the legacy records that were converted.
 */
public class ConfluenceAuditEntityMigrator implements LegacyAuditEntityMigrator {
    private static final Logger log = LoggerFactory.getLogger(ConfluenceAuditEntityMigrator.class);

    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;

    /** Worker thread count; overridable with the {@code legacy.audit.migrator.num.threads} system property. */
    private static final int NUM_THREADS = Integer.getInteger("legacy.audit.migrator.num.threads", DEFAULT_NUM_THREADS);

    /** Record ids per batch; overridable with the {@code legacy.audit.migrator.batch.size} system property. */
    private static final int BATCH_SIZE = Integer.getInteger("legacy.audit.migrator.batch.size", 1000);

    /**
     * Value passed to {@code MigrationStatusManager.waitUntilCompletion}.
     * NOTE(review): presumably a wait/poll interval in milliseconds - confirm against MigrationStatusManager.
     */
    private static final long WAIT_TIME_MS = 3000L;

    private final AuditRecordDao auditRecordDao;
    private final PlatformTransactionManager transactionManager;
    private final AuditFormatConverter auditFormatConverter = new AuditFormatConverter();

    /**
     * Worker that repeatedly takes a batch of record ids from the queue and migrates it, until its thread
     * is interrupted.
     */
    private class Migrator implements Runnable {
        private final BlockingQueue<List<Long>> batches;
        private final AuditConsumer auditConsumer;
        private final BiConsumer<Integer, Integer> statusUpdater;

        /**
         * @param batches       queue the id batches are taken from
         * @param auditConsumer receives the converted entities of each batch
         * @param statusUpdater called once per batch with (number migrated, batch size minus number migrated)
         */
        private Migrator(BlockingQueue<List<Long>> batches,
                         AuditConsumer auditConsumer,
                         BiConsumer<Integer, Integer> statusUpdater) {
            this.batches = Objects.requireNonNull(batches);
            this.auditConsumer = Objects.requireNonNull(auditConsumer);
            this.statusUpdater = Objects.requireNonNull(statusUpdater);
        }

        /**
         * Processes batches until the current thread is interrupted. A {@link RuntimeException} from one
         * batch is logged and the loop moves on to the next batch.
         */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    doMigrate();
                } catch (RuntimeException e) {
                    log.warn("Error migrating some audit events", e);
                }
            }
        }

        /**
         * Takes one batch from the queue (blocking) and migrates it inside a transaction. The status
         * updater is always notified for a batch that was taken, even when the transaction throws.
         */
        private void doMigrate() {
            final List<Long> batch;
            try {
                batch = batches.take();
            } catch (InterruptedException e) {
                log.debug("Interrupted", e);
                // Restore the flag so that run() sees the interruption and leaves its loop.
                Thread.currentThread().interrupt();
                return;
            }

            int migrated = 0;
            try {
                Integer result = getTransactionTemplate().execute(status -> migrateBatch(batch));
                if (result != null) {
                    migrated = result;
                }
            } finally {
                // On failure nothing is counted as migrated, so the whole batch is reported as not migrated.
                statusUpdater.accept(migrated, batch.size() - migrated);
            }
        }

        /**
         * Loads the legacy records with the given ids, converts them, hands the converted entities to the
         * audit consumer and deletes the legacy records that were converted.
         *
         * @param ids ids of the legacy records in this batch
         * @return the number of records that were converted
         */
        private int migrateBatch(List<Long> ids) {
            List<AuditRecordEntity> legacyRecords = auditRecordDao.fetchByIds(ids);
            List<AuditRecordEntity> convertedRecords = new ArrayList<>(legacyRecords.size());
            auditConsumer.accept(convertAll(
                    legacyRecords,
                    legacyRecord -> auditFormatConverter.toAuditEntity(
                            legacyRecord, CoverageArea.END_USER_ACTIVITY, CoverageLevel.BASE),
                    convertedRecords));
            // Records whose conversion failed are deliberately left in place.
            auditRecordDao.deleteRecords(convertedRecords);
            return convertedRecords.size();
        }

        /**
         * Applies {@code converter} to every record, logging and skipping the records for which it throws
         * a {@link RuntimeException}.
         *
         * @param records          records to convert
         * @param converter        conversion applied to each record
         * @param convertedRecords receives, in iteration order, the records whose conversion succeeded
         * @param <E>              type produced by the converter
         * @return the conversion results, index-aligned with {@code convertedRecords}
         */
        private <E> List<E> convertAll(List<AuditRecordEntity> records,
                                       Function<AuditRecordEntity, E> converter,
                                       List<AuditRecordEntity> convertedRecords) {
            List<E> results = new ArrayList<>(records.size());
            for (AuditRecordEntity legacyRecord : records) {
                try {
                    results.add(converter.apply(legacyRecord));
                    convertedRecords.add(legacyRecord);
                } catch (RuntimeException e) {
                    log.warn("Error converting legacy audit record to Atlassian Audit", e);
                }
            }
            return results;
        }

        /** Creates a transaction template that joins an existing transaction or starts a new one. */
        private TransactionTemplate getTransactionTemplate() {
            TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
            // Same value as the former literal 0; the constant is inherited from TransactionDefinition.
            transactionTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
            return transactionTemplate;
        }
    }

    /**
     * @param auditRecordDao             DAO used to read and delete legacy audit records; must not be null
     * @param platformTransactionManager transaction manager used for the per-batch transactions; must not be null
     * @throws NullPointerException if either argument is null
     */
    public ConfluenceAuditEntityMigrator(AuditRecordDao auditRecordDao, PlatformTransactionManager platformTransactionManager) {
        this.auditRecordDao = Objects.requireNonNull(auditRecordDao);
        this.transactionManager = Objects.requireNonNull(platformTransactionManager);
    }

    /**
     * Migrates all legacy audit records, feeding the converted entities to {@code auditConsumer}.
     *
     * <p>Returns immediately when there is nothing to migrate. Otherwise starts {@link #NUM_THREADS}
     * daemon worker threads, enqueues the record ids in batches and waits on the status manager. If the
     * calling thread is interrupted while enqueueing, the interrupt flag is restored and the method
     * returns early. The worker pool is shut down on every exit path.
     *
     * @param auditConsumer receives the converted audit entities
     */
    public void migrate(AuditConsumer auditConsumer) {
        List<Long> recordIds = auditRecordDao.fetchAllRecordIds();
        if (recordIds.isEmpty()) {
            log.info("Found nothing to migrate. Proceeding.");
            return;
        }
        log.info("Found {} audit records to migrate.", recordIds.size());

        MigrationStatusManager statusManager = new MigrationStatusManager(recordIds.size(), log::info);
        // The queue is per invocation so batches left over from an interrupted run can never be picked up
        // by the workers of a later run.
        BlockingQueue<List<Long>> batches = new ArrayBlockingQueue<>(NUM_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(
                NUM_THREADS,
                ThreadFactories.named("audit-migrator")
                        .type(ThreadFactories.Type.DAEMON)
                        .uncaughtExceptionHandler(LoggingUncaughtExceptionHandler.INSTANCE)
                        .build());
        try {
            IntStream.range(0, NUM_THREADS)
                    .forEach(i -> executor.execute(new Migrator(batches, auditConsumer, statusManager)));
            for (List<Long> batch : Lists.partition(recordIds, BATCH_SIZE)) {
                // Blocks while the queue is full, which throttles enqueueing to the workers' pace.
                batches.put(batch);
            }
            statusManager.waitUntilCompletion(WAIT_TIME_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while migrating", e);
        } finally {
            // Interrupts the workers, which ends their run() loops.
            executor.shutdownNow();
        }
    }
}
