/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.protocol.mqtt;

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import org.apache.iotdb.db.protocol.mqtt.TableMessage;
import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.MqttClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.BitMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MPPPublishHandler
extends AbstractInterceptHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class);
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    private final SessionManager sessionManager = SessionManager.getInstance();
    private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap = new ConcurrentHashMap();
    private final PayloadFormatter payloadFormat;
    private final IPartitionFetcher partitionFetcher;
    private final ISchemaFetcher schemaFetcher;
    private final boolean useTableInsert;

    public MPPPublishHandler(IoTDBConfig config) {
        this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
        this.partitionFetcher = ClusterPartitionFetcher.getInstance();
        this.schemaFetcher = ClusterSchemaFetcher.getInstance();
        this.useTableInsert = "table".equals(this.payloadFormat.getType());
    }

    public String getID() {
        return "iotdb-mqtt-broker-listener";
    }

    public void onConnect(InterceptConnectMessage msg) {
        if (!this.clientIdToSessionMap.containsKey(msg.getClientID())) {
            MqttClientSession session = new MqttClientSession(msg.getClientID());
            this.sessionManager.login(session, msg.getUsername(), new String(msg.getPassword()), ZoneId.systemDefault().toString(), TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, IoTDBConstant.ClientVersion.V_1_0, this.useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE);
            this.sessionManager.registerSession(session);
            this.clientIdToSessionMap.put(msg.getClientID(), session);
        }
    }

    public void onDisconnect(InterceptDisconnectMessage msg) {
        MqttClientSession session = this.clientIdToSessionMap.remove(msg.getClientID());
        if (null != session) {
            this.sessionManager.removeCurrSession();
            this.sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPublish(InterceptPublishMessage msg) {
        try {
            String clientId = msg.getClientID();
            if (!this.clientIdToSessionMap.containsKey(clientId)) {
                return;
            }
            MqttClientSession session = this.clientIdToSessionMap.get(msg.getClientID());
            ByteBuf payload = msg.getPayload();
            String topic = msg.getTopicName();
            String username = msg.getUsername();
            MqttQoS qos = msg.getQos();
            LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", new Object[]{clientId, username, qos, topic, payload});
            List<Message> messages = this.payloadFormat.format(payload);
            if (messages == null) {
                return;
            }
            for (Message message : messages) {
                if (message == null) continue;
                if (this.useTableInsert) {
                    TableMessage tableMessage = (TableMessage)message;
                    String database = !msg.getTopicName().contains("/") ? msg.getTopicName() : msg.getTopicName().substring(0, msg.getTopicName().indexOf("/"));
                    tableMessage.setDatabase(database);
                    this.insertTable(tableMessage, session);
                    continue;
                }
                this.insertTree((TreeMessage)message, session);
            }
        }
        catch (Throwable t) {
            LOG.warn("onPublish execution exception, msg is [{}], error is ", (Object)msg, (Object)t);
        }
        finally {
            super.onPublish(msg);
        }
    }

    private void insertTable(TableMessage message, MqttClientSession session) {
        TSStatus tsStatus = null;
        try {
            TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
            InsertTabletStatement insertTabletStatement = this.constructInsertTabletStatement(message);
            session.setDatabaseName(message.getDatabase().toLowerCase());
            session.setSqlDialect(IClientSession.SqlDialect.TABLE);
            long queryId = this.sessionManager.requestQueryId();
            SqlParser relationSqlParser = new SqlParser();
            Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
            ExecutionResult result = Coordinator.getInstance().executeForTableModel(insertTabletStatement, relationSqlParser, session, queryId, this.sessionManager.getSessionInfo(session), "", metadata, config.getQueryTimeoutThreshold());
            tsStatus = result.status;
            LOG.debug("process result: {}", (Object)tsStatus);
            if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                LOG.warn("mqtt line insert error , message = {}", (Object)tsStatus.message);
            }
        }
        catch (Exception e) {
            LOG.warn("meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ", new Object[]{message.getDatabase(), message.getTable(), message.getTagKeys(), message.getAttributeKeys(), message.getFields(), message.getTimestamp(), e});
        }
    }

    private InsertTabletStatement constructInsertTabletStatement(TableMessage message) throws IllegalPathException {
        int i;
        InsertTabletStatement insertStatement = new InsertTabletStatement();
        insertStatement.setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(message.getTable()));
        List<String> measurements = Stream.of(message.getFields(), message.getTagKeys(), message.getAttributeKeys()).flatMap(Collection::stream).collect(Collectors.toList());
        insertStatement.setMeasurements(measurements.toArray(new String[0]));
        long[] timestamps = new long[]{message.getTimestamp()};
        insertStatement.setTimes(timestamps);
        int columnSize = measurements.size();
        int rowSize = 1;
        BitMap[] bitMaps = new BitMap[columnSize];
        Object[] columns = Stream.of(message.getValues(), message.getTagValues(), message.getAttributeValues()).flatMap(Collection::stream).toArray(Object[]::new);
        insertStatement.setColumns(columns);
        insertStatement.setBitMaps(bitMaps);
        insertStatement.setRowCount(rowSize);
        insertStatement.setAligned(false);
        insertStatement.setWriteToTable(true);
        TSDataType[] dataTypes = new TSDataType[measurements.size()];
        TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.size()];
        for (i = 0; i < message.getFields().size(); ++i) {
            dataTypes[i] = message.getDataTypes().get(i);
            columnCategories[i] = TsTableColumnCategory.FIELD;
        }
        for (i = message.getFields().size(); i < message.getFields().size() + message.getTagKeys().size(); ++i) {
            dataTypes[i] = TSDataType.STRING;
            columnCategories[i] = TsTableColumnCategory.TAG;
        }
        for (i = message.getFields().size() + message.getTagKeys().size(); i < message.getFields().size() + message.getTagKeys().size() + message.getAttributeKeys().size(); ++i) {
            dataTypes[i] = TSDataType.STRING;
            columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
        }
        insertStatement.setDataTypes(dataTypes);
        insertStatement.setColumnCategories(columnCategories);
        return insertStatement;
    }

    private void insertTree(TreeMessage message, MqttClientSession session) {
        TSStatus tsStatus = null;
        try {
            InsertRowStatement statement = new InsertRowStatement();
            statement.setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
            TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
            statement.setTime(message.getTimestamp());
            statement.setMeasurements(message.getMeasurements().toArray(new String[0]));
            if (message.getDataTypes() == null) {
                statement.setDataTypes(new TSDataType[message.getMeasurements().size()]);
                statement.setValues(message.getValues().toArray(new Object[0]));
                statement.setNeedInferType(true);
            } else {
                List<TSDataType> dataTypes = message.getDataTypes();
                List<String> values = message.getValues();
                Object[] inferredValues = new Object[values.size()];
                for (int i = 0; i < values.size(); ++i) {
                    inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
                }
                statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
                statement.setValues(inferredValues);
            }
            statement.setAligned(false);
            tsStatus = AuthorityChecker.checkAuthority((Statement)statement, session);
            if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                LOG.warn(tsStatus.message);
            } else {
                long queryId = this.sessionManager.requestQueryId();
                ExecutionResult result = Coordinator.getInstance().executeForTreeModel(statement, queryId, this.sessionManager.getSessionInfo(session), "", this.partitionFetcher, this.schemaFetcher, config.getQueryTimeoutThreshold(), false);
                tsStatus = result.status;
                LOG.debug("process result: {}", (Object)tsStatus);
                if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    LOG.warn("mqtt json insert error , message = {}", (Object)tsStatus.message);
                }
            }
        }
        catch (Exception e) {
            LOG.warn("meet error when inserting device {}, measurements {}, at time {}, because ", new Object[]{message.getDevice(), message.getMeasurements(), message.getTimestamp(), e});
        }
    }

    public void onSessionLoopError(Throwable throwable) {
    }
}

