/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.plugin.listener;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StartupStreamListener
implements SortOperateListener {
    private static final Logger log = LoggerFactory.getLogger(StartupStreamListener.class);

    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    public boolean accept(WorkflowContext workflowContext) {
        ProcessForm processForm = workflowContext.getProcessForm();
        String groupId = processForm.getInlongGroupId();
        if (!(processForm instanceof StreamResourceProcessForm)) {
            log.info("not add startup stream listener, not StreamResourceProcessForm for groupId [{}]", (Object)groupId);
            return false;
        }
        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm)processForm;
        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
        if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
            log.info("not add startup stream listener, as the operate was not INIT for groupId [{}] streamId [{}]", (Object)groupId, (Object)streamId);
            return false;
        }
        log.info("add startup stream listener for groupId [{}] streamId [{}]", (Object)groupId, (Object)streamId);
        return InlongConstants.STANDARD_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode());
    }

    public ListenerResult listen(WorkflowContext context) throws Exception {
        String dataflow;
        ProcessForm processForm = context.getProcessForm();
        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm)processForm;
        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
        List streamExtList = streamInfo.getExtList();
        log.info("inlong stream :{} ext info: {}", (Object)streamInfo.getInlongStreamId(), (Object)streamExtList);
        String groupId = streamInfo.getInlongGroupId();
        String streamId = streamInfo.getInlongStreamId();
        List sinkList = streamInfo.getSinkList();
        List sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
        if (CollectionUtils.isEmpty((Collection)sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) {
            log.warn("not any sink configured for group {} and stream {}, skip launching sort job", (Object)groupId, (Object)streamId);
            return ListenerResult.success();
        }
        List extList = streamInfo.getExtList();
        log.info("stream ext info: {}", (Object)extList);
        Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty((CharSequence)v.getKeyName()) && StringUtils.isNotEmpty((CharSequence)v.getKeyValue())).collect(Collectors.toMap(InlongStreamExtInfo::getKeyName, InlongStreamExtInfo::getKeyValue));
        String sortExt = kvConf.get("sort.properties");
        if (StringUtils.isNotEmpty((CharSequence)sortExt)) {
            Map result = (Map)JsonUtils.OBJECT_MAPPER.convertValue((Object)JsonUtils.OBJECT_MAPPER.readTree(sortExt), (TypeReference)new TypeReference<Map<String, String>>(){});
            kvConf.putAll(result);
        }
        if (StringUtils.isEmpty((CharSequence)(dataflow = kvConf.get("dataflow")))) {
            String message = String.format("dataflow is empty for groupId [%s], streamId [%s]", groupId, streamInfo.getInlongStreamId());
            log.error(message);
            return ListenerResult.fail((String)message);
        }
        FlinkInfo flinkInfo = new FlinkInfo();
        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId();
        flinkInfo.setJobName(jobName);
        String sortUrl = kvConf.get("sort.url");
        flinkInfo.setEndpoint(sortUrl);
        flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
        FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
        FlinkOperation flinkOperation = new FlinkOperation(flinkService);
        try {
            flinkOperation.genPath(flinkInfo, dataflow);
            flinkOperation.start(flinkInfo);
            log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", new Object[]{groupId, streamInfo.getInlongStreamId(), flinkInfo.getJobId()});
        }
        catch (Exception e) {
            flinkInfo.setException(true);
            flinkInfo.setExceptionMsg(FlinkUtils.getExceptionStackMsg(e));
            flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
            String message = String.format("startup sort failed for groupId [%s], streamId [%s]", groupId, streamInfo.getInlongStreamId());
            log.error(message, (Throwable)e);
            return ListenerResult.fail((String)(message + e.getMessage()));
        }
        this.saveInfo(streamInfo, "sort.job.id", flinkInfo.getJobId(), extList);
        flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
        return ListenerResult.success();
    }

    private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue, List<InlongStreamExtInfo> extInfoList) {
        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
        extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
        extInfo.setKeyName(keyName);
        extInfo.setKeyValue(keyValue);
        extInfoList.add(extInfo);
    }
}

