package io.moquette.spi.impl;

import io.moquette.connections.IConnectionsManager;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.impl.subscriptions.ISubscriptionsDirectory;
import io.moquette.spi.impl.subscriptions.Topic;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/spi/impl/Qos2PublishHandler.class */
public class Qos2PublishHandler extends QosPublishHandler {
    private static final Logger LOG = LoggerFactory.getLogger(Qos2PublishHandler.class);
    private final ISubscriptionsDirectory subscriptions;
    private final IMessagesStore m_messagesStore;
    private final BrokerInterceptor m_interceptor;
    private final IConnectionsManager connectionDescriptors;
    private final MessagesPublisher publisher;
    private final SessionsRepository sessionsRepository;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Qos2PublishHandler(IAuthorizator iAuthorizator, ISubscriptionsDirectory iSubscriptionsDirectory, IMessagesStore iMessagesStore, BrokerInterceptor brokerInterceptor, IConnectionsManager iConnectionsManager, MessagesPublisher messagesPublisher, SessionsRepository sessionsRepository) {
        super(iAuthorizator);
        this.subscriptions = iSubscriptionsDirectory;
        this.m_messagesStore = iMessagesStore;
        this.m_interceptor = brokerInterceptor;
        this.connectionDescriptors = iConnectionsManager;
        this.publisher = messagesPublisher;
        this.sessionsRepository = sessionsRepository;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void receivedPublishQos2(Channel channel, MqttPublishMessage mqttPublishMessage) {
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        String clientID = NettyUtils.clientID(channel);
        String userName = NettyUtils.userName(channel);
        if (!this.m_authorizator.canWrite(topic, userName, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        int messageId = mqttPublishMessage.variableHeader().messageId();
        IMessagesStore.StoredMessage asStoredMessage = ProtocolProcessor.asStoredMessage(mqttPublishMessage);
        asStoredMessage.setClientID(clientID);
        LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", new Object[]{clientID, topic, Integer.valueOf(messageId)});
        if (LOG.isTraceEnabled()) {
            LOG.trace("payload={}, subs Tree={}", DebugUtils.payload2Str(asStoredMessage.getPayload()), this.subscriptions.dumpTree());
        }
        this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageId, asStoredMessage);
        sendPubRec(clientID, messageId);
        this.m_interceptor.notifyTopicPublished(mqttPublishMessage, clientID, userName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processPubRel(Channel channel, MqttMessage mqttMessage) {
        String clientID = NettyUtils.clientID(channel);
        int messageId = Utils.messageId(mqttMessage);
        LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, Integer.valueOf(messageId));
        IMessagesStore.StoredMessage inboundInflight = this.sessionsRepository.sessionForClient(clientID).inboundInflight(messageId);
        if (inboundInflight == null) {
            LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, Integer.valueOf(messageId));
            throw new IllegalArgumentException("Can't find inbound inflight message");
        }
        Topic topic = new Topic(inboundInflight.getTopic());
        this.publisher.publish2Subscribers(inboundInflight, topic, messageId);
        if (inboundInflight.isRetained()) {
            if (inboundInflight.getPayload().readableBytes() == 0) {
                this.m_messagesStore.cleanRetained(topic);
            } else {
                this.m_messagesStore.storeRetained(topic, inboundInflight);
            }
        }
        sendPubComp(clientID, messageId);
    }

    private void sendPubRec(String str, int i) {
        LOG.trace("Sending PUBREC message. CId={}, messageId={}", str, Integer.valueOf(i));
        MqttMessage mqttMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i));
        Optional<ConnectionDescriptor> lookupDescriptor = this.connectionDescriptors.lookupDescriptor(str);
        if (lookupDescriptor.isPresent()) {
            try {
                lookupDescriptor.get().writeAndFlush(mqttMessage);
            } catch (Throwable th) {
                LOG.error("Unable to send {} message. CId=<{}>, messageId={}", new Object[]{MqttMessageType.PUBREC, str, Integer.valueOf(i), th});
            }
        }
    }

    private void sendPubComp(String str, int i) {
        LOG.trace("Sending PUBCOMP message. CId={}, messageId={}", str, Integer.valueOf(i));
        MqttMessage mqttMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i));
        Optional<ConnectionDescriptor> lookupDescriptor = this.connectionDescriptors.lookupDescriptor(str);
        if (lookupDescriptor.isPresent()) {
            try {
                lookupDescriptor.get().writeAndFlush(mqttMessage);
            } catch (Throwable th) {
                LOG.error("Unable to send {} message. CId=<{}>, messageId={}", new Object[]{MqttMessageType.PUBCOMP, str, Integer.valueOf(i), th});
            }
        }
    }
}
