package io.moquette.broker;

import io.moquette.broker.Session;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.interception.BrokerInterceptor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/broker/PostOffice.class */
public class PostOffice {
    private static final Logger LOG = LoggerFactory.getLogger(PostOffice.class);
    private static final Set<String> NO_FILTER = new HashSet();
    private final Authorizator authorizator;
    private final ISubscriptionsDirectory subscriptions;
    private final IRetainedRepository retainedRepository;
    private SessionRegistry sessionRegistry;
    private BrokerInterceptor interceptor;
    private final FailedPublishCollection failedPublishes = new FailedPublishCollection();
    private final SessionEventLoopGroup sessionLoops;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$BatchingPublishesCollector.class */
    public class BatchingPublishesCollector {
        final List<Subscription>[] subscriptions;
        private final int eventLoops;
        private final SessionEventLoopGroup loopGroup;

        BatchingPublishesCollector(SessionEventLoopGroup sessionEventLoopGroup) {
            this.eventLoops = sessionEventLoopGroup.getEventLoopCount();
            this.loopGroup = sessionEventLoopGroup;
            this.subscriptions = new List[this.eventLoops];
        }

        public void add(Subscription subscription) {
            int subscriberEventLoop = subscriberEventLoop(subscription.getClientId());
            if (this.subscriptions[subscriberEventLoop] == null) {
                this.subscriptions[subscriberEventLoop] = new ArrayList();
            }
            this.subscriptions[subscriberEventLoop].add(subscription);
        }

        private int subscriberEventLoop(String str) {
            return this.loopGroup.targetQueueOrdinal(str);
        }

        List<RouteResult> routeBatchedPublishes(Consumer<List<Subscription>> consumer) {
            ArrayList arrayList = new ArrayList(this.eventLoops);
            for (List<Subscription> list : this.subscriptions) {
                if (list != null) {
                    String clientId = list.get(0).getClientId();
                    if (PostOffice.LOG.isTraceEnabled()) {
                        PostOffice.LOG.trace("Routing PUBLISH to eventLoop {}  for subscriptions [{}]", Integer.valueOf(subscriberEventLoop(clientId)), (String) list.stream().map((v0) -> {
                            return v0.toString();
                        }).collect(Collectors.joining(",\n")));
                    }
                    arrayList.add(PostOffice.this.routeCommand(clientId, "batched PUB", () -> {
                        consumer.accept(list);
                        return null;
                    }));
                }
            }
            return arrayList;
        }

        Collection<String> subscriberIdsByEventLoop(String str) {
            return (Collection) this.subscriptions[subscriberEventLoop(str)].stream().map((v0) -> {
                return v0.getClientId();
            }).collect(Collectors.toList());
        }

