/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiClock;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.MessageWaiter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.Temporal;
import org.threeten.bp.temporal.TemporalAmount;

class MessageDispatcher {
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
    private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
    @VisibleForTesting
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis((long)100L);
    private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 600;
    private final ScheduledExecutorService executor;
    private final ScheduledExecutorService systemExecutor;
    private final ApiClock clock;
    private final Duration ackExpirationPadding;
    private final Duration maxAckExtensionPeriod;
    private final MessageReceiver receiver;
    private final AckProcessor ackProcessor;
    private final FlowController flowController;
    private final MessageWaiter messagesWaiter;
    private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
    private final Set<String> pendingAcks;
    private final Set<String> pendingNacks;
    private final Lock alarmsLock;
    private int messageDeadlineSeconds;
    private ScheduledFuture<?> ackDeadlineExtensionAlarm;
    private Instant nextAckDeadlineExtensionAlarmTime;
    private ScheduledFuture<?> pendingAcksAlarm;
    private final Deque<OutstandingMessagesBatch> outstandingMessageBatches;
    private final Distribution ackLatencyDistribution;

    MessageDispatcher(MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        this.executor = executor;
        this.systemExecutor = systemExecutor;
        this.ackExpirationPadding = ackExpirationPadding;
        this.maxAckExtensionPeriod = maxAckExtensionPeriod;
        this.receiver = receiver;
        this.ackProcessor = ackProcessor;
        this.flowController = flowController;
        this.outstandingMessageBatches = new LinkedList<OutstandingMessagesBatch>();
        this.outstandingAckHandlers = new PriorityQueue();
        this.pendingAcks = new HashSet<String>();
        this.pendingNacks = new HashSet<String>();
        this.ackLatencyDistribution = ackLatencyDistribution;
        this.alarmsLock = new ReentrantLock();
        this.nextAckDeadlineExtensionAlarmTime = Instant.ofEpochMilli((long)Long.MAX_VALUE);
        this.messagesWaiter = new MessageWaiter();
        this.clock = clock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        this.messagesWaiter.waitNoMessages();
        this.alarmsLock.lock();
        try {
            if (this.ackDeadlineExtensionAlarm != null) {
                this.ackDeadlineExtensionAlarm.cancel(true);
                this.ackDeadlineExtensionAlarm = null;
            }
        }
        finally {
            this.alarmsLock.unlock();
        }
        this.processOutstandingAckOperations();
    }

    public void setMessageDeadlineSeconds(int messageDeadlineSeconds) {
        this.messageDeadlineSeconds = messageDeadlineSeconds;
    }

