/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTracker;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Tracks the offsets of messages that have been received but not yet acked, and commits
 * progress through the wrapped {@link Committer} whenever a contiguous prefix of the received
 * messages has been acked.
 *
 * <p>{@code receipts} and {@code acks} are only touched while holding {@code monitor}.
 */
public class AckSetTrackerImpl
extends TrivialProxyService
implements AckSetTracker {
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private final Committer committer;
    /** Offsets handed to {@link #track} that are still outstanding, in strictly increasing order. */
    @GuardedBy(value="monitor.monitor")
    private final Deque<Offset> receipts = new ArrayDeque<>();
    /** Offsets that were acked but are not yet part of a committed prefix. */
    @GuardedBy(value="monitor.monitor")
    private final PriorityQueue<Offset> acks = new PriorityQueue<>();

    /**
     * Creates a tracker that commits through {@code committer}.
     *
     * @param committer the committer that receives offset commits; it is also passed to the
     *     {@link TrivialProxyService} superclass constructor
     * @throws ApiException as declared by the superclass constructor
     */
    public AckSetTrackerImpl(Committer committer) throws ApiException {
        super(committer);
        this.committer = committer;
    }

    /**
     * Registers {@code message} as received and returns the callback used to ack it.
     *
     * <p>The returned {@link Runnable} may be run once. Running it a second time reports a
     * {@code FAILED_PRECONDITION} error through {@code onPermanentError} and then throws that
     * error's {@code underlying} exception.
     *
     * @param message the message to track; its offset must be strictly greater than the offset of
     *     the most recently tracked message that is still outstanding
     * @return a callback that acks the message
     * @throws CheckedApiException if the ordering precondition check fails
     */
    @Override
    public Runnable track(SequencedMessage message) throws CheckedApiException {
        final Offset messageOffset = message.offset();
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            // Receipts must stay strictly increasing so the head of the deque is always the
            // oldest outstanding offset.
            CheckedApiPreconditions.checkArgument(this.receipts.isEmpty() || this.receipts.peekLast().value() < messageOffset.value());
            this.receipts.addLast(messageOffset);
        }
        return new Runnable() {
            // Flipped atomically on the first run so a repeated ack is detected.
            private final AtomicBoolean wasAcked = new AtomicBoolean(false);

            @Override
            public void run() {
                if (this.wasAcked.getAndSet(true)) {
                    CheckedApiException e = new CheckedApiException("Duplicate acks are not allowed.", StatusCode.Code.FAILED_PRECONDITION);
                    AckSetTrackerImpl.this.onPermanentError(e);
                    throw e.underlying;
                }
                AckSetTrackerImpl.this.onAck(messageOffset);
            }
        };
    }

    /**
     * Records an ack for {@code offset} and, if this completes a contiguous acked prefix of the
     * outstanding receipts, commits the offset immediately after the last acked one.
     *
     * @param offset the offset of the message being acked
     */
    private void onAck(Offset offset) {
        try (CloseableMonitor.Hold h = this.monitor.enter()) {
            this.acks.add(offset);
            Optional<Offset> prefixAckedOffset = Optional.empty();
            // Acks can arrive in any order. The priority queue exposes its least element
            // (presumably the smallest offset, assuming Offset's natural ordering follows
            // value()), which is matched against the oldest outstanding receipt.
            while (!this.receipts.isEmpty() && !this.acks.isEmpty() && this.receipts.peekFirst().value() == this.acks.peek().value()) {
                prefixAckedOffset = Optional.of(this.acks.remove());
                this.receipts.removeFirst();
            }
            if (prefixAckedOffset.isPresent()) {
                // Convert from the last acked offset to the first unacked offset.
                ApiFuture<Void> future = this.committer.commitOffset(Offset.of(prefixAckedOffset.get().value() + 1L));
                ExtractStatus.addFailureHandler(future, this::onPermanentError);
            }
        }
    }
}