        public int countBatches() {
            int i = 0;
            for (List<Subscription> list : this.subscriptions) {
                if (list != null) {
                    i++;
                }
            }
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$FailedPublishCollection.class */
    public static class FailedPublishCollection {
        private final ConcurrentMap<PacketId, Set<String>> packetsMap;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/moquette/broker/PostOffice$FailedPublishCollection$PacketId.class */
        public static class PacketId {
            private final String clientId;
            private final int idPacket;

            PacketId(String str, int i) {
                this.clientId = str;
                this.idPacket = i;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                PacketId packetId = (PacketId) obj;
                return this.idPacket == packetId.idPacket && Objects.equals(this.clientId, packetId.clientId);
            }

            public int hashCode() {
                return Objects.hash(this.clientId, Integer.valueOf(this.idPacket));
            }

            public boolean belongToClient(String str) {
                return this.clientId.equals(str);
            }
        }

        private FailedPublishCollection() {
            this.packetsMap = new ConcurrentHashMap();
        }

        private void insert(String str, int i, String str2) {
            this.packetsMap.computeIfAbsent(new PacketId(str, i), packetId -> {
                return new HashSet();
            }).add(str2);
        }

        public void remove(String str, int i, String str2) {
            this.packetsMap.computeIfPresent(new PacketId(str, i), (packetId, set) -> {
                set.remove(str2);
                if (set.isEmpty()) {
                    return null;
                }
                return set;
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void removeAll(int i, String str, Collection<String> collection) {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                remove(str, i, it.next());
            }
        }

        void cleanupForClient(String str) {
            Stream<PacketId> filter = this.packetsMap.keySet().stream().filter(packetId -> {
                return packetId.belongToClient(str);
            });
            ConcurrentMap<PacketId, Set<String>> concurrentMap = this.packetsMap;
            Objects.requireNonNull(concurrentMap);
            filter.forEach((v1) -> {
                r1.remove(v1);
            });
        }

        void insertAll(int i, String str, Collection<String> collection) {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                insert(str, i, it.next());
            }
        }

        Set<String> listFailed(String str, int i) {
            return this.packetsMap.getOrDefault(new PacketId(str, i), Collections.emptySet());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$RouteResult.class */
    public static class RouteResult {
        private final String clientId;
        private final Status status;
        private CompletableFuture queuedFuture;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/moquette/broker/PostOffice$RouteResult$Status.class */
        public enum Status {
            SUCCESS,
            FAIL
        }

        public static RouteResult success(String str, CompletableFuture completableFuture) {
            return new RouteResult(str, Status.SUCCESS, completableFuture);
        }

        public static RouteResult failed(String str) {
            return failed(str, null);
        }

        public static RouteResult failed(String str, String str2) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.completeExceptionally(new Error(str2));
            return new RouteResult(str, Status.FAIL, completableFuture);
        }

        private RouteResult(String str, Status status, CompletableFuture completableFuture) {
            this.clientId = str;
            this.status = status;
            this.queuedFuture = completableFuture;
        }

        public CompletableFuture completableFuture() {
            if (this.status == Status.FAIL) {
                throw new IllegalArgumentException("Accessing completable future on a failed result");
            }
            return this.queuedFuture;
        }

        public boolean isSuccess() {
            return this.status == Status.SUCCESS;
        }

        public RouteResult ifFailed(Runnable runnable) {
            if (!isSuccess()) {
                runnable.run();
            }
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PostOffice(ISubscriptionsDirectory iSubscriptionsDirectory, IRetainedRepository iRetainedRepository, SessionRegistry sessionRegistry, BrokerInterceptor brokerInterceptor, Authorizator authorizator, SessionEventLoopGroup sessionEventLoopGroup) {
        this.authorizator = authorizator;
        this.subscriptions = iSubscriptionsDirectory;
        this.retainedRepository = iRetainedRepository;
        this.sessionRegistry = sessionRegistry;
        this.interceptor = brokerInterceptor;
        this.sessionLoops = sessionEventLoopGroup;
    }

    public void init(SessionRegistry sessionRegistry) {
        this.sessionRegistry = sessionRegistry;
    }

    public void fireWill(Session.Will will) {
        publish2Subscribers(will.payload, new Topic(will.topic), will.qos);
    }

    public void subscribeClientToTopics(MqttSubscribeMessage mqttSubscribeMessage, String str, String str2, MQTTConnection mQTTConnection) {
        int messageId = Utils.messageId(mqttSubscribeMessage);
        List<MqttTopicSubscription> verifyTopicsReadAccess = this.authorizator.verifyTopicsReadAccess(str, str2, mqttSubscribeMessage);
        MqttSubAckMessage doAckMessageFromValidateFilters = doAckMessageFromValidateFilters(verifyTopicsReadAccess, messageId);
        List<Subscription> list = (List) verifyTopicsReadAccess.stream().filter(mqttTopicSubscription -> {
            return mqttTopicSubscription.qualityOfService() != MqttQoS.FAILURE;
        }).map(mqttTopicSubscription2 -> {
            return new Subscription(str, new Topic(mqttTopicSubscription2.topicName()), mqttTopicSubscription2.qualityOfService());
        }).collect(Collectors.toList());
        Iterator<Subscription> it = list.iterator();
        while (it.hasNext()) {
            this.subscriptions.add(it.next());
        }
        this.sessionRegistry.retrieve(str).addSubscriptions(list);
        mQTTConnection.sendSubAckMessage(messageId, doAckMessageFromValidateFilters);
        publishRetainedMessagesForSubscriptions(str, list);
        Iterator<Subscription> it2 = list.iterator();
        while (it2.hasNext()) {
            this.interceptor.notifyTopicSubscribed(it2.next(), str2);
        }
    }

    private void publishRetainedMessagesForSubscriptions(String str, List<Subscription> list) {
        Session retrieve = this.sessionRegistry.retrieve(str);
        for (Subscription subscription : list) {
            List<RetainedMessage> retainedOnTopic = this.retainedRepository.retainedOnTopic(subscription.getTopicFilter().toString());
            if (!retainedOnTopic.isEmpty()) {
                for (RetainedMessage retainedMessage : retainedOnTopic) {
                    MqttQoS lowerQosToTheSubscriptionDesired = lowerQosToTheSubscriptionDesired(subscription, retainedMessage.qosLevel());
                    ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(retainedMessage.getPayload());
                    retrieve.sendRetainedPublishOnSessionAtQos(retainedMessage.getTopic(), lowerQosToTheSubscriptionDesired, wrappedBuffer);
                    wrappedBuffer.release();
                }
            }
        }
    }

    private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscription> list, int i) {
        ArrayList arrayList = new ArrayList();
        Iterator<MqttTopicSubscription> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf(it.next().qualityOfService().value()));
        }
        return new MqttSubAckMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i), new MqttSubAckPayload(arrayList));
    }