    public int getMessageDeadlineSeconds() {
        return this.messageDeadlineSeconds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
        if (messages.isEmpty()) {
            doneCallback.run();
            return;
        }
        this.messagesWaiter.incrementPendingMessages(messages.size());
        OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
        ArrayList<AckHandler> ackHandlers = new ArrayList<AckHandler>(messages.size());
        for (ReceivedMessage message : messages) {
            AckHandler ackHandler = new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
            ackHandlers.add(ackHandler);
            outstandingBatch.addMessage(message, ackHandler);
        }
        Instant expiration = Instant.ofEpochMilli((long)this.clock.millisTime()).plusSeconds((long)this.messageDeadlineSeconds);
        Queue<Object> queue = this.outstandingAckHandlers;
        synchronized (queue) {
            this.outstandingAckHandlers.add(new ExtensionJob(Instant.ofEpochMilli((long)this.clock.millisTime()), expiration, 2, ackHandlers));
        }
        this.setupNextAckDeadlineExtensionAlarm(expiration);
        queue = this.outstandingMessageBatches;
        synchronized (queue) {
            this.outstandingMessageBatches.add(outstandingBatch);
        }
        this.processOutstandingBatches();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processOutstandingBatches() {
        while (true) {
            OutstandingMessagesBatch.OutstandingMessage outstandingMessage;
            boolean batchDone = false;
            Runnable batchCallback = null;
            Deque<OutstandingMessagesBatch> deque = this.outstandingMessageBatches;
            synchronized (deque) {
                OutstandingMessagesBatch nextBatch = this.outstandingMessageBatches.peek();
                if (nextBatch == null) {
                    return;
                }
                outstandingMessage = (OutstandingMessagesBatch.OutstandingMessage)nextBatch.messages.peek();
                if (outstandingMessage == null) {
                    return;
                }
                try {
                    this.flowController.reserve(1L, (long)outstandingMessage.receivedMessage().getMessage().getSerializedSize());
                }
                catch (FlowController.MaxOutstandingElementCountReachedException | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
                    return;
                }
                catch (FlowController.FlowControlException unexpectedException) {
                    throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
                }
                nextBatch.messages.poll();
                batchDone = nextBatch.messages.isEmpty();
                if (batchDone) {
                    this.outstandingMessageBatches.poll();
                    batchCallback = nextBatch.doneCallback;
                }
            }
            final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
            AckHandler ackHandler = outstandingMessage.ackHandler();
            final SettableFuture response = SettableFuture.create();
            final AckReplyConsumer consumer = new AckReplyConsumer(){

                @Override
                public void ack() {
                    response.set((Object)AckReply.ACK);
                }

                @Override
                public void nack() {
                    response.set((Object)AckReply.NACK);
                }
            };
            Futures.addCallback((ListenableFuture)response, (FutureCallback)ackHandler);
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        MessageDispatcher.this.receiver.receiveMessage(message, consumer);
                    }
                    catch (Exception e) {
                        response.setException((Throwable)e);
                    }
                }
            });
            if (!batchDone) continue;
            batchCallback.run();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setupPendingAcksAlarm() {
        this.alarmsLock.lock();
        try {
            if (this.pendingAcksAlarm == null) {
                this.pendingAcksAlarm = this.systemExecutor.schedule(new Runnable(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        MessageDispatcher.this.alarmsLock.lock();
                        try {
                            MessageDispatcher.this.pendingAcksAlarm = null;
                        }
                        finally {
                            MessageDispatcher.this.alarmsLock.unlock();
                        }
                        MessageDispatcher.this.processOutstandingAckOperations();
                    }
                }, PENDING_ACKS_SEND_DELAY.toMillis(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            this.alarmsLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
        Instant possibleNextAlarmTime = expiration.minus((TemporalAmount)this.ackExpirationPadding);
        this.alarmsLock.lock();
        try {
            if (this.nextAckDeadlineExtensionAlarmTime.isAfter(possibleNextAlarmTime)) {
                logger.log(Level.FINER, "Scheduling next alarm time: {0}, previous alarm time: {1}", new Object[]{possibleNextAlarmTime, this.nextAckDeadlineExtensionAlarmTime});
                if (this.ackDeadlineExtensionAlarm != null) {
                    logger.log(Level.FINER, "Canceling previous alarm");
                    this.ackDeadlineExtensionAlarm.cancel(false);
                }
                this.nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;
                this.ackDeadlineExtensionAlarm = this.systemExecutor.schedule(new AckDeadlineAlarm(), this.nextAckDeadlineExtensionAlarmTime.toEpochMilli() - this.clock.millisTime(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            this.alarmsLock.unlock();
        }
    }

    private void processOutstandingAckOperations() {
        this.processOutstandingAckOperations(Collections.emptyList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processOutstandingAckOperations(List<PendingModifyAckDeadline> ackDeadlineExtensions) {
        ArrayList modifyAckDeadlinesToSend = Lists.newArrayList(ackDeadlineExtensions);
        ArrayList<Object> acksToSend = new ArrayList(this.pendingAcks.size());
        Set<String> set = this.pendingAcks;
        synchronized (set) {
            if (!this.pendingAcks.isEmpty()) {
                try {
                    acksToSend = new ArrayList<String>(this.pendingAcks);
                    logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
                }
                finally {
                    this.pendingAcks.clear();
                }
            }
        }
        PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0, new String[0]);
        Set<String> set2 = this.pendingNacks;
        synchronized (set2) {
            if (!this.pendingNacks.isEmpty()) {
                try {
                    for (String ackId : this.pendingNacks) {
                        nacksToSend.addAckId(ackId);
                    }
                    logger.log(Level.FINER, "Sending {0} nacks", this.pendingNacks.size());
                }
                finally {
                    this.pendingNacks.clear();
                }
                modifyAckDeadlinesToSend.add(nacksToSend);
            }
        }
        this.ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
    }

    private class AckDeadlineAlarm
    implements Runnable {
        private AckDeadlineAlarm() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            MessageDispatcher.this.alarmsLock.lock();
            try {
                MessageDispatcher.this.nextAckDeadlineExtensionAlarmTime = Instant.ofEpochMilli((long)Long.MAX_VALUE);
                MessageDispatcher.this.ackDeadlineExtensionAlarm = null;
                if (MessageDispatcher.this.pendingAcksAlarm != null) {
                    MessageDispatcher.this.pendingAcksAlarm.cancel(false);
                    MessageDispatcher.this.pendingAcksAlarm = null;
                }
            }
            finally {
                MessageDispatcher.this.alarmsLock.unlock();
            }
            Instant now = Instant.ofEpochMilli((long)MessageDispatcher.this.clock.millisTime());
            Instant cutOverTime = Instant.ofEpochMilli((long)((long)Math.ceil((double)now.plus((TemporalAmount)MessageDispatcher.this.ackExpirationPadding).plusMillis(500L).toEpochMilli() / 1000.0) * 1000L));
            logger.log(Level.FINER, "Running alarm sent outstanding acks, at time: {0}, with cutover time: {1}, padding: {2}", new Object[]{now, cutOverTime, MessageDispatcher.this.ackExpirationPadding});
            Instant nextScheduleExpiration = null;
            ArrayList<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<PendingModifyAckDeadline>();
            ArrayList<ExtensionJob> renewJobs = new ArrayList<ExtensionJob>();
            PriorityQueue priorityQueue = MessageDispatcher.this.outstandingAckHandlers;
            synchronized (priorityQueue) {
                while (!MessageDispatcher.this.outstandingAckHandlers.isEmpty() && ((ExtensionJob)((MessageDispatcher)MessageDispatcher.this).outstandingAckHandlers.peek()).expiration.compareTo(cutOverTime) <= 0) {
                    ExtensionJob job = (ExtensionJob)MessageDispatcher.this.outstandingAckHandlers.poll();
                    if (MessageDispatcher.this.maxAckExtensionPeriod.toMillis() > 0L && job.creation.plus((TemporalAmount)MessageDispatcher.this.maxAckExtensionPeriod).compareTo(now) <= 0) continue;
                    int i = 0;
                    while (i < job.ackHandlers.size()) {
                        if (job.ackHandlers.get(i).acked.get()) {
                            Collections.swap(job.ackHandlers, i, job.ackHandlers.size() - 1);
                            job.ackHandlers.remove(job.ackHandlers.size() - 1);
                            continue;
                        }
                        ++i;
                    }
                    if (job.ackHandlers.isEmpty()) continue;
                    job.extendExpiration(now);
                    long extensionMillis = Duration.between((Temporal)now, (Temporal)job.expiration).toMillis();
                    int extensionSeconds = Ints.saturatedCast((long)TimeUnit.MILLISECONDS.toSeconds(extensionMillis));
                    PendingModifyAckDeadline pendingModAckDeadline = new PendingModifyAckDeadline(extensionSeconds, new String[0]);
                    for (AckHandler ackHandler : job.ackHandlers) {
                        pendingModAckDeadline.addAckId(ackHandler.ackId);
                    }
                    modifyAckDeadlinesToSend.add(pendingModAckDeadline);
                    renewJobs.add(job);
                }
                for (ExtensionJob job : renewJobs) {
                    MessageDispatcher.this.outstandingAckHandlers.add(job);
                }
                if (!MessageDispatcher.this.outstandingAckHandlers.isEmpty()) {
                    nextScheduleExpiration = ((ExtensionJob)((MessageDispatcher)MessageDispatcher.this).outstandingAckHandlers.peek()).expiration;
                }
            }
            MessageDispatcher.this.processOutstandingAckOperations(modifyAckDeadlinesToSend);
            if (nextScheduleExpiration != null) {
                logger.log(Level.FINER, "Scheduling based on outstanding, at time: {0}, next scheduled time: {1}", new Object[]{now, nextScheduleExpiration});
                MessageDispatcher.this.setupNextAckDeadlineExtensionAlarm(nextScheduleExpiration);
            }
        }
    }

    static class OutstandingMessagesBatch {
        private final Deque<OutstandingMessage> messages = new LinkedList<OutstandingMessage>();
        private final Runnable doneCallback;

        public OutstandingMessagesBatch(Runnable doneCallback) {
            this.doneCallback = doneCallback;
        }

        public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
            this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
        }

        public Deque<OutstandingMessage> messages() {
            return this.messages;
        }

        static class OutstandingMessage {
            private final ReceivedMessage receivedMessage;
            private final AckHandler ackHandler;

            public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
                this.receivedMessage = receivedMessage;
                this.ackHandler = ackHandler;
            }

            public ReceivedMessage receivedMessage() {
                return this.receivedMessage;
            }

            public AckHandler ackHandler() {
                return this.ackHandler;
            }
        }
    }

    public static interface AckProcessor {
        public void sendAckOperations(List<String> var1, List<PendingModifyAckDeadline> var2);
    }

    private class AckHandler
    implements FutureCallback<AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        private final AtomicBoolean acked;
        private final Instant receivedTime;

        AckHandler(String ackId, int outstandingBytes) {
            this.ackId = ackId;
            this.outstandingBytes = outstandingBytes;
            this.acked = new AtomicBoolean(false);
            this.receivedTime = Instant.ofEpochMilli((long)MessageDispatcher.this.clock.millisTime());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onFailure(Throwable t) {
            logger.log(Level.WARNING, "MessageReceiver failed to processes ack ID: " + this.ackId + ", the message will be nacked.", t);
            this.acked.getAndSet(true);
            Set set = MessageDispatcher.this.pendingNacks;
            synchronized (set) {
                MessageDispatcher.this.pendingNacks.add(this.ackId);
            }
            MessageDispatcher.this.setupPendingAcksAlarm();
            MessageDispatcher.this.flowController.release(1L, (long)this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
            MessageDispatcher.this.processOutstandingBatches();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSuccess(AckReply reply) {
            this.acked.getAndSet(true);
            switch (reply) {
                case ACK: {
                    Set set = MessageDispatcher.this.pendingAcks;
                    synchronized (set) {
                        MessageDispatcher.this.pendingAcks.add(this.ackId);
                    }
                    long receivedTimeMillis = TimeUnit.NANOSECONDS.toMillis(this.receivedTime.getNano());
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long)((long)Math.ceil((double)(MessageDispatcher.this.clock.millisTime() - receivedTimeMillis) / 1000.0))));
                    break;
                }
                case NACK: {
                    Set set = MessageDispatcher.this.pendingNacks;
                    synchronized (set) {
                        MessageDispatcher.this.pendingNacks.add(this.ackId);
                        break;
                    }
                }
                default: {
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", new Object[]{reply}));
                }
            }
            MessageDispatcher.this.setupPendingAcksAlarm();
            MessageDispatcher.this.flowController.release(1L, (long)this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
            MessageDispatcher.this.processOutstandingBatches();
        }
    }

    public static enum AckReply {
        ACK,
        NACK;

    }

    static class PendingModifyAckDeadline {
        final List<String> ackIds = new ArrayList<String>();
        final int deadlineExtensionSeconds;

        PendingModifyAckDeadline(int deadlineExtensionSeconds, String ... ackIds) {
            this.deadlineExtensionSeconds = deadlineExtensionSeconds;
            for (String ackId : ackIds) {
                this.addAckId(ackId);
            }
        }

        public void addAckId(String ackId) {
            this.ackIds.add(ackId);
        }

        public String toString() {
            return String.format("PendingModifyAckDeadline{extension: %d sec, ackIds: %s}", this.deadlineExtensionSeconds, this.ackIds);
        }
    }

    private class ExtensionJob
    implements Comparable<ExtensionJob> {
        Instant creation;
        Instant expiration;
        int nextExtensionSeconds;
        ArrayList<AckHandler> ackHandlers;

        ExtensionJob(Instant creation, Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
            this.creation = creation;
            this.expiration = expiration;
            this.nextExtensionSeconds = initialAckDeadlineExtension;
            this.ackHandlers = ackHandlers;
        }

        void extendExpiration(Instant now) {
            Instant maxExtension;
            Instant possibleExtension = now.plus((TemporalAmount)Duration.ofSeconds((long)this.nextExtensionSeconds));
            this.expiration = possibleExtension.isBefore(maxExtension = this.creation.plus((TemporalAmount)MessageDispatcher.this.maxAckExtensionPeriod)) ? possibleExtension : maxExtension;
            this.nextExtensionSeconds = Math.min(2 * this.nextExtensionSeconds, 600);
        }

        @Override
        public int compareTo(ExtensionJob other) {
            return this.expiration.compareTo(other.expiration);
        }

        public String toString() {
            ArrayList<String> ackIds = new ArrayList<String>();
            for (AckHandler ah : this.ackHandlers) {
                ackIds.add(ah.ackId);
            }
            return String.format("ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", this.expiration, this.nextExtensionSeconds, ackIds);
        }
    }
}

