package com.atlassian.jira.cluster.distribution.localq;

import com.atlassian.jira.cluster.ClusterNodes;
import com.atlassian.jira.cluster.Node;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpSender;
import com.atlassian.jira.component.pico.ContainerNotInitializedException;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet;
import java.rmi.NotBoundException;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/atlassian/jira/cluster/distribution/localq/LocalQCacheOpReader.class */
/**
 * Runnable that drains one local cache replication queue and hands each queued
 * {@link LocalQCacheOp} to a {@link LocalQCacheOpSender} for delivery to the node the queue
 * belongs to.
 *
 * <p>The loop in {@link #run()} keeps going until the current thread is interrupted or the queue
 * reports that it is closed. For every iteration it:
 * <ol>
 *   <li>looks up the destination node; if it is unknown or not in one of the
 *       {@code nodeStatesWithActiveSenders} states, it backs off for a few seconds;</li>
 *   <li>peeks (blocking) at the head of the queue;</li>
 *   <li>drops the operation if {@code isStaleFor} says it is stale, otherwise sends it;</li>
 *   <li>removes the operation from the queue only after a successful send, or after the failure
 *       handlers decide to abandon it.</li>
 * </ol>
 *
 * <p>Instances are created through {@link #create}.
 */
public class LocalQCacheOpReader implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LocalQCacheOpReader.class);

    /** A recoverable (checked) failure is logged at INFO on the 1st and then every N-th occurrence. */
    private static final int LOG_CHECKED_EXCEPTIONS_FREQUENCY = 100;

    /** A runtime failure is logged at WARN on the 1st and then every N-th occurrence. */
    private static final int LOG_RUNTIME_EXCEPTIONS_FREQUENCY = 50;

    /** Retry budget for runtime failures that do not carry their own retry count. */
    private static final int DEFAULT_RUNTIME_EXCEPTION_RETRIES = 10;

    /** Back-off used when there is nothing useful to do (unknown node, inactive node, unexpected error). */
    private static final long IDLE_SLEEP_SECONDS = 5L;

    private final LocalQCacheOpQueueWithStats queue;
    private final LocalQCacheOpSender sender;
    private final LocalQCriticalHandler criticalHandler;
    private final ClusterNodes clusterNodes;
    private final Set<Node.NodeState> nodeStatesWithActiveSenders = ImmutableSet.of(Node.NodeState.ACTIVE);
    private final long nodeTimeSynchronisationToleranceMillis;
    private final boolean replicatePutsViaCopy;
    private final long putTTLMillis;

    private LocalQCacheOpReader(LocalQCacheOpQueueWithStats queue, LocalQCacheOpSender sender, LocalQCriticalHandler criticalHandler, ClusterNodes clusterNodes, long nodeTimeSynchronisationToleranceMillis, boolean replicatePutsViaCopy, long putTTLMillis) {
        this.queue = queue;
        this.sender = sender;
        this.criticalHandler = criticalHandler;
        this.clusterNodes = clusterNodes;
        this.nodeTimeSynchronisationToleranceMillis = nodeTimeSynchronisationToleranceMillis;
        this.replicatePutsViaCopy = replicatePutsViaCopy;
        this.putTTLMillis = putTTLMillis;
    }

    /**
     * Creates a reader, taking the time-synchronisation tolerance and the PUT TTL from
     * {@code LocalQConfig}.
     *
     * @param queue                queue to read operations from
     * @param sender               sender used to deliver each operation
     * @param criticalHandler      callback invoked when peeking/removing fails unexpectedly
     * @param clusterNodes         lookup for the destination node
     * @param replicatePutsViaCopy selects the by-copy mode: TTL-based staleness, no retry sleeps,
     *                             and a single attempt per operation
     * @return a new reader
     */
    public static LocalQCacheOpReader create(LocalQCacheOpQueueWithStats queue, LocalQCacheOpSender sender, LocalQCriticalHandler criticalHandler, ClusterNodes clusterNodes, boolean replicatePutsViaCopy) {
        return new LocalQCacheOpReader(queue, sender, criticalHandler, clusterNodes, LocalQConfig.nodeTimeSynchronisationToleranceMillis(), replicatePutsViaCopy, LocalQConfig.putTTLMillis());
    }

    /**
     * Processes the queue until the thread is interrupted or the queue is closed.
     *
     * <p>An {@link InterruptedException} restores the interrupt flag so that the loop condition
     * ends the loop. Any other throwable is counted, logged, and followed by a short back-off so a
     * persistent failure does not spin the CPU.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOG.info(LogPrefix.prefix(this.replicatePutsViaCopy) + "Started listening for cache replication queue: {} ", this.queue.name());
        // Consecutive send failures for the operation currently at the head of the queue.
        AtomicLong failuresCount = new AtomicLong(0L);
        while (!Thread.currentThread().isInterrupted() && !this.queue.isClosed()) {
            try {
                processNextOp(failuresCount);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                this.queue.notifyOtherException();
                LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Error when processing the replication queue: {}, error: {}", new Object[]{this.queue.name(), th.getMessage(), th});
                try {
                    TimeUnit.SECONDS.sleep(IDLE_SLEEP_SECONDS);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        LOG.info(LogPrefix.prefix(this.replicatePutsViaCopy) + "Finished listening on cache replication queue: {} , queue isClosed: {}, thread isInterrupted: {}", new Object[]{this.queue.name(), this.queue.isClosed(), Thread.currentThread().isInterrupted()});
    }

    /** Runs one iteration of the read loop: resolve the node, peek, then drop-if-stale or send. */
    private void processNextOp(AtomicLong failuresCount) throws InterruptedException {
        Node node = this.clusterNodes.node(this.queue.id().nodeId);
        if (node == null) {
            LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Trying to send a LocalQCacheOp to non-existing node: {}", this.queue.id().nodeId);
            TimeUnit.SECONDS.sleep(IDLE_SLEEP_SECONDS);
            return;
        }
        if (!this.nodeStatesWithActiveSenders.contains(node.getState())) {
            LOG.debug(LogPrefix.prefix(this.replicatePutsViaCopy) + "Not sending to node in state: {}, only sending to nodes in following states: {}", node.getState(), this.nodeStatesWithActiveSenders);
            TimeUnit.SECONDS.sleep(IDLE_SLEEP_SECONDS);
            return;
        }

        LocalQCacheOp op = peekOrBlock();
        if (op == null) {
            // Queue closed or peek failed; the loop condition / critical handler deals with it.
            return;
        }
        if (isStaleFor(op, node)) {
            LOG.debug(LogPrefix.prefix(this.replicatePutsViaCopy) + "Skipping sending stale: {} to node: {}. Removing from cache replication queue: {}.", new Object[]{op, node, this.queue.name()});
            remove(op);
            this.queue.notifyStale();
            return;
        }
        sendAndRemove(node, op, failuresCount);
    }

    /**
     * Sends {@code op} to {@code node}; on success resets the failure counter, records the send
     * time and removes the operation from the queue. Failures are delegated to the handlers, which
     * decide whether to retry (leave the op in the queue) or abandon it.
     */
    private void sendAndRemove(Node node, LocalQCacheOp op, AtomicLong failuresCount) throws InterruptedException {
        boolean sent = false;
        try {
            Stopwatch stopwatch = Stopwatch.createStarted();
            this.sender.send(node, op);
            stopwatch.stop();
            // Mark as sent before touching statistics so a failure there cannot cause a re-send.
            sent = true;
            failuresCount.set(0L);
            this.queue.notifySendWithTime(stopwatch.elapsed(TimeUnit.MILLISECONDS));
        } catch (LocalQCacheOpSender.RecoverableFailure e) {
            handleCheckedException(e, op, failuresCount);
        } catch (RuntimeException e) {
            handleRuntimeException(e, op, failuresCount);
        }
        if (sent) {
            remove(op);
        }
    }

    /**
     * Decides whether {@code op} is too old to be worth sending to {@code node}.
     *
     * <p>In by-copy mode an operation is stale once it is older than {@code putTTLMillis}.
     * Otherwise it is stale when it was created earlier than the node's timestamp minus the
     * configured synchronisation tolerance; a creation time or node timestamp of {@code 0}
     * (or a missing node timestamp) means "unknown" and is never treated as stale.
     *
     * @throws IllegalArgumentException if the node is not in a state this reader sends to
     */
    private boolean isStaleFor(LocalQCacheOp op, Node node) {
        Preconditions.checkArgument(this.nodeStatesWithActiveSenders.contains(node.getState()));
        if (this.replicatePutsViaCopy) {
            long ageMillis = System.currentTimeMillis() - op.getCreationTimeInMillis();
            if (ageMillis <= this.putTTLMillis) {
                return false;
            }
            LOG.debug(LogPrefix.prefix(this.replicatePutsViaCopy) + "Stale PUT operation: {}, operation is: {} millis old and this is more than PUT TTL: {} millis", new Object[]{op, ageMillis, this.putTTLMillis});
            return true;
        }

        long creationTimeMillis = op.getCreationTimeInMillis();
        // The timestamp is a boxed value; guard against null instead of letting an NPE escape
        // into run()'s catch-all on every iteration. NOTE(review): confirm null is "unknown".
        Long nodeTimestamp = node.getTimestamp();
        if (nodeTimestamp == null) {
            return false;
        }
        long nodeTimestampMillis = nodeTimestamp.longValue();
        if (creationTimeMillis == 0 || nodeTimestampMillis == 0) {
            return false;
        }
        return creationTimeMillis < nodeTimestampMillis - this.nodeTimeSynchronisationToleranceMillis;
    }

    /**
     * Removes the head of the queue, which is expected to be {@code localQCacheOp} (the argument
     * itself is only used for logging and for the critical handler).
     *
     * @param localQCacheOp the operation the caller believes is at the head of the queue
     * @return {@code true} if an element was removed; {@code false} if the queue was closed or
     *         empty; otherwise whatever the critical handler returns after an unexpected failure
     */
    boolean remove(LocalQCacheOp localQCacheOp) {
        try {
            if (this.queue.isClosed()) {
                LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Will not remove cache replication event: {} from closed cache replication queue: {}", localQCacheOp, this.queue.name());
                return false;
            }
            this.queue.remove();
            return true;
        } catch (NoSuchElementException e) {
            LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Tried to remove: {} from an empty cache replication queue: {}. This could happen if queue.remove was called without queue.peek or there was a queue.backup between queue.peek and queue.remove. Ignoring and continue...", localQCacheOp, this.queue.name());
            return false;
        } catch (Throwable th) {
            LOG.error(LogPrefix.prefix(this.replicatePutsViaCopy) + "Critical state of local cache replication queue - cannot remove localQCacheOp: {} from queue: {}, error: {}. ", new Object[]{localQCacheOp, this.queue.name(), th.getMessage(), th});
            this.queue.notifyCriticalRemove();
            return this.criticalHandler.handleCriticalRemove(this.queue, localQCacheOp, th);
        }
    }

    /**
     * Returns the operation at the head of the queue, blocking until one is available.
     *
     * @return the head operation, or {@code null} if the queue is closed or peeking failed (in
     *         which case the critical handler has been notified)
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    LocalQCacheOp peekOrBlock() throws InterruptedException, IllegalStateException {
        try {
            if (this.queue.isClosed()) {
                LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Will not peekOrBlock from closed cache replication queue: {}", this.queue.name());
                return null;
            }
            return this.queue.peekOrBlock();
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable th) {
            // ContainerNotInitializedException is still counted and handled, just not logged.
            if (!(th instanceof ContainerNotInitializedException)) {
                LOG.error(LogPrefix.prefix(this.replicatePutsViaCopy) + "Critical state of local cache replication queue - cannot peek from queue: {}, error: {}", new Object[]{this.queue.name(), th.getMessage(), th});
            }
            this.queue.notifyCriticalPeek();
            this.criticalHandler.handleCriticalPeek(this.queue, th);
            return null;
        }
    }

    /**
     * Returns how many attempts an operation gets before it is abandoned after a runtime failure:
     * one in by-copy mode, the failure's own {@code retry} value for
     * {@code UnrecoverableFailure}, and {@value #DEFAULT_RUNTIME_EXCEPTION_RETRIES} otherwise.
     */
    private int numberOfRetriesFor(RuntimeException failure) {
        if (this.replicatePutsViaCopy) {
            return 1;
        }
        if (failure instanceof LocalQCacheOpSender.UnrecoverableFailure) {
            return ((LocalQCacheOpSender.UnrecoverableFailure) failure).retry;
        }
        return DEFAULT_RUNTIME_EXCEPTION_RETRIES;
    }

    /**
     * Handles a runtime failure of a send: updates statistics, logs (rate-limited), and either
     * sleeps before the next retry or, once the retry budget is used up, abandons the operation
     * by removing it from the queue and resetting the failure counter.
     */
    private void handleRuntimeException(RuntimeException failure, LocalQCacheOp op, AtomicLong failuresCount) throws InterruptedException {
        boolean cacheNotBound = failure.getCause() instanceof NotBoundException;
        if (cacheNotBound) {
            this.queue.notifyNotBoundException();
        } else {
            this.queue.notifyRuntimeException();
        }

        long failures = failuresCount.incrementAndGet();
        int maxRetries = numberOfRetriesFor(failure);
        if (failures == 1 || failures % LOG_RUNTIME_EXCEPTIONS_FREQUENCY == 0) {
            LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Runtime exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}/{}, error: {}", new Object[]{failure.getClass().getSimpleName(), op, this.queue.name(), failures, maxRetries, failure.getMessage()});
        }
        if (failures < maxRetries) {
            sleepBeforeNextOpRetry(failures, 10, 100, 500, 500, 1000);
            return;
        }

        if (cacheNotBound) {
            LOG.warn(LogPrefix.prefix(this.replicatePutsViaCopy) + "Abandoning sending because cache does not exist on destination node: {} from cache replication queue: {}, failuresCount: {}/{}. Removing from queue. Error: {}", new Object[]{op, this.queue.name(), failuresCount, maxRetries, failure.getCause().getMessage(), failure.getCause()});
        } else {
            LOG.error(LogPrefix.prefix(this.replicatePutsViaCopy) + "Abandoning sending: {} from cache replication queue: {}, failuresCount: {}/{}. Removing from queue. Error: {}", new Object[]{op, this.queue.name(), failuresCount, maxRetries, failure.getMessage(), failure});
            this.queue.notifyDroppedOnSend();
        }
        failuresCount.set(0L);
        remove(op);
    }

    /**
     * Handles a recoverable failure of a send. In by-copy mode the operation is dropped straight
     * away; otherwise it stays in the queue and is retried without limit, with a growing sleep
     * between attempts and INFO logging only on the 1st and every
     * {@value #LOG_CHECKED_EXCEPTIONS_FREQUENCY}-th failure (TRACE in between).
     */
    private void handleCheckedException(LocalQCacheOpSender.RecoverableFailure failure, LocalQCacheOp op, AtomicLong failuresCount) throws InterruptedException {
        this.queue.notifyCheckedException();
        long failures = failuresCount.incrementAndGet();

        if (this.replicatePutsViaCopy) {
            LOG.info(LogPrefix.prefix(this.replicatePutsViaCopy) + "Checked exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}. Will not retry as this is a cache replicated by value. Removing from queue.", new Object[]{failure.getClass().getSimpleName(), op, this.queue.name(), failures, failure});
            failuresCount.set(0L);
            remove(op);
            return;
        }

        if (failures == 1 || failures % LOG_CHECKED_EXCEPTIONS_FREQUENCY == 0) {
            LOG.info(LogPrefix.prefix(this.replicatePutsViaCopy) + "Checked exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}. Will retry indefinitely.", new Object[]{failure.getClass().getSimpleName(), op, this.queue.name(), failures, failure});
        } else {
            LOG.trace(LogPrefix.prefix(this.replicatePutsViaCopy) + "Checked exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}. Will retry indefinitely.", new Object[]{failure.getClass().getSimpleName(), op, this.queue.name(), failures, failure});
        }
        sleepBeforeNextOpRetry(failures, 100, 500, 1000, 1000, 1000, 1000, 2000, 3000, 5000);
    }

    /**
     * Sleeps before the next retry of the current operation. The n-th failure uses the n-th entry
     * of {@code delaysMillis}; counts below one use the first entry and counts beyond the end keep
     * using the last entry. Does nothing in by-copy mode or when no delays are supplied.
     *
     * @param failuresCount number of consecutive failures so far
     * @param delaysMillis  back-off schedule in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    void sleepBeforeNextOpRetry(long failuresCount, long... delaysMillis) throws InterruptedException {
        if (this.replicatePutsViaCopy || delaysMillis.length == 0) {
            return;
        }
        // Clamp while still a long: narrowing first would wrap for counts above Integer.MAX_VALUE
        // and yield a negative array index.
        long step = Math.max(1L, Math.min(failuresCount, (long) delaysMillis.length));
        TimeUnit.MILLISECONDS.sleep(delaysMillis[(int) step - 1]);
    }
}
