/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.consume;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.AbstractConsumeOperator;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Consume operator for inlong groups whose message queue is Pulsar.
 *
 * <p>Handles the MQ types {@code "PULSAR"} and {@code "TDMQ_PULSAR"}: it validates the
 * topic of a consume request against the topics of the inlong group, builds the
 * {@link ConsumePulsarInfo} view of a stored consume entity, and serializes the
 * Pulsar-specific DLQ/RLQ settings into the entity's ext params.
 */
@Service
public class ConsumePulsarOperator extends AbstractConsumeOperator {

    /** Flag value meaning the dead/retry letter queue is enabled. */
    private static final Integer DLQ_RLQ_ENABLE = 1;
    /** Flag value meaning the dead/retry letter queue is disabled. */
    private static final Integer DLQ_RLQ_DISABLE = 0;
    /** Prefix of the name checked for an existing dead letter stream. */
    private static final String PREFIX_DLQ = "dlq";
    /** Prefix of the name checked for an existing retry letter stream. */
    private static final String PREFIX_RLQ = "rlq";

    @Autowired
    private InlongGroupService groupService;
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    @Autowired
    private InlongClusterService clusterService;
    @Autowired
    private InlongStreamService streamService;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Tells whether this operator handles the given MQ type.
     *
     * @param mqType MQ type to test, may be null
     * @return true for {@code "PULSAR"} (see {@link #getMQType()}) and {@code "TDMQ_PULSAR"}
     */
    @Override
    public Boolean accept(String mqType) {
        return this.getMQType().equals(mqType) || "TDMQ_PULSAR".equals(mqType);
    }

    /**
     * Returns the primary MQ type of this operator.
     *
     * @return the constant {@code "PULSAR"}
     */
    @Override
    public String getMQType() {
        return "PULSAR";
    }

    /**
     * Verifies that the topic of the request belongs to the request's inlong group.
     *
     * <p>If the topic starts with {@code "persistent"}, only the part after the last
     * {@code '/'} is kept and written back to the request before the check.
     *
     * @param request consume request carrying the group id and the topic; the topic may be
     *        rewritten to its short name
     */
    @Override
    public void checkTopicInfo(InlongConsumeRequest request) {
        String groupId = request.getInlongGroupId();
        InlongGroupTopicInfo topicInfo = this.groupService.getTopic(groupId);
        Preconditions.expectNotNull(topicInfo, "inlong group not exist for groupId=" + groupId);
        InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo;

        String originTopic = request.getTopic();
        // Fail with a validation error instead of a NullPointerException on a missing topic.
        Preconditions.expectTrue(StringUtils.isNotBlank(originTopic),
                "Pulsar topic cannot be blank for groupId=" + groupId);

        if (originTopic.startsWith("persistent")) {
            // Reduce a full topic path to the last path segment.
            originTopic = originTopic.substring(originTopic.lastIndexOf("/") + 1);
            request.setTopic(originTopic);
        }

        // A group without any topic list is treated the same as "topic not found".
        boolean topicExists = pulsarTopic.getTopics() != null
                && pulsarTopic.getTopics().contains(originTopic);
        Preconditions.expectTrue(topicExists, "Pulsar topic not exist for " + originTopic);
    }

    /**
     * Builds the Pulsar consume info for a stored consume entity.
     *
     * <p>Copies the entity properties and, when present, the properties parsed from the
     * entity's ext params; attaches the {@code "PULSAR"} clusters found for the group's
     * cluster tag; and replaces the topic with the full
     * {@code persistent://tenant/namespace/topic} form. The tenant comes from the group,
     * falling back to the first cluster when the group has none.
     *
     * @param entity consume entity, must not be null
     * @return the populated {@link ConsumePulsarInfo}
     */
    @Override
    public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
        Preconditions.expectNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());

        ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
        CommonBeanUtils.copyProperties(entity, consumeInfo);
        if (StringUtils.isNotBlank(entity.getExtParams())) {
            ConsumePulsarDTO dto = ConsumePulsarDTO.getFromJson(entity.getExtParams());
            CommonBeanUtils.copyProperties(dto, consumeInfo);
        }

        String groupId = entity.getInlongGroupId();
        InlongGroupInfo groupInfo = this.groupService.get(groupId);
        // Guard the dereferences below against a group that could not be loaded.
        Preconditions.expectNotNull(groupInfo, "inlong group not exist for groupId=" + groupId);

        String clusterTag = groupInfo.getInlongClusterTag();
        List<ClusterInfo> clusterInfos = this.clusterService.listByTagAndType(clusterTag, "PULSAR");
        Preconditions.expectNotEmpty(clusterInfos, "pulsar cluster not exist for groupId=" + groupId);
        consumeInfo.setClusterInfos(clusterInfos);

        String tenant = ((InlongPulsarInfo) groupInfo).getPulsarTenant();
        if (StringUtils.isBlank(tenant)) {
            // The group defines no tenant: use the one of the first cluster.
            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0);
            tenant = pulsarCluster.getPulsarTenant();
        }
        consumeInfo.setTopic(this.getFullPulsarTopic(groupInfo, tenant, entity.getTopic()));
        return consumeInfo;
    }

    /**
     * Validates the DLQ/RLQ settings of a Pulsar consume request and stores them,
     * serialized as JSON, in the ext params of the target entity.
     *
     * <p>RLQ may only be enabled together with DLQ. For an enabled queue, no stream named
     * {@code dlq_<deadLetterTopic>} / {@code rlq_<retryLetterTopic>} may already exist in
     * the group. For a disabled queue, the flag is reset to 0 and the topic is cleared on
     * the request.
     *
     * @param request consume request, expected to be a {@link ConsumePulsarRequest}
     * @param targetEntity entity whose ext params are updated
     * @throws BusinessException if RLQ is enabled without DLQ, or if building or
     *         serializing the ext params fails
     */
    @Override
    protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
        ConsumePulsarRequest pulsarRequest = (ConsumePulsarRequest) request;
        boolean dlqEnable = DLQ_RLQ_ENABLE.equals(pulsarRequest.getIsDlq());
        boolean rlqEnable = DLQ_RLQ_ENABLE.equals(pulsarRequest.getIsRlq());
        if (rlqEnable && !dlqEnable) {
            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
        }

        String groupId = targetEntity.getInlongGroupId();
        // NOTE(review): a null dead/retry letter topic is checked as "dlq_null"/"rlq_null";
        // confirm whether the request is validated for non-blank topics before this point.
        if (dlqEnable) {
            String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
            Preconditions.expectTrue(!this.streamService.exist(groupId, dlqTopic),
                    ErrorCodeEnum.PULSAR_DLQ_DUPLICATED.getMessage());
        } else {
            pulsarRequest.setIsDlq(DLQ_RLQ_DISABLE);
            pulsarRequest.setDeadLetterTopic(null);
        }
        if (rlqEnable) {
            String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
            Preconditions.expectTrue(!this.streamService.exist(groupId, rlqTopic),
                    ErrorCodeEnum.PULSAR_RLQ_DUPLICATED.getMessage());
        } else {
            pulsarRequest.setIsRlq(DLQ_RLQ_DISABLE);
            pulsarRequest.setRetryLetterTopic(null);
        }

        try {
            ConsumePulsarDTO dto = ConsumePulsarDTO.getFromRequest(pulsarRequest, targetEntity.getExtParams());
            targetEntity.setExtParams(this.objectMapper.writeValueAsString(dto));
        } catch (Exception e) {
            // NOTE(review): only the message of the cause is kept; attach the cause itself
            // if BusinessException offers a constructor for it.
            throw new BusinessException(ErrorCodeEnum.CONSUME_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
        }
    }

    /**
     * Formats the full Pulsar topic as {@code persistent://tenant/namespace/topic}.
     *
     * @param groupInfo group whose MQ resource is used as the namespace
     * @param tenant Pulsar tenant; {@code "public"} is used when it is null or blank
     * @param topic short topic name
     * @return the full topic string
     */
    private String getFullPulsarTopic(InlongGroupInfo groupInfo, String tenant, String topic) {
        // isBlank (not isEmpty) so a whitespace-only tenant also falls back to the default,
        // consistent with the blank check done by the caller.
        if (StringUtils.isBlank(tenant)) {
            tenant = "public";
        }
        String namespace = groupInfo.getMqResource();
        return String.format("persistent://%s/%s/%s", tenant, namespace, topic);
    }
}