    public void unsubscribe(List<String> list, MQTTConnection mQTTConnection, int i) {
        String clientId = mQTTConnection.getClientId();
        Session retrieve = this.sessionRegistry.retrieve(clientId);
        if (retrieve == null) {
            LOG.warn("Session not found when unsubscribing {}", clientId);
            mQTTConnection.sendUnsubAckMessage(list, clientId, i);
            return;
        }
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Topic topic = new Topic(it.next());
            if (!topic.isValid()) {
                mQTTConnection.dropConnection();
                LOG.warn("Topic filter is not valid. topics: {}, offending topic filter: {}", list, topic);
                return;
            } else {
                LOG.trace("Removing subscription topic={}", topic);
                this.subscriptions.removeSubscription(topic, clientId);
                retrieve.removeSubscription(topic);
                this.interceptor.notifyTopicUnsubscribed(topic.toString(), clientId, NettyUtils.userName(mQTTConnection.channel));
            }
        }
        mQTTConnection.sendUnsubAckMessage(list, clientId, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> receivedPublishQos0(Topic topic, String str, String str2, MqttPublishMessage mqttPublishMessage) {
        if (!this.authorizator.canWrite(topic, str, str2)) {
            LOG.error("client is not authorized to publish on topic: {}", topic);
            ReferenceCountUtil.release(mqttPublishMessage);
            return CompletableFuture.completedFuture(null);
        }
        RoutingResults publish2Subscribers = publish2Subscribers(mqttPublishMessage.payload(), topic, MqttQoS.AT_MOST_ONCE);
        if (!publish2Subscribers.isAllFailed()) {
            return publish2Subscribers.completableFuture().thenRun(() -> {
                if (mqttPublishMessage.fixedHeader().isRetain()) {
                    this.retainedRepository.cleanRetained(topic);
                }
                this.interceptor.notifyTopicPublished(mqttPublishMessage, str2, str);
                ReferenceCountUtil.release(mqttPublishMessage);
            });
        }
        LOG.info("No one publish was successfully enqueued to session loops");
        ReferenceCountUtil.release(mqttPublishMessage);
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RoutingResults receivedPublishQos1(MQTTConnection mQTTConnection, Topic topic, String str, int i, MqttPublishMessage mqttPublishMessage) {
        RoutingResults publish2Subscribers;
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            mQTTConnection.dropConnection();
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        String clientId = mQTTConnection.getClientId();
        if (!this.authorizator.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client: {} is not authorized to publish on topic: {}", clientId, topic);
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        ByteBuf payload = mqttPublishMessage.payload();
        if (mqttPublishMessage.fixedHeader().isDup()) {
            publish2Subscribers = publish2Subscribers(payload, topic, MqttQoS.AT_LEAST_ONCE, this.failedPublishes.listFailed(clientId, i));
        } else {
            publish2Subscribers = publish2Subscribers(payload, topic, MqttQoS.AT_LEAST_ONCE);
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("subscriber routes: {}", publish2Subscribers);
        }
        if (publish2Subscribers.isAllSuccess()) {
            mQTTConnection.sendPubAck(i);
            manageRetain(topic, mqttPublishMessage);
            this.interceptor.notifyTopicPublished(mqttPublishMessage, clientId, str);
        } else {
            this.failedPublishes.insertAll(i, clientId, publish2Subscribers.failedRoutings);
        }
        ReferenceCountUtil.release(mqttPublishMessage);
        this.failedPublishes.removeAll(i, clientId, publish2Subscribers.successedRoutings);
        return publish2Subscribers;
    }

    private void manageRetain(Topic topic, MqttPublishMessage mqttPublishMessage) {
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            if (mqttPublishMessage.payload().isReadable()) {
                this.retainedRepository.retain(topic, mqttPublishMessage);
            } else {
                this.retainedRepository.cleanRetained(topic);
            }
        }
    }

    private RoutingResults publish2Subscribers(ByteBuf byteBuf, Topic topic, MqttQoS mqttQoS) {
        return publish2Subscribers(byteBuf, topic, mqttQoS, NO_FILTER);
    }

    private RoutingResults publish2Subscribers(ByteBuf byteBuf, Topic topic, MqttQoS mqttQoS, Set<String> set) {
        List<Subscription> matchQosSharpening = this.subscriptions.matchQosSharpening(topic);
        if (matchQosSharpening.isEmpty()) {
            LOG.trace("No matching subscriptions for topic: {}", topic);
            return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
        }
        BatchingPublishesCollector batchingPublishesCollector = new BatchingPublishesCollector(this.sessionLoops);
        for (Subscription subscription : matchQosSharpening) {
            if (set == NO_FILTER || set.contains(subscription.getClientId())) {
                batchingPublishesCollector.add(subscription);
            }
        }
        byteBuf.retain(batchingPublishesCollector.countBatches());
        List<RouteResult> routeBatchedPublishes = batchingPublishesCollector.routeBatchedPublishes(list -> {
            publishToSession(byteBuf, topic, list, mqttQoS);
            byteBuf.release();
        });
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) routeBatchedPublishes.stream().filter((v0) -> {
            return v0.isSuccess();
        }).map((v0) -> {
            return v0.completableFuture();
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (RouteResult routeResult : routeBatchedPublishes) {
            Collection<String> subscriberIdsByEventLoop = batchingPublishesCollector.subscriberIdsByEventLoop(routeResult.clientId);
            if (routeResult.status == RouteResult.Status.FAIL) {
                arrayList.addAll(subscriberIdsByEventLoop);
                byteBuf.release();
            } else {
                arrayList2.addAll(subscriberIdsByEventLoop);
            }
        }
        return new RoutingResults(arrayList2, arrayList, allOf);
    }

    private void publishToSession(ByteBuf byteBuf, Topic topic, Collection<Subscription> collection, MqttQoS mqttQoS) {
        ByteBuf duplicate = byteBuf.duplicate();
        for (Subscription subscription : collection) {
            publishToSession(duplicate, topic, subscription, lowerQosToTheSubscriptionDesired(subscription, mqttQoS));
        }
    }

    private void publishToSession(ByteBuf byteBuf, Topic topic, Subscription subscription, MqttQoS mqttQoS) {
        Session retrieve = this.sessionRegistry.retrieve(subscription.getClientId());
        if (!(retrieve != null)) {
            LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}", new Object[]{subscription.getClientId(), subscription.getTopicFilter(), mqttQoS});
        } else {
            LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", new Object[]{subscription.getClientId(), subscription.getTopicFilter(), mqttQoS});
            retrieve.sendNotRetainedPublishOnSessionAtQos(topic, mqttQoS, byteBuf);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RoutingResults receivedPublishQos2(MQTTConnection mQTTConnection, MqttPublishMessage mqttPublishMessage, String str) {
        RoutingResults publish2Subscribers;
        LOG.trace("Processing PUB QoS2 message on connection: {}", mQTTConnection);
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        String clientId = mQTTConnection.getClientId();
        if (!this.authorizator.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client is not authorized to publish on topic: {}", topic);
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        int packetId = mqttPublishMessage.variableHeader().packetId();
        if (mqttPublishMessage.fixedHeader().isDup()) {
            publish2Subscribers = publish2Subscribers(payload, topic, MqttQoS.EXACTLY_ONCE, this.failedPublishes.listFailed(clientId, packetId));
        } else {
            publish2Subscribers = publish2Subscribers(payload, topic, MqttQoS.EXACTLY_ONCE);
        }
        if (publish2Subscribers.isAllSuccess()) {
            mQTTConnection.sendPubRec(packetId);
            manageRetain(topic, mqttPublishMessage);
            this.interceptor.notifyTopicPublished(mqttPublishMessage, clientId, str);
        } else {
            this.failedPublishes.insertAll(packetId, clientId, publish2Subscribers.failedRoutings);
        }
        ReferenceCountUtil.release(mqttPublishMessage);
        this.failedPublishes.removeAll(packetId, clientId, publish2Subscribers.successedRoutings);
        return publish2Subscribers;
    }

    static MqttQoS lowerQosToTheSubscriptionDesired(Subscription subscription, MqttQoS mqttQoS) {
        if (mqttQoS.value() > subscription.getRequestedQos().value()) {
            mqttQoS = subscription.getRequestedQos();
        }
        return mqttQoS;
    }

    public RoutingResults internalPublish(MqttPublishMessage mqttPublishMessage) {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qosLevel);
        RoutingResults publish2Subscribers = publish2Subscribers(payload, topic, qosLevel);
        LOG.trace("after routed publishes: {}", publish2Subscribers);
        if (!mqttPublishMessage.fixedHeader().isRetain()) {
            return publish2Subscribers;
        }
        if (qosLevel == MqttQoS.AT_MOST_ONCE || payload.readableBytes() == 0) {
            this.retainedRepository.cleanRetained(topic);
            return publish2Subscribers;
        }
        this.retainedRepository.retain(topic, mqttPublishMessage);
        return publish2Subscribers;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispatchConnection(MqttConnectMessage mqttConnectMessage) {
        this.interceptor.notifyClientConnected(mqttConnectMessage);
    }

    void dispatchDisconnection(String str, String str2) {
        this.interceptor.notifyClientDisconnected(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispatchConnectionLost(String str, String str2) {
        this.interceptor.notifyClientConnectionLost(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String sessionLoopThreadName(String str) {
        return this.sessionLoops.sessionLoopThreadName(str);
    }

    public RouteResult routeCommand(String str, String str2, Callable<String> callable) {
        return this.sessionLoops.routeCommand(str, str2, callable);
    }

    public void terminate() {
        this.sessionLoops.terminate();
    }

    public void clientDisconnected(String str, String str2) {
        dispatchDisconnection(str, str2);
        this.failedPublishes.cleanupForClient(str);
    }
}
