/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.subscription.broker;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.broker.SubscriptionStates;
import org.apache.iotdb.db.subscription.broker.TsFileDeduplicationBlockingPendingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
import org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionBroker.class);
    private final String brokerId;
    private final Map<String, SubscriptionPrefetchingQueue> topicNameToPrefetchingQueue;
    private final Map<String, String> completedTopicNames;
    private final Map<String, AtomicLong> topicNameToCommitIdGenerator;
    private final LoadingCache<String, SubscriptionStates> consumerIdToSubscriptionStates;

    public SubscriptionBroker(String brokerId) {
        this.brokerId = brokerId;
        this.topicNameToPrefetchingQueue = new ConcurrentHashMap<String, SubscriptionPrefetchingQueue>();
        this.completedTopicNames = new ConcurrentHashMap<String, String>();
        this.topicNameToCommitIdGenerator = new ConcurrentHashMap<String, AtomicLong>();
        this.consumerIdToSubscriptionStates = Caffeine.newBuilder().expireAfterAccess(60L, TimeUnit.SECONDS).build(consumerId -> new SubscriptionStates());
    }

    public boolean isEmpty() {
        return this.topicNameToPrefetchingQueue.isEmpty() && this.completedTopicNames.isEmpty() && this.topicNameToCommitIdGenerator.isEmpty();
    }

    public List<SubscriptionEvent> poll(String consumerId, Set<String> topicNames, long maxBytes) {
        ArrayList<SubscriptionEvent> eventsToPoll = new ArrayList<SubscriptionEvent>();
        Set<String> candidateTopicNames = this.prepareCandidateTopicNames(topicNames, eventsToPoll);
        ArrayList<String> sortedTopicNames = new ArrayList<String>(candidateTopicNames);
        sortedTopicNames.sort(Comparator.comparingLong(topicName -> Objects.requireNonNull((SubscriptionStates)this.consumerIdToSubscriptionStates.get((Object)consumerId)).getStates((String)topicName)));
        ArrayList<SubscriptionEvent> eventsToNack = new ArrayList<SubscriptionEvent>();
        long totalSize = 0L;
        HashMap<String, Long> topicNameToIncrements = new HashMap<String, Long>();
        for (String topicName2 : sortedTopicNames) {
            long currentSize;
            SubscriptionEvent event;
            SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName2);
            if (Objects.isNull(prefetchingQueue) || prefetchingQueue.isClosed() || Objects.isNull(event = prefetchingQueue.poll(consumerId))) continue;
            try {
                currentSize = event.getCurrentResponseSize();
            }
            catch (IOException e) {
                eventsToNack.add(event);
                continue;
            }
            eventsToPoll.add(event);
            topicNameToIncrements.merge(event.getCommitContext().getTopicName(), 1L, Long::sum);
            if ((totalSize += currentSize) + currentSize <= maxBytes) continue;
            break;
        }
        Objects.requireNonNull((SubscriptionStates)this.consumerIdToSubscriptionStates.get((Object)consumerId)).updateStates(topicNameToIncrements);
        this.commit(consumerId, eventsToNack.stream().map(SubscriptionEvent::getCommitContext).collect(Collectors.toList()), true);
        return eventsToPoll;
    }

    private Set<String> prepareCandidateTopicNames(Set<String> topicNames, List<SubscriptionEvent> eventsToPoll) {
        HashSet<String> candidateTopicNames = new HashSet<String>();
        for (String topicName : topicNames) {
            SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
            if (Objects.isNull(prefetchingQueue)) {
                if (!this.completedTopicNames.containsKey(topicName)) continue;
                LOGGER.info("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client", (Object)topicName, (Object)this.brokerId);
                eventsToPoll.add(new SubscriptionEvent(SubscriptionPollResponseType.TERMINATION.getType(), (SubscriptionPollPayload)new TerminationPayload(), new SubscriptionCommitContext(IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), PipeDataNodeAgent.runtime().getRebootTimes(), topicName, this.brokerId, -1L)));
                continue;
            }
            if (prefetchingQueue.isClosed()) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", (Object)topicName, (Object)this.brokerId);
                continue;
            }
            candidateTopicNames.add(topicName);
        }
        return candidateTopicNames;
    }

    public List<SubscriptionEvent> pollTsFile(String consumerId, SubscriptionCommitContext commitContext, long writingOffset) {
        String topicName = commitContext.getTopicName();
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            String errorMessage = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] does not exist", topicName, this.brokerId);
            LOGGER.warn(errorMessage);
            throw new SubscriptionException(errorMessage);
        }
        if (!(prefetchingQueue instanceof SubscriptionPrefetchingTsFileQueue)) {
            String errorMessage = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] is invalid", topicName, this.brokerId);
            LOGGER.warn(errorMessage);
            throw new SubscriptionException(errorMessage);
        }
        if (prefetchingQueue.isClosed()) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", (Object)topicName, (Object)this.brokerId);
            return Collections.emptyList();
        }
        SubscriptionEvent event = ((SubscriptionPrefetchingTsFileQueue)prefetchingQueue).pollTsFile(consumerId, commitContext, writingOffset);
        if (Objects.nonNull(event)) {
            return Collections.singletonList(event);
        }
        return Collections.emptyList();
    }

    public List<SubscriptionEvent> pollTablets(String consumerId, SubscriptionCommitContext commitContext, int offset) {
        String topicName = commitContext.getTopicName();
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            String errorMessage = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] does not exist", topicName, this.brokerId);
            LOGGER.warn(errorMessage);
            throw new SubscriptionException(errorMessage);
        }
        if (!(prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue)) {
            String errorMessage = String.format("Subscription: prefetching queue bound to topic [%s] for consumer group [%s] is invalid", topicName, this.brokerId);
            LOGGER.warn(errorMessage);
            throw new SubscriptionException(errorMessage);
        }
        if (prefetchingQueue.isClosed()) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", (Object)topicName, (Object)this.brokerId);
            return Collections.emptyList();
        }
        SubscriptionEvent event = ((SubscriptionPrefetchingTabletQueue)prefetchingQueue).pollTablets(consumerId, commitContext, offset);
        if (Objects.nonNull(event)) {
            return Collections.singletonList(event);
        }
        return Collections.emptyList();
    }

    public List<SubscriptionCommitContext> commit(String consumerId, List<SubscriptionCommitContext> commitContexts, boolean nack) {
        ArrayList<SubscriptionCommitContext> successfulCommitContexts = new ArrayList<SubscriptionCommitContext>();
        for (SubscriptionCommitContext commitContext : commitContexts) {
            String topicName = commitContext.getTopicName();
            SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
            if (Objects.isNull(prefetchingQueue)) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", (Object)topicName, (Object)this.brokerId);
                continue;
            }
            if (prefetchingQueue.isClosed()) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", (Object)topicName, (Object)this.brokerId);
                continue;
            }
            if (!nack) {
                if (!prefetchingQueue.ack(consumerId, commitContext)) continue;
                successfulCommitContexts.add(commitContext);
                continue;
            }
            if (!prefetchingQueue.nack(consumerId, commitContext)) continue;
            successfulCommitContexts.add(commitContext);
        }
        return successfulCommitContexts;
    }

    public boolean isCommitContextOutdated(SubscriptionCommitContext commitContext) {
        String topicName = commitContext.getTopicName();
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            return true;
        }
        return prefetchingQueue.isCommitContextOutdated(commitContext);
    }

    public void bindPrefetchingQueue(String topicName, UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
        if (Objects.nonNull(this.topicNameToPrefetchingQueue.get(topicName))) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] has already existed", (Object)topicName, (Object)this.brokerId);
            return;
        }
        String topicFormat = SubscriptionAgent.topic().getTopicFormat(topicName);
        SubscriptionPrefetchingQueue prefetchingQueue = "TsFileHandler".equals(topicFormat) ? new SubscriptionPrefetchingTsFileQueue(this.brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue), this.topicNameToCommitIdGenerator.computeIfAbsent(topicName, key -> new AtomicLong())) : new SubscriptionPrefetchingTabletQueue(this.brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue), this.topicNameToCommitIdGenerator.computeIfAbsent(topicName, key -> new AtomicLong()));
        SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue);
        this.topicNameToPrefetchingQueue.put(topicName, prefetchingQueue);
        LOGGER.info("Subscription: create prefetching queue bound to topic [{}] for consumer group [{}]", (Object)topicName, (Object)this.brokerId);
    }

    public void unbindPrefetchingQueue(String topicName) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", (Object)topicName, (Object)this.brokerId);
            return;
        }
        prefetchingQueue.markClosed();
        if (SubscriptionAgent.topic().getTopicMode(topicName).equals("snapshot") && prefetchingQueue.isCompleted()) {
            this.completedTopicNames.put(topicName, topicName);
        }
        prefetchingQueue.cleanUp();
        SubscriptionPrefetchingQueueMetrics.getInstance().deregister(prefetchingQueue.getPrefetchingQueueId());
        this.topicNameToPrefetchingQueue.remove(topicName);
        LOGGER.info("Subscription: drop prefetching queue bound to topic [{}] for consumer group [{}]", (Object)topicName, (Object)this.brokerId);
    }

    public void removePrefetchingQueue(String topicName) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.nonNull(prefetchingQueue)) {
            LOGGER.info("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] still exists, unbind it before closing", (Object)topicName, (Object)this.brokerId);
            this.unbindPrefetchingQueue(topicName);
        }
        this.completedTopicNames.remove(topicName);
        this.topicNameToCommitIdGenerator.remove(topicName);
    }

    public boolean executePrefetch(String topicName) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            SubscriptionDataNodeResourceManager.log().schedule(SubscriptionBroker.class, this.brokerId, topicName).ifPresent(l -> l.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", (Object)topicName, (Object)this.brokerId));
            return false;
        }
        if (prefetchingQueue.isClosed()) {
            SubscriptionDataNodeResourceManager.log().schedule(SubscriptionBroker.class, this.brokerId, topicName).ifPresent(l -> l.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", (Object)topicName, (Object)this.brokerId));
            return false;
        }
        return prefetchingQueue.executePrefetch();
    }

    public int getPipeEventCount(String topicName) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", (Object)topicName, (Object)this.brokerId);
            return 0;
        }
        if (prefetchingQueue.isClosed()) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", (Object)topicName, (Object)this.brokerId);
            return 0;
        }
        return prefetchingQueue.getPipeEventCount();
    }

    public int getPrefetchingQueueCount() {
        return this.topicNameToPrefetchingQueue.size();
    }
}

