/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.EmptyByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTVersion;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles the PUBLISH-related packet flows (PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP) for a single
 * {@link MQTTSession}: routing messages published by the client into the broker and delivering broker
 * messages to the client at the negotiated QoS level.
 */
public class MQTTPublishManager {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Address of the per-client QoS 2 management queue; built lazily in initQos2Resources().
    private SimpleString qos2ManagementAddress;
    // Queue holding PUBREL messages for this client; created lazily in initQos2Resources(), deleted in clean().
    private Queue qos2ManagementQueue;
    // Random UUID string used as the producer name when sending messages into the broker.
    private final String senderName = UUIDGenerator.getInstance().generateUUID().toString();
    // True until the first call to sendToQueue() has registered the producer on the server session.
    private boolean createProducer = true;
    // Internal consumer on the QoS 2 management address; created lazily in initQos2Resources().
    private ServerConsumer qos2ManagementConsumer;
    private final MQTTSession session;
    // Monitor guarding the whole body of sendToQueue().
    private final Object lock = new Object();
    // Session state and its outbound store; both are (re)read from the session in start().
    private MQTTSessionState state;
    private MQTTSessionState.OutboundStore outboundStore;
    // When true, an MQTT 3.1.1 client whose publish fails authorization is disconnected (see sendToQueue()).
    private boolean closeMqttConnectionOnPublishAuthorizationFailure;

    /**
     * Creates a publish manager bound to the given session.
     *
     * @param session the MQTT session this manager serves
     * @param closeMqttConnectionOnPublishAuthorizationFailure whether an MQTT 3.1.1 client is disconnected
     *        when one of its publishes fails authorization
     */
    public MQTTPublishManager(MQTTSession session, boolean closeMqttConnectionOnPublishAuthorizationFailure) {
        this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
        this.session = session;
    }

    /**
     * Captures the session's current state object and that state's outbound store for later use.
     */
    synchronized void start() {
        final MQTTSessionState sessionState = this.session.getState();
        this.state = sessionState;
        this.outboundStore = sessionState.getOutboundStore();
    }

    /**
     * Releases the resources this manager registered on the server session: the producer used for
     * inbound publishes and, if it was created, the QoS 2 management consumer.
     *
     * @throws Exception if closing the management consumer fails
     */
    synchronized void stop() throws Exception {
        ServerSessionImpl serverSession = this.session.getServerSession();
        if (serverSession != null) {
            // sendToQueue() registers the producer under senderName, so it has to be removed under the
            // same key; removing by the session's name would not match that registration.
            serverSession.removeProducer(this.senderName);
        }
        if (this.qos2ManagementConsumer != null) {
            this.qos2ManagementConsumer.removeItself();
            this.qos2ManagementConsumer.setStarted(false);
            this.qos2ManagementConsumer.close(false);
        }
    }

    /**
     * Deletes the QoS 2 management queue if one has been created; otherwise does nothing.
     *
     * @throws Exception if the queue deletion fails
     */
    void clean() throws Exception {
        final Queue managementQueue = this.qos2ManagementQueue;
        if (managementQueue == null) {
            return;
        }
        managementQueue.deleteQueue();
    }

    /**
     * Tells whether the given consumer is this manager's QoS 2 management consumer (identity comparison).
     *
     * @param consumer the consumer to test
     * @return {@code true} if it is the very same instance as the management consumer
     */
    boolean isQos2ManagementConsumer(ServerConsumer consumer) {
        return this.qos2ManagementConsumer == consumer;
    }

    /**
     * Delivers a broker message to the MQTT client.
     * <p>
     * Messages arriving through the QoS 2 management consumer are sent as PUBREL. Everything else is
     * published at the QoS chosen by {@code decideQoS}: QoS 0 messages are acknowledged right after a
     * successful send, QoS 1/2 messages are first recorded in the outbound store under a generated MQTT
     * id, and any other QoS value causes the delivery to be cancelled on the consumer.
     *
     * @param message the message to deliver
     * @param consumer the consumer the message was dispatched to
     * @param deliveryCount how many times delivery of this message has been attempted
     * @throws Exception if publishing or acknowledging fails
     */
    protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
        if (this.isQos2ManagementConsumer(consumer)) {
            this.sendPubRelMessage(message);
            return;
        }

