/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.tasklist.zeebeimport;

import io.camunda.tasklist.Metrics;
import io.camunda.tasklist.entities.meta.ImportPositionEntity;
import io.camunda.tasklist.exceptions.NoSuchIndexException;
import io.camunda.tasklist.exceptions.TasklistRuntimeException;
import io.camunda.tasklist.property.TasklistProperties;
import io.camunda.tasklist.util.ThreadUtil;
import io.camunda.tasklist.zeebe.ImportValueType;
import io.camunda.tasklist.zeebeimport.ImportBatch;
import io.camunda.tasklist.zeebeimport.ImportJob;
import io.camunda.tasklist.zeebeimport.ImportPositionHolder;
import io.camunda.tasklist.zeebeimport.RecordsReader;
import io.camunda.zeebe.protocol.Protocol;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public abstract class RecordsReaderAbstract
implements RecordsReader,
Runnable {
    public static final String PARTITION_ID_FIELD_NAME = "partitionId";
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordsReaderAbstract.class);
    @Autowired
    protected TasklistProperties tasklistProperties;
    @Autowired
    protected Metrics metrics;
    protected final int partitionId;
    protected final ImportValueType importValueType;
    protected final long maxPossibleSequence;
    protected int countEmptyRuns;
    @Autowired
    private ImportPositionHolder importPositionHolder;
    @Autowired
    private BeanFactory beanFactory;
    @Autowired
    @Qualifier(value="tasklistRecordsReaderThreadPoolExecutor")
    private ThreadPoolTaskScheduler readersExecutor;
    @Autowired
    @Qualifier(value="tasklistImportThreadPoolExecutor")
    private ThreadPoolTaskExecutor importExecutor;
    private ImportJob pendingImportJob;
    private final ReentrantLock schedulingImportJobLock;
    private boolean ongoingRescheduling;
    private final BlockingQueue<Callable<Boolean>> importJobs;
    private Callable<Boolean> active;

    public RecordsReaderAbstract(int partitionId, ImportValueType importValueType, int queueSize) {
        this.partitionId = partitionId;
        this.importValueType = importValueType;
        this.importJobs = new LinkedBlockingQueue<Callable<Boolean>>(queueSize);
        this.schedulingImportJobLock = new ReentrantLock();
        this.maxPossibleSequence = Protocol.encodePartitionId((int)(partitionId + 1), (long)0L) - 1L;
    }

    @Override
    public void run() {
        this.readAndScheduleNextBatch();
    }

    @Override
    public int readAndScheduleNextBatch(boolean autoContinue) {
        block10: {
            int readerBackoff = this.tasklistProperties.getImporter().getReaderBackoff();
            boolean useOnlyPosition = this.tasklistProperties.getImporter().isUseOnlyPosition();
            try {
                ImportBatch importBatch;
                ImportPositionEntity latestPosition = this.importPositionHolder.getLatestScheduledPosition(this.importValueType.getAliasTemplate(), this.partitionId);
                if (!useOnlyPosition && latestPosition != null && latestPosition.getSequence() > 0L) {
                    LOGGER.debug("Use import for {} ( {} ) by sequence", (Object)this.importValueType.name(), (Object)this.partitionId);
                    importBatch = this.readNextBatchBySequence(latestPosition.getSequence());
                } else {
                    LOGGER.debug("Use import for {} ( {} ) by position", (Object)this.importValueType.name(), (Object)this.partitionId);
                    importBatch = this.readNextBatchByPositionAndPartition(latestPosition.getPosition(), null);
                }
                Integer nextRunDelay = null;
                if (importBatch.getHits().size() == 0) {
                    nextRunDelay = readerBackoff;
                } else {
                    ImportJob importJob = this.createImportJob(latestPosition, importBatch);
                    if (!this.scheduleImportJob(importJob, !autoContinue)) {
                        return 0;
                    }
                }
                if (autoContinue) {
                    this.rescheduleReader(nextRunDelay);
                }
                return importBatch.getHits().size();
            }
            catch (NoSuchIndexException ex) {
                if (autoContinue) {
                    this.rescheduleReader(readerBackoff);
                }
            }
            catch (Exception ex) {
                LOGGER.error(ex.getMessage(), (Throwable)ex);
                if (!autoContinue) break block10;
                this.rescheduleReader(null);
            }
        }
        return 0;
    }

    @Override
    public int readAndScheduleNextBatch() {
        return this.readAndScheduleNextBatch(true);
    }

    @Override
    public ImportBatch readNextBatchBySequence(Long sequence) throws NoSuchIndexException {
        return this.readNextBatchBySequence(sequence, null);
    }

    @Override
    public boolean tryToScheduleImportJob(ImportJob importJob, boolean skipPendingJob) {
        return this.withReschedulingImportJobLock(() -> {
            boolean scheduled = false;
            for (int retries = 3; !scheduled && retries > 0; --retries) {
                scheduled = this.importJobs.offer(this.executeJob(importJob));
            }
            ImportJob importJob2 = this.pendingImportJob = skipPendingJob || scheduled ? null : importJob;
            if (scheduled && this.active == null) {
                this.executeNext();
            }
            return scheduled;
        });
    }

    @Override
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override
    public ImportValueType getImportValueType() {
        return this.importValueType;
    }

    @Override
    public BlockingQueue<Callable<Boolean>> getImportJobs() {
        return this.importJobs;
    }

    private ImportJob createImportJob(ImportPositionEntity latestPosition, ImportBatch importBatch) {
        return (ImportJob)this.beanFactory.getBean(ImportJob.class, new Object[]{importBatch, latestPosition});
    }

    private void rescheduleReader(Integer readerDelay) {
        if (readerDelay != null) {
            this.readersExecutor.schedule((Runnable)this, OffsetDateTime.now().plus(readerDelay.intValue(), ChronoUnit.MILLIS).toInstant());
        } else {
            this.readersExecutor.submit((Runnable)this);
        }
    }

    private boolean scheduleImportJob(ImportJob job, boolean skipPendingJob) {
        if (this.tryToScheduleImportJob(job, skipPendingJob)) {
            this.importJobScheduledSucceeded(job);
            return true;
        }
        return false;
    }

    private void importJobScheduledSucceeded(ImportJob job) {
        this.metrics.getTimer("tasklist.import.job.scheduled", new String[]{"type", this.importValueType.name(), "partition", String.valueOf(this.partitionId)}).record(Duration.between(job.getCreationTime(), OffsetDateTime.now()));
        job.recordLatestScheduledPosition();
    }

    private Callable<Boolean> executeJob(ImportJob job) {
        return () -> {
            try {
                Boolean imported = (Boolean)job.call();
                if (imported.booleanValue()) {
                    this.executeNext();
                    this.rescheduleRecordsReaderIfNecessary();
                } else {
                    ThreadUtil.sleepFor((long)2000L);
                    this.execute(this.active);
                }
                return imported;
            }
            catch (Exception ex) {
                LOGGER.error("Exception occurred when importing data: " + ex.getMessage(), (Throwable)ex);
                ThreadUtil.sleepFor((long)2000L);
                this.execute(this.active);
                return false;
            }
        };
    }

    private void executeNext() {
        this.active = (Callable)this.importJobs.poll();
        if (this.active != null) {
            Future result = this.importExecutor.submit(this.active);
            LOGGER.debug("Submitted next job");
        }
    }

    private void execute(Callable<Boolean> job) {
        Future result = this.importExecutor.submit(job);
        LOGGER.debug("Submitted the same job");
    }

    private void rescheduleRecordsReaderIfNecessary() {
        this.withReschedulingImportJobLock(() -> {
            if (this.hasPendingImportJobToReschedule() && this.shouldReschedulePendingImportJob()) {
                this.startRescheduling();
                this.readersExecutor.submit(this::reschedulePendingImportJob);
            }
        });
    }

    private void reschedulePendingImportJob() {
        try {
            this.scheduleImportJob(this.pendingImportJob, false);
        }
        finally {
            this.withReschedulingImportJobLock(() -> {
                this.pendingImportJob = null;
                this.completeRescheduling();
                this.rescheduleReader(null);
            });
        }
    }

    private boolean hasPendingImportJobToReschedule() {
        return this.pendingImportJob != null;
    }

    private boolean shouldReschedulePendingImportJob() {
        return !this.ongoingRescheduling;
    }

    private void startRescheduling() {
        this.ongoingRescheduling = true;
    }

    private void completeRescheduling() {
        this.ongoingRescheduling = false;
    }

    private void withReschedulingImportJobLock(Runnable action) {
        this.withReschedulingImportJobLock(() -> {
            action.run();
            return null;
        });
    }

    private <T> T withReschedulingImportJobLock(Callable<T> action) {
        try {
            this.schedulingImportJobLock.lock();
            T t = action.call();
            return t;
        }
        catch (Exception e) {
            throw new TasklistRuntimeException((Throwable)e);
        }
        finally {
            this.schedulingImportJobLock.unlock();
        }
    }
}

