/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.NoOffsetException;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OffsetTrackingCoordinator {
    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetTrackingCoordinator.class);
    private final StreamEnvironment streamEnvironment;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Collection<Tracker> trackers = ConcurrentHashMap.newKeySet();
    private final LocalClock clock = new LocalClock();
    private final AtomicBoolean flushingOnGoing = new AtomicBoolean(false);
    private final Duration checkInterval;
    private volatile Future<?> checkFuture;

    OffsetTrackingCoordinator(StreamEnvironment streamEnvironment) {
        this(streamEnvironment, Duration.ofSeconds(1L));
    }

    OffsetTrackingCoordinator(StreamEnvironment streamEnvironment, Duration checkInterval) {
        this.streamEnvironment = streamEnvironment;
        this.checkInterval = checkInterval;
    }

    Registration registerTrackingConsumer(StreamConsumer consumer, StreamConsumerBuilder.TrackingConfiguration configuration) {
        Tracker tracker;
        if (!configuration.enabled()) {
            throw new IllegalArgumentException("Tracking must be enabled");
        }
        if (configuration.auto()) {
            tracker = new AutoTrackingTracker(consumer, configuration, this.clock);
        } else {
            if (configuration.manualCheckInterval().isZero()) {
                throw new IllegalArgumentException("There should be no registration if the check interval is 0");
            }
            tracker = new ManualTrackingTracker(consumer, configuration, this.clock);
        }
        this.trackers.add(tracker);
        if (this.started.compareAndSet(false, true)) {
            this.clock.setTime(System.nanoTime());
            this.checkFuture = this.executor().scheduleAtFixedRate(Utils.namedRunnable(() -> {
                if (this.flushingOnGoing.compareAndSet(false, true)) {
                    try {
                        this.clock.setTime(System.nanoTime());
                        LOGGER.debug("Background offset tracking flushing, {} tracker(s) to check", (Object)this.trackers.size());
                        Iterator<Tracker> iterator = this.trackers.iterator();
                        while (iterator.hasNext()) {
                            if (Thread.currentThread().isInterrupted()) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                            Tracker t = iterator.next();
                            if (t.consumer().isOpen()) {
                                try {
                                    t.flushIfNecessary();
                                }
                                catch (Exception e) {
                                    LOGGER.info("Error while flushing tracker: {}", (Object)e.getMessage());
                                }
                                continue;
                            }
                            iterator.remove();
                        }
                    }
                    finally {
                        this.flushingOnGoing.set(false);
                    }
                }
            }, "Offset tracking background task", new Object[0]), this.checkInterval.toMillis(), this.checkInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
        return new Registration(tracker);
    }

    private ScheduledExecutorService executor() {
        return this.streamEnvironment.scheduledExecutorService();
    }

    public boolean needTrackingRegistration(StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        return trackingConfiguration.enabled() && (!trackingConfiguration.manual() || !Duration.ZERO.equals(trackingConfiguration.manualCheckInterval()));
    }

    void close() {
        if (this.checkFuture != null) {
            this.checkFuture.cancel(true);
        }
    }

    public String toString() {
        return "{ \"tracker_count\" : " + this.trackers.size() + " }";
    }

    private static class LocalClock {
        private volatile long time;

        private LocalClock() {
        }

        long time() {
            return this.time;
        }

        public void setTime(long time) {
            this.time = time;
        }
    }

    private static final class ManualTrackingTracker
    implements Tracker {
        private final StreamConsumer consumer;
        private final LocalClock clock;
        private final long checkIntervalInNs;
        private volatile long lastRequestedOffset = 0L;
        private volatile long lastTrackingActivity = 0L;

        private ManualTrackingTracker(StreamConsumer consumer, StreamConsumerBuilder.TrackingConfiguration configuration, LocalClock clock) {
            this.consumer = consumer;
            this.clock = clock;
            this.checkIntervalInNs = configuration.manualCheckInterval().toNanos();
        }

        @Override
        public Consumer<MessageHandler.Context> postProcessingCallback() {
            return null;
        }

        @Override
        public void flushIfNecessary() {
            if (this.clock.time() - this.lastTrackingActivity > this.checkIntervalInNs) {
                try {
                    long lastStoredOffset = this.consumer.storedOffset();
                    if (Utils.offsetBefore(lastStoredOffset, this.lastRequestedOffset)) {
                        this.consumer.store(this.lastRequestedOffset);
                        this.lastTrackingActivity = this.clock.time();
                    }
                }
                catch (NoOffsetException e) {
                    this.consumer.store(this.lastRequestedOffset);
                    this.lastTrackingActivity = this.clock.time();
                }
            }
        }

        @Override
        public long flush() {
            throw new UnsupportedOperationException();
        }

        @Override
        public StreamConsumer consumer() {
            return this.consumer;
        }

        @Override
        public LongConsumer trackingCallback() {
            return requestedOffset -> {
                this.lastRequestedOffset = requestedOffset;
                this.lastTrackingActivity = this.clock.time();
            };
        }

        @Override
        public Runnable closingCallback() {
            return () -> {};
        }
    }

    private static final class AutoTrackingTracker
    implements Tracker {
        private final StreamConsumer consumer;
        private final int messageCountBeforeStorage;
        private final long flushIntervalInNs;
        private final LocalClock clock;
        private volatile long count = 0L;
        private volatile AtomicLong lastProcessedOffset = null;
        private volatile long lastTrackingActivity = 0L;

        private AutoTrackingTracker(StreamConsumer consumer, StreamConsumerBuilder.TrackingConfiguration configuration, LocalClock clock) {
            this.consumer = consumer;
            this.messageCountBeforeStorage = configuration.autoMessageCountBeforeStorage();
            this.flushIntervalInNs = configuration.autoFlushInterval().toNanos();
            this.clock = clock;
        }

        @Override
        public Consumer<MessageHandler.Context> postProcessingCallback() {
            return context -> {
                if (++this.count % (long)this.messageCountBeforeStorage == 0L) {
                    context.storeOffset();
                    this.lastTrackingActivity = this.clock.time();
                }
                if (this.lastProcessedOffset == null) {
                    this.lastProcessedOffset = new AtomicLong(context.offset());
                } else {
                    this.lastProcessedOffset.set(context.offset());
                }
            };
        }

        @Override
        public void flushIfNecessary() {
            if (this.count > 0L && this.clock.time() - this.lastTrackingActivity > this.flushIntervalInNs) {
                this.flush();
            }
        }

        @Override
        public long flush() {
            long result;
            if (this.lastProcessedOffset == null) {
                return 0L;
            }
            try {
                long lastStoredOffset = this.consumer.storedOffset();
                if (Utils.offsetBefore(lastStoredOffset, this.lastProcessedOffset.get())) {
                    this.consumer.store(this.lastProcessedOffset.get());
                }
                result = this.lastProcessedOffset.get();
            }
            catch (NoOffsetException e) {
                this.consumer.store(this.lastProcessedOffset.get());
                result = this.lastProcessedOffset.get();
            }
            this.lastTrackingActivity = this.clock.time();
            return result;
        }

        @Override
        public StreamConsumer consumer() {
            return this.consumer;
        }

        @Override
        public LongConsumer trackingCallback() {
            return Utils.NO_OP_LONG_CONSUMER;
        }

        @Override
        public Runnable closingCallback() {
            return () -> {
                if (this.consumer.isSac() && !this.consumer.sacActive()) {
                    LOGGER.debug("Not storing offset on closing because consumer is a non-active SAC");
                } else if (this.lastProcessedOffset == null) {
                    LOGGER.debug("Not storing anything as nothing has been processed.");
                } else {
                    Runnable storageOperation = () -> {
                        this.consumer.store(this.lastProcessedOffset.get());
                        if (this.consumer.isSac()) {
                            LOGGER.debug("Consumer is SAC, making sure offset has been stored, in case another SAC takes over");
                            this.consumer.waitForOffsetToBeStored(this.lastProcessedOffset.get());
                        }
                    };
                    try {
                        long lastStoredOffset = this.consumer.storedOffset();
                        if (Utils.offsetBefore(lastStoredOffset, this.lastProcessedOffset.get())) {
                            LOGGER.debug("Storing {} offset before closing", (Object)this.lastProcessedOffset);
                            storageOperation.run();
                        }
                    }
                    catch (NoOffsetException e) {
                        LOGGER.debug("Nothing stored yet, storing {} offset before closing", (Object)this.lastProcessedOffset);
                        storageOperation.run();
                    }
                }
            };
        }
    }

    static class Registration {
        private final Tracker tracker;

        Registration(Tracker tracker) {
            this.tracker = tracker;
        }

        Consumer<MessageHandler.Context> postMessageProcessingCallback() {
            return this.tracker.postProcessingCallback();
        }

        LongConsumer trackingCallback() {
            return this.tracker.trackingCallback();
        }

        Runnable closingCallback() {
            return this.tracker.closingCallback();
        }

        long flush() {
            return this.tracker.flush();
        }
    }

    private static interface Tracker {
        public Consumer<MessageHandler.Context> postProcessingCallback();

        public void flushIfNecessary();

        public long flush();

        public StreamConsumer consumer();

        public LongConsumer trackingCallback();

        public Runnable closingCallback();
    }
}