        final int qos = this.decideQoS(message, consumer);
        switch (qos) {
            case 0: {
                boolean sent = this.publishToClient((int) message.getMessageID(), message, deliveryCount, qos, consumer.getID());
                if (sent) {
                    this.session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
                }
                break;
            }
            case 1:
            case 2: {
                int mqttId = this.outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
                this.outboundStore.publish(mqttId, message.getMessageID(), consumer.getID());
                this.publishToClient(mqttId, message, deliveryCount, qos, consumer.getID());
                break;
            }
            default: {
                // decideQoS returns -1 when no subscription QoS could be looked up for the consumer.
                consumer.individualCancel(message.getMessageID(), false);
            }
        }
    }

    /**
     * Routes a PUBLISH packet into the broker and schedules the matching acknowledgement.
     * <p>
     * The whole operation runs under {@code lock}. On first use the producer is registered on the server
     * session. For MQTT 5 the topic alias (if any) is resolved first. A QoS 2 packet whose id is already in
     * the session's PUBREC set is not routed again, but is still acknowledged. The send (and the retained
     * message update, when the retain flag is set) happen in one transaction.
     * <p>
     * Authorization failures are handled per protocol version: internal sends rethrow; MQTT 5 clients get
     * an acknowledgement carrying reason code 0x87 and no further processing; MQTT 3.1.1 clients are
     * disconnected when {@code closeMqttConnectionOnPublishAuthorizationFailure} is set; in every other
     * case the failure is only logged and the normal acknowledgement is still sent.
     *
     * @param message the PUBLISH packet to route
     * @param internal {@code true} when the publish did not originate from the client; suppresses the
     *        PUBREC bookkeeping and the acknowledgement packets
     * @throws Exception on invalid topic alias usage, on authorization failure for internal sends or
     *         disconnect-on-failure, or on any routing error (after the transaction is rolled back)
     */
    void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
        synchronized (this.lock) {
            if (this.createProducer) {
                this.session.getServerSession().addProducer(this.senderName, "MQTT", "ANONYMOUS");
                this.createProducer = false;
            }

            String topic = this.resolveTopicAlias(message);
            String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic, this.session.getWildcardConfiguration());
            SimpleString address = SimpleString.of(coreAddress, this.session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
            Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(this.session, address, message);

            int qos = message.fixedHeader().qosLevel().value();
            if (qos > 0) {
                serverMessage.setDurable(true);
            }

            int packetId = message.variableHeader().packetId();
            boolean qos2PublishAlreadyReceived = this.state.getPubRec().contains(packetId);
            if (qos < 2 || !qos2PublishAlreadyReceived) {
                if (qos == 2 && !internal) {
                    this.state.getPubRec().add(packetId);
                }
                Transaction tx = this.session.getServerSession().newTransaction();
                try {
                    AddressInfo addressInfo = this.session.getServer().getAddressInfo(address);
                    if (addressInfo == null && ((AddressSettings) this.session.getServer().getAddressSettingsRepository().getMatch(coreAddress)).isAutoCreateAddresses()) {
                        this.session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
                        serverMessage.setRoutingType(RoutingType.MULTICAST);
                    }
                    if (addressInfo != null) {
                        serverMessage.setRoutingType(addressInfo.getRoutingType());
                    }
                    this.session.getServerSession().send(tx, serverMessage, true, this.senderName, false);
                    if (message.fixedHeader().isRetain()) {
                        ByteBuf payload = message.payload();
                        // An empty payload on a retained publish resets the retained message.
                        boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
                        this.session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset, tx);
                    }
                    tx.commit();
                } catch (ActiveMQSecurityException e) {
                    tx.rollback();
                    if (internal) {
                        throw e;
                    }
                    MQTTVersion version = this.session.getVersion();
                    if (version == MQTTVersion.MQTT_5) {
                        // 0x87 is the MQTT 5 "Not authorized" reason code.
                        this.sendMessageAck(internal, qos, packetId, (byte) -121);
                        return;
                    } else if (version == MQTTVersion.MQTT_3_1_1) {
                        if (this.closeMqttConnectionOnPublishAuthorizationFailure) {
                            throw new DisconnectException();
                        }
                        logger.debug("MQTT 3.1.1 client not authorized to publish message.");
                    } else {
                        // Only reached for versions other than 5 and 3.1.1; previously this line was also
                        // logged for 3.1.1 clients because the branches were not mutually exclusive.
                        logger.debug("MQTT 3.1 client not authorized to publish message.");
                    }
                } catch (Throwable t) {
                    MQTTLogger.LOGGER.failedToPublishMqttMessage(t.getMessage(), t);
                    tx.rollback();
                    throw t;
                }
            } else {
                // qos == 2 and the packet id is already awaiting PUBREL: duplicate publish, do not route again.
                MQTTLogger.LOGGER.ignoringQoS2Publish(this.state.getClientId(), packetId);
            }
            this.createMessageAck(packetId, qos, internal);
        }
    }

    /**
     * Returns the topic a PUBLISH packet should be routed to, applying MQTT 5 topic alias handling.
     * <p>
     * For non-MQTT-5 sessions, or when the packet carries no TOPIC_ALIAS property, the packet's topic name
     * is returned as is. Otherwise the alias is validated, stored or updated in the session state when the
     * packet also carries a topic name, and looked up when it does not.
     *
     * @param message the PUBLISH packet
     * @return the effective topic name
     * @throws Exception a {@code DisconnectException} with reason code 0x94 (topic alias invalid) when the
     *         alias is 0, exceeds the configured maximum, or is unknown and no topic name was supplied
     */
    private String resolveTopicAlias(MqttPublishMessage message) throws Exception {
        String topic = message.variableHeader().topicName();
        if (this.session.getVersion() != MQTTVersion.MQTT_5) {
            return topic;
        }
        Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), MqttProperties.MqttPropertyType.TOPIC_ALIAS);
        if (alias == null) {
            return topic;
        }

        Integer topicAliasMax = this.session.getProtocolManager().getTopicAliasMaximum();
        if (alias == 0) {
            throw new DisconnectException(-108);
        }
        if (topicAliasMax != null && alias > topicAliasMax) {
            throw new DisconnectException(-108);
        }

        boolean topicSupplied = topic != null && !topic.isEmpty();
        String existingTopicMapping = this.session.getState().getClientTopicAlias(alias);
        if (existingTopicMapping == null) {
            if (!topicSupplied) {
                throw new DisconnectException(-108);
            }
            logger.debug("Adding new alias {} for topic {}", alias, topic);
            this.session.getState().putClientTopicAlias(alias, topic);
            return topic;
        }
        if (topicSupplied) {
            logger.debug("Modifying existing alias {}. New value: {}; old value: {}", alias, topic, existingTopicMapping);
            this.session.getState().putClientTopicAlias(alias, topic);
            return topic;
        }
        logger.debug("Applying topic {} for alias {}", existingTopicMapping, alias);
        return existingTopicMapping;
    }

    /**
     * Sends the acknowledgement packet matching the publish QoS: PUBACK for QoS 1, PUBREC for QoS 2.
     * Nothing is sent for internal publishes or for any other QoS value.
     *
     * @param internal whether the publish was internal (no acknowledgement is sent when {@code true})
     * @param qos the QoS level of the publish being acknowledged
     * @param messageId the packet id to acknowledge
     * @param reasonCode the reason code to put in the acknowledgement
     */
    private void sendMessageAck(boolean internal, int qos, int messageId, byte reasonCode) {
        if (internal) {
            return;
        }
        switch (qos) {
            case 1:
                this.session.getProtocolHandler().sendPubAck(messageId, reasonCode);
                break;
            case 2:
                this.session.getProtocolHandler().sendPubRec(messageId, reasonCode);
                break;
            default:
                break;
        }
    }

    /**
     * Sends a PUBREL for the MQTT packet id stored on the given message, after recording in the outbound
     * store that the release has been sent for that id and broker message.
     *
     * @param message a message carrying the MQTT packet id in the {@code MQTT_MESSAGE_ID_KEY} property
     */
    void sendPubRelMessage(Message message) {
        final int mqttId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
        final long coreMessageId = message.getMessageID();
        this.session.getState().getOutboundStore().publishReleasedSent(mqttId, coreMessageId);
        this.session.getProtocolHandler().sendPubRel(mqttId);
    }

    /**
     * Processes a PUBREC received from the client.
     * <p>
     * When the outbound store knows the id, a PUBREL message is placed on the QoS 2 management queue, the
     * original message is acknowledged on its consumer and that consumer is prompted for more deliveries.
     * When the store has no entry for the id, a PUBREL is sent straight to the client.
     *
     * @param messageId the MQTT packet id from the PUBREC
     * @throws Exception on failures other than {@link ActiveMQIllegalStateException}, which is only logged
     */
    void handlePubRec(int messageId) throws Exception {
        try {
            final Pair<Long, Long> ref = this.outboundStore.publishReceived(messageId);
            if (ref == null) {
                this.session.getProtocolHandler().sendPubRel(messageId);
                return;
            }

            this.initQos2Resources();
            MQTTUtil.sendMessageDirectlyToQueue(this.session.getServer().getStorageManager(), this.session.getServer().getPostOffice(), this.createPubRelMessage(this.session, messageId), this.qos2ManagementQueue, null);

            // The pair is used as (A = broker message id, B = consumer id).
            final Long consumerId = ref.getB();
            final Long coreMessageId = ref.getA();
            this.session.getServerSession().individualAcknowledge(consumerId.longValue(), coreMessageId.longValue());
            this.releaseFlowControl(consumerId);
        } catch (ActiveMQIllegalStateException e) {
            MQTTLogger.LOGGER.failedToAckMessage(this.session.getState().getClientId(), (Exception) e);
        }
    }

    /**
     * Lazily creates the per-client QoS 2 management address name, queue and internal consumer.
     * <p>
     * Each resource is checked on its own, so that a failure while creating the consumer (after the queue
     * already exists) is retried on the next call instead of leaving the consumer permanently unset.
     *
     * @throws Exception if creating the queue or the consumer fails
     */
    private void initQos2Resources() throws Exception {
        if (this.qos2ManagementAddress == null) {
            this.qos2ManagementAddress = SimpleString.of("$sys.mqtt.queue.qos2." + this.session.getState().getClientId());
        }
        if (this.qos2ManagementQueue == null) {
            this.qos2ManagementQueue = this.session.getServer().createQueue(QueueConfiguration.of(this.qos2ManagementAddress).setRoutingType(RoutingType.ANYCAST).setDurable(Boolean.valueOf(true)), true);
        }
        if (this.qos2ManagementConsumer == null) {
            // Assign the field only once the consumer is started, so a failure here is retried next time.
            ServerConsumer consumer = this.session.getServerSession().createInternalConsumer(this.qos2ManagementAddress);
            consumer.setStarted(true);
            this.qos2ManagementConsumer = consumer;
        }
    }

    /**
     * Builds the broker message that represents a pending PUBREL for the given packet id. The message is
     * addressed to the QoS 2 management address and carries the packet id and the PUBREL message type as
     * int properties.
     *
     * @param session the session passed through to the server-message factory
     * @param messageId the MQTT packet id to release
     * @return the message to place on the QoS 2 management queue
     */
    private Message createPubRelMessage(MQTTSession session, int messageId) {
        MqttPublishMessage pubRelPacket = new MqttPublishMessage(
            new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0), null, null);
        return MQTTUtil.createServerMessage(session, this.qos2ManagementAddress, pubRelPacket)
            .putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId)
            .putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value());
    }

    /**
     * Prompts the consumer with the given id to deliver again, if the server session still knows it.
     *
     * @param consumerId id of the consumer to prompt
     */
    private void releaseFlowControl(Long consumerId) {
        final ServerConsumer target = this.session.getServerSession().locateConsumer(consumerId.longValue());
        if (target == null) {
            return;
        }
        target.promptDelivery();
    }

    /**
     * Processes a PUBCOMP received from the client: when the outbound store has an entry for the id, the
     * corresponding message is acknowledged on the QoS 2 management consumer.
     *
     * @param messageId the MQTT packet id from the PUBCOMP
     * @throws Exception if the acknowledgement fails
     */
    void handlePubComp(int messageId) throws Exception {
        final Pair<Long, Long> ref = this.session.getState().getOutboundStore().publishComplete(messageId);
        if (ref == null) {
            return;
        }
        final Long coreMessageId = ref.getA();
        this.session.getServerSession().individualAcknowledge(this.qos2ManagementConsumer.getID(), coreMessageId.longValue());
    }

    /**
     * Schedules the success acknowledgement (reason code 0) for a publish so that it is sent from the
     * storage manager's after-complete-operations callback rather than immediately.
     *
     * @param messageId the packet id to acknowledge
     * @param qos the QoS level of the publish
     * @param internal whether the publish was internal (in which case no packet is sent)
     */
    private void createMessageAck(final int messageId, final int qos, final boolean internal) {
        this.session.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
            @Override
            public void done() {
                MQTTPublishManager.this.sendMessageAck(internal, qos, messageId, (byte) 0);
            }

            @Override
            public void onError(int errorCode, String errorMessage) {
                // Keep the details the callback hands us; without them the failure cannot be diagnosed.
                logger.error("Pub Sync Failed: errorCode={}, errorMessage={}", errorCode, errorMessage);
            }
        });
    }

    /**
     * Processes a PUBREL received from the client: forgets the packet id in the session's PUBREC set,
     * replies with PUBCOMP and removes the message reference for that id from the session state.
     * The PUBCOMP is sent whether or not the id was present in the PUBREC set.
     *
     * @param messageId the MQTT packet id from the PUBREL
     */
    void handlePubRel(int messageId) {
        this.state.getPubRec().remove(messageId);
        this.session.getProtocolHandler().sendPubComp(messageId);
        this.state.removeMessageRef(messageId);
    }

    /**
     * Processes a PUBACK received from the client: when the outbound store has an entry for the id, the
     * message is acknowledged on its consumer and that consumer is prompted for more deliveries.
     *
     * @param messageId the MQTT packet id from the PUBACK
     * @throws Exception on failures other than {@link ActiveMQIllegalStateException}, which is logged as a
     *         warning
     */
    void handlePubAck(int messageId) throws Exception {
        try {
            final Pair<Long, Long> ref = this.outboundStore.publishAckd(messageId);
            if (ref == null) {
                return;
            }
            // The pair is used as (A = broker message id, B = consumer id).
            final Long consumerId = ref.getB();
            final Long coreMessageId = ref.getA();
            this.session.getServerSession().individualAcknowledge(consumerId.longValue(), coreMessageId.longValue());
            this.releaseFlowControl(consumerId);
        } catch (ActiveMQIllegalStateException e) {
            logger.warn("MQTT Client({}) attempted to Ack already Ack'd message", this.session.getState().getClientId());
        }
    }

    /**
     * Converts a broker message into an MQTT PUBLISH packet and sends it to the client.
     * <p>
     * The payload is copied into a freshly allocated direct buffer. For MQTT 5 sessions the publish
     * properties are attached, the retain flag honours the subscription's retain-as-published option, a
     * server topic alias is applied when the client allows aliases, and packets larger than the client's
     * maximum packet size are dropped (and acknowledged so they are not redelivered).
     * <p>
     * The payload buffer is released here whenever the packet is not handed to the protocol handler
     * (oversized packet, or an exception while building it); once {@code sendToClient} is invoked the
     * buffer is no longer touched by this method.
     *
     * @param messageId the MQTT packet id to put in the variable header
     * @param message the broker message to publish
     * @param deliveryCount number of delivery attempts; more than one sets the DUP flag for QoS &gt; 0
     * @param qos the QoS level to publish at
     * @param consumerId id of the consumer, used to acknowledge dropped oversized messages
     * @return {@code true} if the packet was passed to the protocol handler, {@code false} if it was
     *         dropped for exceeding the client's maximum packet size
     * @throws Exception if building, acknowledging or sending fails
     */
    private boolean publishToClient(int messageId, ICoreMessage message, int deliveryCount, int qos, long consumerId) throws Exception {
        String topic = MQTTUtil.getMqttTopicFromCoreAddress(Objects.requireNonNullElse(message.getAddress(), ""), this.session.getWildcardConfiguration());

        ByteBuf payload;
        // NOTE(review): type 3 is presumably Message.TEXT_TYPE — confirm against the Message constants.
        if (message.getType() == 3) {
            SimpleString text = message.getDataBuffer().readNullableSimpleString();
            if (text == null) {
                // A null text body would make ByteBufUtil.utf8Bytes throw; publish an empty payload instead.
                payload = ByteBufAllocator.DEFAULT.directBuffer(0);
            } else {
                int utf8Bytes = ByteBufUtil.utf8Bytes(text);
                payload = ByteBufAllocator.DEFAULT.directBuffer(utf8Bytes);
                ByteBufUtil.reserveAndWriteUtf8(payload, text, utf8Bytes);
            }
        } else {
            ActiveMQBuffer bodyBuffer = message.getDataBuffer();
            payload = ByteBufAllocator.DEFAULT.directBuffer(bodyBuffer.writerIndex());
            payload.writeBytes(bodyBuffer.byteBuf());
        }

        boolean handedOff = false;
        try {
            // The DUP flag is never set for QoS 0.
            boolean redelivery = qos != 0 && deliveryCount > 1;
            boolean isRetain = message.containsProperty(MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY);
            MqttProperties mqttProperties = null;
            if (this.session.getVersion() == MQTTVersion.MQTT_5) {
                mqttProperties = this.getPublishProperties(message);
                if (!isRetain && message.getBooleanProperty(MQTTUtil.MQTT_MESSAGE_RETAIN_KEY).booleanValue()) {
                    MqttTopicSubscription sub = this.session.getState().getSubscription(topic);
                    if (sub != null && sub.option().isRetainAsPublished()) {
                        isRetain = true;
                    }
                }
                if (this.session.getState().getClientTopicAliasMaximum() != null) {
                    Integer alias = this.session.getState().getServerTopicAlias(topic);
                    if (alias == null) {
                        // First use of this topic: try to allocate an alias and send it with the full topic.
                        alias = this.session.getState().addServerTopicAlias(topic);
                        if (alias != null) {
                            mqttProperties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.TOPIC_ALIAS.value(), alias));
                        }
                    } else {
                        // Alias already known: send the alias with an empty topic name.
                        mqttProperties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.TOPIC_ALIAS.value(), alias));
                        topic = "";
                    }
                }
            }

            int remainingLength = MQTTUtil.calculateRemainingLength(topic, mqttProperties, payload);
            MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
            MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId, mqttProperties);
            MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload);

            int maxSize = this.session.getState().getClientMaxPacketSize();
            if (this.session.getVersion() == MQTTVersion.MQTT_5 && maxSize != 0) {
                int size = MQTTUtil.calculateMessageSize(publish);
                if (size > maxSize) {
                    logger.debug("Not sending message {} to client as its size ({}) exceeds the max ({})", message, size, maxSize);
                    // Acknowledge so the oversized message is not redelivered.
                    this.session.getServerSession().individualAcknowledge(consumerId, message.getMessageID());
                    return false;
                }
            }

            // From here on the protocol handler is responsible for the buffer, even if the send throws.
            handedOff = true;
            this.session.getProtocolHandler().sendToClient(publish);
            return true;
        } finally {
            if (!handedOff) {
                payload.release();
            }
        }
    }

    /**
     * Builds the MQTT 5 PUBLISH properties for an outgoing message from the properties stored on the
     * broker message: payload format indicator, response topic, correlation data, user properties (in
     * their recorded order), content type, the identifiers of matching subscriptions and, when the
     * message has an expiration, the remaining expiry interval in seconds.
     *
     * @param message the broker message being published
     * @return a new, possibly empty, property set
     */
    private MqttProperties getPublishProperties(ICoreMessage message) {
        MqttProperties props = new MqttProperties();

        if (message.containsProperty(MQTTUtil.MQTT_PAYLOAD_FORMAT_INDICATOR_KEY)) {
            props.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value(), message.getIntProperty(MQTTUtil.MQTT_PAYLOAD_FORMAT_INDICATOR_KEY)));
        }
        if (message.containsProperty(MQTTUtil.MQTT_RESPONSE_TOPIC_KEY)) {
            props.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value(), message.getStringProperty(MQTTUtil.MQTT_RESPONSE_TOPIC_KEY)));
        }
        if (message.containsProperty(MQTTUtil.MQTT_CORRELATION_DATA_KEY)) {
            props.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.CORRELATION_DATA.value(), message.getBytesProperty(MQTTUtil.MQTT_CORRELATION_DATA_KEY)));
        }
        if (message.containsProperty("mqtt.user.property.exists")) {
            // The marker property holds the number of user properties; each one is stored under a key of
            // the form <prefix><position>.<name>, where the position is the fifth dot-separated segment.
            MqttProperties.StringPair[] orderedProperties = new MqttProperties.StringPair[message.getIntProperty("mqtt.user.property.exists").intValue()];
            for (SimpleString propertyName : message.getPropertyNames()) {
                if (!propertyName.startsWith(MQTTUtil.MQTT_USER_PROPERTY_KEY_PREFIX_SIMPLE)) {
                    continue;
                }
                SimpleString[] split = propertyName.split('.');
                int position = Integer.parseInt(split[4].toString());
                String key = propertyName.subSeq(MQTTUtil.MQTT_USER_PROPERTY_KEY_PREFIX_SIMPLE.length() + split[4].length() + 1, propertyName.length()).toString();
                orderedProperties[position] = new MqttProperties.StringPair(key, message.getStringProperty(propertyName));
            }
            props.add(new MqttProperties.UserProperties(Arrays.asList(orderedProperties)));
        }
        if (message.containsProperty(MQTTUtil.MQTT_CONTENT_TYPE_KEY)) {
            props.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), message.getStringProperty(MQTTUtil.MQTT_CONTENT_TYPE_KEY)));
        }

        List<Integer> subscriptionIdentifiers = this.session.getState().getMatchingSubscriptionIdentifiers(message.getAddress());
        if (subscriptionIdentifiers != null) {
            for (Integer id : subscriptionIdentifiers) {
                props.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), id));
            }
        }

        long expiration = message.getExpiration();
        if (expiration != 0L) {
            // Remaining lifetime in whole seconds. Dividing by 1000 directly avoids the extra rounding
            // error of scaling down and back up; the clamp keeps an already-expired message from being
            // advertised with a negative value (which would go on the wire as a huge unsigned interval)
            // and keeps far-future expirations from wrapping when narrowed to int.
            long remainingSeconds = Math.round((expiration - System.currentTimeMillis()) / 1000.0);
            int messageExpiryInterval = (int) Math.max(0L, Math.min((long) Integer.MAX_VALUE, remainingSeconds));
            props.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), Integer.valueOf(messageExpiryInterval)));
        }
        return props;
    }

    /**
     * Chooses the QoS to deliver a message at: the lower of the consumer's subscription QoS and the
     * message's own QoS (taken from the {@code MQTT_QOS_LEVEL_KEY} property, defaulting to 2).
     *
     * @param message the message about to be delivered
     * @param consumer the consumer it is being delivered to
     * @return the effective QoS, or -1 when looking up the subscription QoS raised a
     *         {@link NullPointerException} (for example, no QoS is recorded for the consumer)
     */
    private int decideQoS(Message message, ServerConsumer consumer) {
        final int subscriptionQoS;
        try {
            subscriptionQoS = this.session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
        } catch (NullPointerException e) {
            return -1;
        }

        int messageQoS = 2;
        if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
            messageQoS = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
        }
        return Math.min(subscriptionQoS, messageQoS);
    }
}

