/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.transform;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformChangeInfo;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformProcessor;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformer;
import org.apache.flink.cdc.runtime.operators.transform.SchemaMetadataTransform;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilter;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjection;
import org.apache.flink.cdc.runtime.operators.transform.TransformRule;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

public class PreTransformOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>,
Serializable {
    private static final long serialVersionUID = 1L;
    private final List<TransformRule> transformRules;
    private transient List<PreTransformer> transforms;
    private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap;
    private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;
    private transient ListState<byte[]> state;
    private final List<Tuple2<String, String>> udfFunctions;
    private List<UserDefinedFunctionDescriptor> udfDescriptors;
    private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
    private Map<TableId, Boolean> hasAsteriskMap;
    private Map<TableId, List<String>> referencedColumnsMap;

    public static Builder newBuilder() {
        return new Builder();
    }

    private PreTransformOperator(List<TransformRule> transformRules, List<Tuple2<String, String>> udfFunctions) {
        this.transformRules = transformRules;
        this.preTransformChangeInfoMap = new ConcurrentHashMap<TableId, PreTransformChangeInfo>();
        this.preTransformProcessorMap = new ConcurrentHashMap<TableId, PreTransformProcessor>();
        this.schemaMetadataTransformers = new ArrayList<Tuple2<Selectors, SchemaMetadataTransform>>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.udfFunctions = udfFunctions;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.udfDescriptors = this.udfFunctions.stream().map(udf -> new UserDefinedFunctionDescriptor((String)udf.f0, (String)udf.f1)).collect(Collectors.toList());
        this.transforms = new ArrayList<PreTransformer>();
        for (TransformRule transformRule : this.transformRules) {
            String tableInclusions = transformRule.getTableInclusions();
            String projection = transformRule.getProjection();
            String filter = transformRule.getFilter();
            String primaryKeys = transformRule.getPrimaryKey();
            String partitionKeys = transformRule.getPartitionKey();
            String tableOptions = transformRule.getTableOption();
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
            this.transforms.add(new PreTransformer(selectors, TransformProjection.of(projection).orElse(null), TransformFilter.of(filter, this.udfDescriptors).orElse(null)));
            this.schemaMetadataTransformers.add((Tuple2<Selectors, SchemaMetadataTransform>)new Tuple2((Object)selectors, (Object)new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
        }
        this.preTransformProcessorMap = new ConcurrentHashMap<TableId, PreTransformProcessor>();
        this.hasAsteriskMap = new ConcurrentHashMap<TableId, Boolean>();
        this.referencedColumnsMap = new ConcurrentHashMap<TableId, List<String>>();
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OperatorStateStore stateStore = context.getOperatorStateStore();
        ListStateDescriptor descriptor = new ListStateDescriptor("originalSchemaState", byte[].class);
        this.state = stateStore.getUnionListState(descriptor);
        if (context.isRestored()) {
            for (byte[] serializedTableInfo : (Iterable)this.state.get()) {
                PreTransformChangeInfo stateTableChangeInfo = PreTransformChangeInfo.SERIALIZER.deserialize(PreTransformChangeInfo.SERIALIZER.getVersion(), serializedTableInfo);
                this.preTransformChangeInfoMap.put(stateTableChangeInfo.getTableId(), stateTableChangeInfo);
                CreateTableEvent restoredCreateTableEvent = new CreateTableEvent(stateTableChangeInfo.getTableId(), stateTableChangeInfo.getPreTransformedSchema());
                this.cacheTransformRuleInfo(restoredCreateTableEvent);
                this.output.collect((Object)new StreamRecord((Object)restoredCreateTableEvent));
            }
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.state.update(new ArrayList(this.preTransformChangeInfoMap.values().stream().map(tableChangeInfo -> {
            try {
                return PreTransformChangeInfo.SERIALIZER.serialize((PreTransformChangeInfo)tableChangeInfo);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList())));
    }

    public void finish() throws Exception {
        super.finish();
        this.clearOperator();
    }

    public void close() throws Exception {
        super.close();
        this.clearOperator();
        this.state = null;
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = (Event)element.getValue();
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTableEvent = (CreateTableEvent)event;
            this.preTransformProcessorMap.remove(createTableEvent.tableId());
            this.output.collect((Object)new StreamRecord((Object)this.cacheCreateTable(createTableEvent)));
        } else if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent)event;
            this.preTransformProcessorMap.remove(schemaChangeEvent.tableId());
            this.cacheChangeSchema(schemaChangeEvent).ifPresent(e -> this.output.collect((Object)new StreamRecord(e)));
        } else if (event instanceof DataChangeEvent) {
            this.output.collect((Object)new StreamRecord((Object)this.processDataChangeEvent((DataChangeEvent)event)));
        }
    }

    private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
        TableId tableId = event.tableId();
        Schema originalSchema = event.getSchema();
        event = this.transformCreateTableEvent(event);
        Schema newSchema = event.getSchema();
        this.preTransformChangeInfoMap.put(tableId, PreTransformChangeInfo.of(tableId, originalSchema, newSchema));
        return event;
    }

    private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
        TableId tableId = event.tableId();
        PreTransformChangeInfo tableChangeInfo = this.preTransformChangeInfoMap.get(tableId);
        Schema originalSchema = SchemaUtils.applySchemaChangeEvent((Schema)tableChangeInfo.getSourceSchema(), (SchemaChangeEvent)event);
        Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
        Optional schemaChangeEvent = SchemaUtils.transformSchemaChangeEvent((boolean)this.hasAsteriskMap.get(tableId), this.referencedColumnsMap.get(tableId), (SchemaChangeEvent)event);
        if (schemaChangeEvent.isPresent()) {
            preTransformedSchema = SchemaUtils.applySchemaChangeEvent((Schema)tableChangeInfo.getPreTransformedSchema(), (SchemaChangeEvent)((SchemaChangeEvent)schemaChangeEvent.get()));
        }
        this.preTransformChangeInfoMap.put(tableId, PreTransformChangeInfo.of(tableId, originalSchema, preTransformedSchema));
        return schemaChangeEvent;
    }

    private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        Set referencedColumnsSet = this.transforms.stream().filter(t -> t.getSelectors().isMatch(tableId)).flatMap(rule -> TransformParser.generateReferencedColumns(rule.getProjection().map(TransformProjection::getProjection).orElse(null), rule.getFilter().map(TransformFilter::getExpression).orElse(null), createTableEvent.getSchema().getColumns()).stream()).map(Column::getName).collect(Collectors.toSet());
        boolean notTransformed = this.transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
        if (notTransformed) {
            this.hasAsteriskMap.put(tableId, true);
        } else {
            boolean hasAsterisk = this.transforms.stream().filter(t -> t.getSelectors().isMatch(tableId)).anyMatch(t -> TransformParser.hasAsterisk(t.getProjection().map(TransformProjection::getProjection).orElse(null)));
            this.hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
        }
        this.referencedColumnsMap.put(createTableEvent.tableId(), createTableEvent.getSchema().getColumnNames().stream().filter(referencedColumnsSet::contains).collect(Collectors.toList()));
    }

    private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
        Selectors selectors;
        TableId tableId = createTableEvent.tableId();
        PreTransformChangeInfo tableChangeInfo = this.preTransformChangeInfoMap.get(tableId);
        this.cacheTransformRuleInfo(createTableEvent);
        for (Tuple2<Selectors, SchemaMetadataTransform> tuple2 : this.schemaMetadataTransformers) {
            selectors = (Selectors)tuple2.f0;
            if (!selectors.isMatch(tableId)) continue;
            createTableEvent = new CreateTableEvent(tableId, this.transformSchemaMetaData(createTableEvent.getSchema(), (SchemaMetadataTransform)tuple2.f1));
        }
        for (PreTransformer preTransformer : this.transforms) {
            selectors = preTransformer.getSelectors();
            if (!selectors.isMatch(tableId) || !preTransformer.getProjection().isPresent()) continue;
            TransformProjection transformProjection = preTransformer.getProjection().get();
            TransformFilter transformFilter = preTransformer.getFilter().orElse(null);
            if (!transformProjection.isValid()) continue;
            if (!this.preTransformProcessorMap.containsKey(tableId)) {
                this.preTransformProcessorMap.put(tableId, new PreTransformProcessor(tableChangeInfo, transformProjection, transformFilter));
            }
            PreTransformProcessor preTransformProcessor = this.preTransformProcessorMap.get(tableId);
            return preTransformProcessor.preTransformCreateTableEvent(createTableEvent);
        }
        return createTableEvent;
    }

    private Schema transformSchemaMetaData(Schema schema, SchemaMetadataTransform schemaMetadataTransform) {
        Schema.Builder schemaBuilder = Schema.newBuilder().setColumns(schema.getColumns());
        if (!schemaMetadataTransform.getPrimaryKeys().isEmpty()) {
            schemaBuilder.primaryKey(schemaMetadataTransform.getPrimaryKeys());
        } else {
            schemaBuilder.primaryKey(schema.primaryKeys());
        }
        if (!schemaMetadataTransform.getPartitionKeys().isEmpty()) {
            schemaBuilder.partitionKey(schemaMetadataTransform.getPartitionKeys());
        } else {
            schemaBuilder.partitionKey(schema.partitionKeys());
        }
        if (!schemaMetadataTransform.getOptions().isEmpty()) {
            schemaBuilder.options(schemaMetadataTransform.getOptions());
        } else {
            schemaBuilder.options(schema.options());
        }
        return schemaBuilder.build();
    }

    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        for (PreTransformer transform : this.transforms) {
            Selectors selectors = transform.getSelectors();
            if (!selectors.isMatch(tableId) || !transform.getProjection().isPresent()) continue;
            TransformProjection transformProjection = transform.getProjection().get();
            TransformFilter transformFilter = transform.getFilter().orElse(null);
            if (!transformProjection.isValid()) continue;
            return this.processProjection(transformProjection, transformFilter, dataChangeEvent);
        }
        return dataChangeEvent;
    }

    private DataChangeEvent processProjection(TransformProjection transformProjection, @Nullable TransformFilter transformFilter, DataChangeEvent dataChangeEvent) {
        TableId tableId = dataChangeEvent.tableId();
        PreTransformChangeInfo tableChangeInfo = this.preTransformChangeInfoMap.get(tableId);
        if (!this.preTransformProcessorMap.containsKey(tableId) || !this.preTransformProcessorMap.get(tableId).hasTableChangeInfo()) {
            this.preTransformProcessorMap.put(tableId, new PreTransformProcessor(tableChangeInfo, transformProjection, transformFilter));
        }
        PreTransformProcessor preTransformProcessor = this.preTransformProcessorMap.get(tableId);
        BinaryRecordData before = (BinaryRecordData)dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData)dataChangeEvent.after();
        if (before != null) {
            BinaryRecordData projectedBefore = preTransformProcessor.processFillDataField(before);
            dataChangeEvent = DataChangeEvent.projectBefore((DataChangeEvent)dataChangeEvent, (RecordData)projectedBefore);
        }
        if (after != null) {
            BinaryRecordData projectedAfter = preTransformProcessor.processFillDataField(after);
            dataChangeEvent = DataChangeEvent.projectAfter((DataChangeEvent)dataChangeEvent, (RecordData)projectedAfter);
        }
        return dataChangeEvent;
    }

    private void clearOperator() {
        this.transforms = null;
        this.preTransformProcessorMap = null;
    }

    public static class Builder {
        private final List<TransformRule> transformRules = new ArrayList<TransformRule>();
        private final List<Tuple2<String, String>> udfFunctions = new ArrayList<Tuple2<String, String>>();

        public Builder addTransform(String tableInclusions, @Nullable String projection, @Nullable String filter) {
            this.transformRules.add(new TransformRule(tableInclusions, projection, filter, "", "", ""));
            return this;
        }

        public Builder addTransform(String tableInclusions, @Nullable String projection, @Nullable String filter, String primaryKey, String partitionKey, String tableOption) {
            this.transformRules.add(new TransformRule(tableInclusions, projection, filter, primaryKey, partitionKey, tableOption));
            return this;
        }

        public Builder addUdfFunctions(List<Tuple2<String, String>> udfFunctions) {
            this.udfFunctions.addAll(udfFunctions);
            return this;
        }

        public PreTransformOperator build() {
            return new PreTransformOperator(this.transformRules, this.udfFunctions);
        }
    }
}

