/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.task.flow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flow life cycle that pushes incoming records through a chain of {@link SeaTunnelTransform}s and
 * forwards the results to a downstream {@link Collector}.
 *
 * <p>Three kinds of record payload are distinguished in {@link #received(Record)}:
 *
 * <ul>
 *   <li>{@link Barrier}: handled for checkpointing and always forwarded unchanged;
 *   <li>{@link SchemaChangeEvent}: mapped by every transform in order and forwarded unless a
 *       transform filters it out by returning {@code null};
 *   <li>anything else: treated as a data row of type {@code T} and run through
 *       {@link #transform(Object)}.
 * </ul>
 *
 * <p>Once a barrier has requested {@code prepareClose}, schema change events and data rows are
 * dropped; barriers are still processed.
 *
 * @param <T> type of the data rows handled by the transform chain
 */
public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle
        implements OneInputFlowLifeCycle<Record<?>> {
    private static final Logger log = LoggerFactory.getLogger(TransformFlowLifeCycle.class);
    private final TransformChainAction<T> action;
    private final List<SeaTunnelTransform<T>> transform;
    private final Collector<Record<?>> collector;

    /**
     * Creates the life cycle for one transform chain.
     *
     * @param action the transform chain action; its transforms are applied in list order
     * @param runningTask the task this flow belongs to, used for checkpoint state and acks
     * @param collector downstream sink for barriers, schema change events and transformed rows
     * @param completableFuture passed through to the parent life cycle
     */
    public TransformFlowLifeCycle(
            TransformChainAction<T> action,
            SeaTunnelTask runningTask,
            Collector<Record<?>> collector,
            CompletableFuture<Void> completableFuture) {
        super(action, runningTask, completableFuture);
        this.action = action;
        this.transform = action.getTransforms();
        this.collector = collector;
    }

    /**
     * Opens the parent life cycle and then every transform in chain order.
     *
     * <p>A transform that fails to open is logged and skipped; the remaining transforms are still
     * opened and the failure is not propagated to the caller.
     *
     * @throws Exception if the parent life cycle fails to open
     */
    @Override
    public void open() throws Exception {
        super.open();
        for (SeaTunnelTransform<T> t : this.transform) {
            try {
                t.open();
            } catch (Exception e) {
                // Trailing throwable makes SLF4J print the stack trace as well.
                log.error(
                        "Open transform: {} failed, cause: {}",
                        t.getPluginName(),
                        e.getMessage(),
                        e);
            }
        }
    }

    /**
     * Dispatches a record according to the type of its payload.
     *
     * @param record the incoming record; its data is a barrier, a schema change event or a row
     */
    @Override
    public void received(Record<?> record) {
        Object payload = record.getData();
        if (payload instanceof Barrier) {
            handleBarrier(record);
            return;
        }
        // Everything except barriers is discarded once closing has been requested.
        if (this.prepareClose) {
            return;
        }
        if (payload instanceof SchemaChangeEvent) {
            handleSchemaChange((SchemaChangeEvent) payload);
        } else {
            handleData(record);
        }
    }

    /** Records checkpoint state, acknowledges the barrier and forwards it downstream. */
    private void handleBarrier(Record<?> record) {
        // NOTE(review): the caller checks for Barrier but the cast targets CheckpointBarrier; any
        // other Barrier implementation would fail here with ClassCastException - confirm that
        // CheckpointBarrier is the only implementation that can reach this flow.
        CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
        if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
            this.prepareClose = true;
        }
        if (barrier.snapshot()) {
            // This flow registers an empty state list for the action.
            this.runningTask.addState(
                    barrier, ActionStateKey.of(this.action), Collections.emptyList());
        }
        this.runningTask.ack(barrier);
        this.collector.collect(record);
    }

    /**
     * Maps a schema change event through every transform and forwards the final event, unless a
     * transform filtered it by returning {@code null}.
     */
    private void handleSchemaChange(SchemaChangeEvent incoming) {
        SchemaChangeEvent event = incoming;
        for (SeaTunnelTransform<T> t : this.transform) {
            SchemaChangeEvent eventBefore = event;
            event = t.mapSchemaChangeEvent(eventBefore);
            if (event == null) {
                log.info(
                        "Transform[{}] filtered schema change event {}",
                        t.getPluginName(),
                        eventBefore);
                break;
            }
            log.info(
                    "Transform[{}] input schema change event {} and output schema change event {}",
                    t.getPluginName(),
                    eventBefore,
                    event);
        }
        if (event != null) {
            this.collector.collect(new Record<>(event));
        }
    }

    /** Runs a data row through the transform chain and forwards every resulting row. */
    private void handleData(Record<?> record) {
        // The payload type is erased at runtime; the chain is declared for rows of type T.
        @SuppressWarnings("unchecked")
        T inputData = (T) record.getData();
        for (T outputData : this.transform(inputData)) {
            this.collector.collect(new Record<>(outputData));
        }
    }

    /**
     * Applies the transform chain to a single input row.
     *
     * <p>Each transform receives every row produced by the previous one. A
     * {@link SeaTunnelFlatMapTransform} may emit zero or more rows per input; a
     * {@link SeaTunnelMapTransform} emits one row or filters the input by returning {@code null}.
     * A transform that implements neither interface contributes no output, so every row reaching
     * it is dropped.
     *
     * @param inputData the row to transform
     * @return the rows produced by the last transform, possibly empty; a singleton list holding
     *     {@code inputData} when the chain has no transforms
     */
    public List<T> transform(T inputData) {
        if (this.transform.isEmpty()) {
            return Collections.singletonList(inputData);
        }
        List<T> dataList = new ArrayList<>();
        dataList.add(inputData);
        for (SeaTunnelTransform<T> transformer : this.transform) {
            List<T> nextInputDataList = new ArrayList<>();
            if (transformer instanceof SeaTunnelFlatMapTransform) {
                SeaTunnelFlatMapTransform<T> flatMapper =
                        (SeaTunnelFlatMapTransform<T>) transformer;
                for (T data : dataList) {
                    List<T> outputDataArray = flatMapper.flatMap(data);
                    log.debug(
                            "Transform[{}] input row {} and output row {}",
                            transformer,
                            data,
                            outputDataArray);
                    // A null or empty result means the row produced nothing.
                    if (CollectionUtils.isNotEmpty(outputDataArray)) {
                        nextInputDataList.addAll(outputDataArray);
                    }
                }
            } else if (transformer instanceof SeaTunnelMapTransform) {
                SeaTunnelMapTransform<T> mapper = (SeaTunnelMapTransform<T>) transformer;
                for (T data : dataList) {
                    T outputData = mapper.map(data);
                    log.debug(
                            "Transform[{}] input row {} and output row {}",
                            transformer,
                            data,
                            outputData);
                    if (outputData == null) {
                        log.trace("Transform[{}] filtered data row {}", transformer, data);
                        continue;
                    }
                    nextInputDataList.add(outputData);
                }
            }
            dataList = nextInputDataList;
        }
        return dataList;
    }

    /**
     * Does nothing: this flow does not read back any of the supplied state.
     *
     * @param actionStateList the state restored for this action; ignored
     */
    @Override
    public void restoreState(List<ActionSubtaskState> actionStateList) throws Exception {
    }

    /**
     * Closes every transform in chain order and then the parent life cycle.
     *
     * <p>A transform that fails to close is logged and skipped so that the remaining transforms
     * and the parent are still closed.
     *
     * @throws IOException if the parent life cycle fails to close
     */
    @Override
    public void close() throws IOException {
        for (SeaTunnelTransform<T> t : this.transform) {
            try {
                t.close();
            } catch (Exception e) {
                // Trailing throwable makes SLF4J print the stack trace as well.
                log.error(
                        "Close transform: {} failed, cause: {}",
                        t.getPluginName(),
                        e.getMessage(),
                        e);
            }
        }
        super.close();
    }
}

