package com.atlassian.bitbucket.internal.search.indexing.event;

import com.atlassian.bitbucket.internal.search.indexing.IndexingProperties;
import com.atlassian.bitbucket.internal.search.indexing.event.IndexEventWorker;
import com.atlassian.bitbucket.internal.search.indexing.filter.IndexFilterService;
import com.atlassian.bitbucket.search.util.Backoff;
import com.atlassian.sal.api.lifecycle.LifecycleAware;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service("defaultIndexEventQueueProcessor")
/* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/bitbucket-search-5.16.0.jar:com/atlassian/bitbucket/internal/search/indexing/event/DefaultIndexEventQueueProcessor.class */
public class DefaultIndexEventQueueProcessor implements IndexEventQueueProcessor, LifecycleAware {
    private static final int MAX_RETRIES = 5;
    private static final Duration MAX_RETRY_DELAY = Duration.ofMinutes(30);
    private static final Duration MIN_RETRY_DELAY = Duration.ofMinutes(1);
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultIndexEventQueueProcessor.class);
    private final DelayedScheduler<QueuedEvent> delayedScheduler;
    private final Duration dequeueTimeout;
    private final Duration enqueueTimeout;
    private final BlockingQueue<QueuedEvent> eventQueue;
    private final IndexEventWorker eventWorker;
    private final ExecutorService executorService;
    private final IndexFilterService indexFilterService;
    private final Duration shutdownTimeout;
    private Future<Boolean> eventLoopFuture;
    private volatile boolean shouldProcessEvents;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/bitbucket-search-5.16.0.jar:com/atlassian/bitbucket/internal/search/indexing/event/DefaultIndexEventQueueProcessor$WakeUpEvent.class */
    public static class WakeUpEvent implements IndexEvent {
        public static final WakeUpEvent INSTANCE = new WakeUpEvent();

        private WakeUpEvent() {
        }

        @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEvent
        public <T> T accept(IndexEventVisitor<T> indexEventVisitor) {
            return null;
        }
    }

    @Autowired
    public DefaultIndexEventQueueProcessor(@Qualifier("searchIndexingExecutorBean") ExecutorService executorService, IndexEventWorker indexEventWorker, IndexFilterService indexFilterService, IndexingProperties indexingProperties, DelayedScheduler<QueuedEvent> delayedScheduler) {
        this(executorService, indexEventWorker, new LinkedBlockingQueue(indexingProperties.getIndexingEventQueueSize()), delayedScheduler, indexFilterService, indexingProperties.getIndexingEventEnqueueTimeout(), indexingProperties.getIndexingEventDequeueTimeout(), indexingProperties.getIndexingEventShutdownTimeout());
    }

    @VisibleForTesting
    DefaultIndexEventQueueProcessor(ExecutorService executorService, IndexEventWorker indexEventWorker, BlockingQueue<QueuedEvent> blockingQueue, DelayedScheduler<QueuedEvent> delayedScheduler, IndexFilterService indexFilterService, Duration duration, Duration duration2, Duration duration3) {
        this.executorService = executorService;
        this.eventWorker = indexEventWorker;
        this.eventQueue = blockingQueue;
        this.delayedScheduler = delayedScheduler;
        this.indexFilterService = indexFilterService;
        this.enqueueTimeout = duration;
        this.dequeueTimeout = duration2;
        this.shutdownTimeout = duration3;
        delayedScheduler.setProcessor(this::retryDelayedEvent);
    }

    @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEventQueueProcessor
    public void clear() {
        this.eventQueue.clear();
    }

    @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEventQueueProcessor
    public long getDelayQueueSize() {
        return this.delayedScheduler.getNumberOfScheduledItems();
    }

    @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEventQueueProcessor
    public long getEventQueueSize() {
        return this.eventQueue.size();
    }

    @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEventQueueProcessor
    public boolean isWorkerAlive() {
        return (this.eventLoopFuture == null || this.eventLoopFuture.isCancelled() || this.eventLoopFuture.isDone()) ? false : true;
    }

    @Override // com.atlassian.sal.api.lifecycle.LifecycleAware
    public void onStart() {
        resumeIndexing();
    }

    @Override // com.atlassian.sal.api.lifecycle.LifecycleAware
    public void onStop() {
        suspendIndexing();
        try {
            this.executorService.awaitTermination(this.shutdownTimeout.getSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("search-indexing executor did not exit in a graceful manner.", (Throwable) e);
        } finally {
            this.executorService.shutdown();
        }
        log.info("Worker thread has been shut down");
    }

    @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEventQueueProcessor
    public boolean queueEvent(@Nonnull IndexEvent indexEvent) {
        return queueEvent(indexEvent, this.enqueueTimeout);
    }

    @Override // com.atlassian.bitbucket.internal.search.indexing.event.IndexEventQueueProcessor
    public boolean queueEvent(@Nonnull IndexEvent indexEvent, @Nonnull Duration duration) {
        return addEvent(new QueuedEvent((IndexEvent) Objects.requireNonNull(indexEvent, "indexEvent"), 0), duration);
    }

    @VisibleForTesting
    Future<Boolean> getEventLoopFuture() {
        return this.eventLoopFuture;
    }

    @VisibleForTesting
    void resumeIndexing() {
        log.debug("Event processor is now entering RUNNING state");
        if (isWorkerAlive()) {
            log.error("Worker thread has already been started...");
            return;
        }
        this.shouldProcessEvents = true;
        this.eventLoopFuture = this.executorService.submit(this::processEvents);
        log.info("Event queue processor has been started");
    }

    @VisibleForTesting
    void suspendIndexing() {
        log.debug("Event processor is now entering STOPPED state");
        this.shouldProcessEvents = false;
        try {
            this.eventQueue.offer(new QueuedEvent(WakeUpEvent.INSTANCE, 0), this.shutdownTimeout.getSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("Unable to enqueue a WakeUpEvent to allow the worker thread to exit cleanly. Cancelling thread.", (Throwable) e);
        }
        try {
            if (this.eventLoopFuture != null && !this.eventLoopFuture.isCancelled()) {
                this.eventLoopFuture.get(this.shutdownTimeout.getSeconds(), TimeUnit.SECONDS);
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            log.debug("Worker has been interrupted and will now exited.", (Throwable) e2);
        } catch (ExecutionException | TimeoutException e3) {
            log.warn("Worker did not exit in a graceful manner.", e3);
        }
    }

    private boolean addEvent(QueuedEvent queuedEvent, @Nonnull Duration duration) {
        try {
            if (this.eventQueue.offer(queuedEvent, duration.getSeconds(), TimeUnit.SECONDS)) {
                return true;
            }
            log.debug("Unable to enqueue event [{}], the queue is possibly full.", queuedEvent.getEvent());
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Thread was interrupted while waiting to enqueue an event.", (Throwable) e);
            return false;
        }
    }

    private void delayedRetry(QueuedEvent queuedEvent, int i) {
        Duration calculateExponential = Backoff.calculateExponential(i, MIN_RETRY_DELAY, MAX_RETRY_DELAY);
        log.info("Worker instructed us to retry {}, retrying in {} s", queuedEvent, Long.valueOf(calculateExponential.getSeconds()));
        this.delayedScheduler.schedule(new QueuedEvent(queuedEvent.getEvent(), i), calculateExponential);
    }

    private void delayedRetryOrDropEvent(QueuedEvent queuedEvent) {
        int retries = queuedEvent.getRetries() + 1;
        if (retries > 5) {
            log.info("Worker instructed us to retry {} but the maximum number of retries ({}) has been reached, dropping event", (Object) queuedEvent, (Object) 5);
        } else {
            delayedRetry(queuedEvent, retries);
        }
    }

    private void processEvent(QueuedEvent queuedEvent) {
        IndexEvent event = queuedEvent.getEvent();
        if (this.indexFilterService.shouldProcess(event)) {
            this.eventWorker.process(event).subscribe(instruction -> {
                if (instruction instanceof IndexEventWorker.UnlimitedRetryInstruction) {
                    delayedRetry(queuedEvent, queuedEvent.getRetries());
                    return;
                }
                if (instruction instanceof IndexEventWorker.LimitedRetryInstruction) {
                    delayedRetryOrDropEvent(queuedEvent);
                } else if (instruction instanceof IndexEventWorker.QueueEventInstruction) {
                    IndexEvent event2 = ((IndexEventWorker.QueueEventInstruction) instruction).getEvent();
                    log.info("Worker instructed us to enqueue {} after processing {}", event2, queuedEvent);
                    queueEvent(event2);
                }
            }, th -> {
                log.error("Unexpected error from index event worker for {}, dropping event", queuedEvent, th);
            });
        } else {
            log.debug("Event was excluded from indexing based on indexing filter rules: [{}]", event);
        }
    }

    private boolean processEvents() {
        while (this.shouldProcessEvents && !Thread.currentThread().isInterrupted()) {
            try {
                QueuedEvent poll = this.eventQueue.poll(this.dequeueTimeout.getSeconds(), TimeUnit.SECONDS);
                if (poll != null && poll.getEvent() != WakeUpEvent.INSTANCE) {
                    try {
                        processEvent(poll);
                    } catch (Exception e) {
                        log.error("An unexpected error was encountered while processing event [{}]\nThis event is now discarded and will not be retried.\nResuming normal processing of events.", poll.toString(), e);
                    }
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    private void retryDelayedEvent(QueuedEvent queuedEvent) {
        addEvent(queuedEvent, this.enqueueTimeout);
    }
}
