/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.listener.source;

import java.util.List;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public abstract class AbstractSourceOperateListener
implements SourceOperateListener {
    private static final Logger log = LoggerFactory.getLogger(AbstractSourceOperateListener.class);
    @Autowired
    protected InlongStreamService streamService;
    @Autowired
    protected StreamSourceService streamSourceService;

    public TaskEvent event() {
        return TaskEvent.COMPLETE;
    }

    public ListenerResult listen(WorkflowContext context) throws Exception {
        log.info("operate stream source for groupId={}", (Object)context.getProcessForm().getInlongGroupId());
        InlongGroupInfo groupInfo = this.getGroupInfo(context.getProcessForm());
        String groupId = groupInfo.getInlongGroupId();
        List<InlongStreamBriefInfo> streamResponses = this.streamService.listBriefWithSink(groupId);
        streamResponses.forEach(stream -> this.operateStreamSources(groupId, stream.getInlongStreamId(), context.getOperator()));
        return ListenerResult.success();
    }

    protected void operateStreamSources(String groupId, String streamId, String operator) {
        List<StreamSource> sources = this.streamSourceService.listSource(groupId, streamId);
        sources.forEach(source -> this.operateStreamSource(source.genSourceRequest(), operator));
    }

    public abstract void operateStreamSource(SourceRequest var1, String var2);

    private GroupOperateType getOperateType(ProcessForm processForm) {
        if (processForm instanceof GroupResourceProcessForm) {
            return ((GroupResourceProcessForm)processForm).getGroupOperateType();
        }
        log.error("illegal process form {} to get inlong group info", (Object)processForm.getFormName());
        throw new RuntimeException("Unsupported process form " + processForm.getFormName());
    }

    private InlongGroupInfo getGroupInfo(ProcessForm processForm) {
        if (processForm instanceof GroupResourceProcessForm) {
            GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm)processForm;
            return groupResourceProcessForm.getGroupInfo();
        }
        log.error("illegal process form {} to get inlong group info", (Object)processForm.getFormName());
        throw new RuntimeException("Unsupported process form " + processForm.getFormName());
    }
}

