/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.transform;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformChangeInfo;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformProcessor;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformer;
import org.apache.flink.cdc.runtime.operators.transform.SchemaMetadataTransform;
import org.apache.flink.cdc.runtime.operators.transform.TransformFilter;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjection;
import org.apache.flink.cdc.runtime.operators.transform.TransformRule;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/**
 * Stream operator that sits in front of the main transform stage and prunes incoming events.
 *
 * <p>For every table it keeps the source schema together with a "pre-transformed" schema that only
 * contains the columns referenced by the matching transform rules (or every column when a rule
 * uses an asterisk projection, has no projection, or when no rule matches the table). Schema change
 * events are rewritten against that pre-transformed schema, and data change events have their
 * {@code before}/{@code after} records passed through the cached {@link PreTransformProcessor}.
 *
 * <p>Unless {@code canContainDistributedTables} is set, the per-table schema information is
 * checkpointed in a union list state named {@code "originalSchemaState"} and replayed downstream as
 * {@link CreateTableEvent}s after a restore.
 */
public class PreTransformOperator extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event>, Serializable {

    private static final long serialVersionUID = 1L;

    /** User supplied transform rules; turned into runtime objects in {@link #setup}. */
    private final List<TransformRule> transformRules;

    /** Runtime form of {@link #transformRules}; built in {@link #setup}, cleared on finish/close. */
    private transient List<PreTransformer> transforms;

    /** Source schema and pre-transformed schema per table; this is what gets checkpointed. */
    private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap =
            new ConcurrentHashMap<TableId, PreTransformChangeInfo>();

    /** Primary key / partition key / table option overrides, paired with their table selectors. */
    private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;

    /** Union list state holding serialized {@link PreTransformChangeInfo} entries. */
    private transient ListState<byte[]> state;

    /** Raw UDF definitions; converted into {@link #udfDescriptors} in {@link #setup}. */
    private final List<Tuple3<String, String, Map<String, String>>> udfFunctions;

    private List<UserDefinedFunctionDescriptor> udfDescriptors;

    /** Per-table processor used to project data change records; rebuilt on schema changes. */
    private Map<TableId, PreTransformProcessor> preTransformProcessorMap =
            new ConcurrentHashMap<TableId, PreTransformProcessor>();

    /** Whether a table must keep all of its columns (asterisk projection or no matching rule). */
    private Map<TableId, Boolean> hasAsteriskMap;

    /** When {@code true}, schema state is neither registered, restored nor snapshotted. */
    private final boolean canContainDistributedTables;

    /** Creates a new {@link Builder}. */
    public static Builder newBuilder() {
        return new Builder();
    }

    private PreTransformOperator(
            List<TransformRule> transformRules,
            List<Tuple3<String, String, Map<String, String>>> udfFunctions,
            boolean canContainDistributedTables) {
        this.schemaMetadataTransformers = new ArrayList<>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.transformRules = transformRules;
        this.udfFunctions = udfFunctions;
        this.canContainDistributedTables = canContainDistributedTables;
    }

    /**
     * Builds the runtime transform objects from the configured rules and resets the per-table
     * caches.
     */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);

        this.udfDescriptors =
                this.udfFunctions.stream()
                        .map(udf -> new UserDefinedFunctionDescriptor(udf.f0, udf.f1, udf.f2))
                        .collect(Collectors.toList());

        this.transforms = new ArrayList<>();
        for (TransformRule rule : this.transformRules) {
            Selectors selectors =
                    new Selectors.SelectorsBuilder()
                            .includeTables(rule.getTableInclusions())
                            .build();
            this.transforms.add(
                    new PreTransformer(
                            selectors,
                            TransformProjection.of(rule.getProjection()).orElse(null),
                            TransformFilter.of(rule.getFilter(), this.udfDescriptors)
                                    .orElse(null)));
            this.schemaMetadataTransformers.add(
                    new Tuple2<>(
                            selectors,
                            new SchemaMetadataTransform(
                                    rule.getPrimaryKey(),
                                    rule.getPartitionKey(),
                                    rule.getTableOption())));
        }

        this.preTransformProcessorMap = new ConcurrentHashMap<>();
        this.hasAsteriskMap = new ConcurrentHashMap<>();
    }

    /**
     * Registers the schema state and, on restore, reloads every stored table and re-emits a
     * {@link CreateTableEvent} carrying its pre-transformed schema.
     *
     * <p>Nothing is registered or restored when {@code canContainDistributedTables} is set.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        if (this.canContainDistributedTables) {
            return;
        }

        OperatorStateStore stateStore = context.getOperatorStateStore();
        ListStateDescriptor<byte[]> descriptor =
                new ListStateDescriptor<>("originalSchemaState", byte[].class);
        this.state = stateStore.getUnionListState(descriptor);

        if (!context.isRestored()) {
            return;
        }
        for (byte[] serializedTableInfo : this.state.get()) {
            PreTransformChangeInfo restoredInfo =
                    PreTransformChangeInfo.SERIALIZER.deserialize(
                            PreTransformChangeInfo.SERIALIZER.getVersion(), serializedTableInfo);
            this.preTransformChangeInfoMap.put(restoredInfo.getTableId(), restoredInfo);

            CreateTableEvent restoredCreateTableEvent =
                    new CreateTableEvent(
                            restoredInfo.getTableId(), restoredInfo.getPreTransformedSchema());
            // hasAsteriskMap is rebuilt in setup(), so it has to be recomputed for restored tables.
            this.cacheTransformRuleInfo(restoredCreateTableEvent);
            // NOTE(review): preTransformProcessorMap is not rebuilt here, so data change events for
            // a restored table still require a CreateTableEvent from upstream first (see the
            // precondition in processDataChangeEvent) - confirm this is intended.
            this.output.collect(new StreamRecord<>(restoredCreateTableEvent));
        }
    }

    /**
     * Writes every cached {@link PreTransformChangeInfo} into the operator state, unless {@code
     * canContainDistributedTables} is set.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        if (this.canContainDistributedTables) {
            return;
        }

        List<byte[]> serializedInfos = new ArrayList<>(this.preTransformChangeInfoMap.size());
        for (PreTransformChangeInfo tableChangeInfo : this.preTransformChangeInfoMap.values()) {
            try {
                serializedInfos.add(PreTransformChangeInfo.SERIALIZER.serialize(tableChangeInfo));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.state.update(serializedInfos);
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        this.clearOperator();
    }

    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Release references even when the parent close fails.
            this.clearOperator();
            this.state = null;
        }
    }

    /**
     * Dispatches an incoming event by type.
     *
     * <p>The specific table events are tested before the generic {@link SchemaChangeEvent} branch
     * so that they get their dedicated handling. Events matching none of the branches are not
     * forwarded.
     */
    @Override
    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = element.getValue();
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTableEvent = (CreateTableEvent) event;
            this.preTransformProcessorMap.remove(createTableEvent.tableId());
            this.output.collect(new StreamRecord<>(this.cacheCreateTable(createTableEvent)));
        } else if (event instanceof DropTableEvent) {
            // NOTE(review): only the processor cache is evicted here; preTransformChangeInfoMap
            // keeps the dropped table and will still snapshot it - confirm this is intended.
            this.preTransformProcessorMap.remove(((DropTableEvent) event).tableId());
            this.output.collect(new StreamRecord<>(event));
        } else if (event instanceof TruncateTableEvent) {
            this.output.collect(new StreamRecord<>(event));
        } else if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
            this.preTransformProcessorMap.remove(schemaChangeEvent.tableId());
            this.cacheChangeSchema(schemaChangeEvent)
                    .ifPresent(e -> this.output.collect(new StreamRecord<>(e)));
        } else if (event instanceof DataChangeEvent) {
            this.output.collect(
                    new StreamRecord<>(this.processDataChangeEvent((DataChangeEvent) event)));
        }
    }

    /**
     * Transforms a {@link CreateTableEvent} and records both the incoming schema and the
     * transformed schema for its table.
     *
     * @return the transformed event to forward downstream
     */
    private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
        TableId tableId = event.tableId();
        Schema originalSchema = event.getSchema();
        CreateTableEvent transformedEvent = this.transformCreateTableEvent(event);
        this.preTransformChangeInfoMap.put(
                tableId,
                PreTransformChangeInfo.of(tableId, originalSchema, transformedEvent.getSchema()));
        return transformedEvent;
    }

    /**
     * Applies a schema change to the cached source schema, derives the matching change for the
     * pre-transformed schema, and refreshes the per-table caches.
     *
     * @return the schema change to forward downstream, or empty when
     *     {@code SchemaUtils.transformSchemaChangeEvent} yields nothing for it
     * @throws IllegalStateException if no schema has been cached for the event's table
     */
    private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
        TableId tableId = event.tableId();
        PreTransformChangeInfo tableChangeInfo = this.preTransformChangeInfoMap.get(tableId);
        if (tableChangeInfo == null) {
            // Fail with context instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "Transform operator receives a schema change event from table "
                            + tableId
                            + " without a cached schema. A CreateTableEvent must be received first.");
        }

        Schema sourceSchema = tableChangeInfo.getSourceSchema();
        Schema originalSchema = SchemaUtils.applySchemaChangeEvent(sourceSchema, event);
        Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();

        // Tables without an entry are treated as retaining all columns.
        boolean hasAsterisk = this.hasAsteriskMap.getOrDefault(tableId, true);
        Optional<SchemaChangeEvent> schemaChangeEvent =
                hasAsterisk
                        ? SchemaUtils.transformSchemaChangeEvent(
                                true, sourceSchema.getColumnNames(), event)
                        : SchemaUtils.transformSchemaChangeEvent(
                                false, preTransformedSchema.getColumnNames(), event);
        if (schemaChangeEvent.isPresent()) {
            preTransformedSchema =
                    SchemaUtils.applySchemaChangeEvent(
                            preTransformedSchema, schemaChangeEvent.get());
        }

        this.cachePreTransformProcessor(tableId, originalSchema);
        this.preTransformChangeInfoMap.put(
                tableId, PreTransformChangeInfo.of(tableId, originalSchema, preTransformedSchema));
        return schemaChangeEvent;
    }

    /**
     * Recomputes the {@link #hasAsteriskMap} entry for the event's table: {@code true} when no
     * transform matches the table or when any matching transform has an asterisk projection.
     */
    private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        List<PreTransformer> matchingTransforms =
                this.transforms.stream()
                        .filter(t -> t.getSelectors().isMatch(tableId))
                        .collect(Collectors.toList());

        boolean hasAsterisk =
                matchingTransforms.isEmpty()
                        || matchingTransforms.stream()
                                .anyMatch(
                                        t ->
                                                TransformParser.hasAsterisk(
                                                        t.getProjection()
                                                                .map(
                                                                        TransformProjection
                                                                                ::getProjection)
                                                                .orElse(null)));
        this.hasAsteriskMap.put(tableId, hasAsterisk);
    }

    /**
     * Applies every matching schema metadata override to the event's schema, caches a processor for
     * the table, and lets that processor rewrite the event.
     */
    private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        for (Tuple2<Selectors, SchemaMetadataTransform> transform :
                this.schemaMetadataTransformers) {
            if (!transform.f0.isMatch(tableId)) {
                continue;
            }
            createTableEvent =
                    new CreateTableEvent(
                            tableId,
                            this.transformSchemaMetaData(
                                    createTableEvent.getSchema(), transform.f1));
        }

        this.cachePreTransformProcessor(tableId, createTableEvent.getSchema());
        PreTransformProcessor processor = this.preTransformProcessorMap.get(tableId);
        return processor != null
                ? processor.preTransformCreateTableEvent(createTableEvent)
                : createTableEvent;
    }

    /**
     * Rebuilds the processor for a table from every transform whose selectors match it. The
     * referenced columns accumulate across matching transforms; when none matches, the table is
     * processed as if it had no transform.
     */
    private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
        LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>();
        boolean hasMatchTransform = false;
        for (PreTransformer transform : this.transforms) {
            if (!transform.getSelectors().isMatch(tableId)) {
                continue;
            }
            this.processProjectionTransform(tableId, tableSchema, referencedColumnsSet, transform);
            hasMatchTransform = true;
        }
        if (!hasMatchTransform) {
            this.processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
        }
    }

    /**
     * Adds the columns a transform needs to {@code referencedColumnsSet} and (re)registers the
     * table's {@link PreTransformProcessor} built from the accumulated set.
     *
     * <p>All columns are retained when {@code transform} is {@code null}, when it carries no
     * projection, or when its projection contains an asterisk. Otherwise only the columns
     * referenced by the projection and the filter are retained.
     *
     * @param tableId table being processed
     * @param tableSchema current schema of the table
     * @param referencedColumnsSet accumulator shared across the transforms matching this table;
     *     mutated by this call
     * @param transform the matching transform, or {@code null} when no rule matches the table
     */
    public void processProjectionTransform(
            TableId tableId,
            Schema tableSchema,
            LinkedHashSet<Column> referencedColumnsSet,
            @Nullable PreTransformer transform) {
        // A filter-only rule has no projection; treat it like an asterisk instead of failing on
        // an empty Optional, in line with the orElse(null) handling in cacheTransformRuleInfo.
        TransformProjection transformProjection =
                transform == null ? null : transform.getProjection().orElse(null);

        if (transformProjection == null
                || TransformParser.hasAsterisk(transformProjection.getProjection())) {
            referencedColumnsSet.addAll(tableSchema.getColumns());
            this.hasAsteriskMap.put(tableId, true);
        } else {
            TransformFilter transformFilter = transform.getFilter().orElse(null);
            List<Column> referencedColumns =
                    TransformParser.generateReferencedColumns(
                            transformProjection.getProjection(),
                            transformFilter != null ? transformFilter.getExpression() : null,
                            tableSchema.getColumns());
            referencedColumnsSet.addAll(referencedColumns);
            // Never downgrade a table that another matching transform already marked as asterisk.
            this.hasAsteriskMap.putIfAbsent(tableId, false);
        }

        PreTransformChangeInfo tableChangeInfo =
                PreTransformChangeInfo.of(
                        tableId,
                        tableSchema,
                        tableSchema.copy(new ArrayList<Column>(referencedColumnsSet)));
        this.preTransformProcessorMap.put(tableId, new PreTransformProcessor(tableChangeInfo));
    }

    /**
     * Returns a schema with the same columns as {@code schema}, where primary keys, partition keys
     * and options are taken from {@code schemaMetadataTransform} when it provides a non-empty value
     * and from {@code schema} otherwise.
     */
    private Schema transformSchemaMetaData(
            Schema schema, SchemaMetadataTransform schemaMetadataTransform) {
        Schema.Builder schemaBuilder = Schema.newBuilder().setColumns(schema.getColumns());

        if (schemaMetadataTransform.getPrimaryKeys().isEmpty()) {
            schemaBuilder.primaryKey(schema.primaryKeys());
        } else {
            schemaBuilder.primaryKey(schemaMetadataTransform.getPrimaryKeys());
        }

        if (schemaMetadataTransform.getPartitionKeys().isEmpty()) {
            schemaBuilder.partitionKey(schema.partitionKeys());
        } else {
            schemaBuilder.partitionKey(schemaMetadataTransform.getPartitionKeys());
        }

        if (schemaMetadataTransform.getOptions().isEmpty()) {
            schemaBuilder.options(schema.options());
        } else {
            schemaBuilder.options(schemaMetadataTransform.getOptions());
        }

        return schemaBuilder.build();
    }

    /**
     * Passes the {@code before} and {@code after} records of a data change event through the
     * table's cached processor. The event is returned untouched when no transform is configured.
     *
     * @throws IllegalArgumentException if transforms exist but no processor is cached for the table
     */
    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) {
        if (this.transforms.isEmpty()) {
            return dataChangeEvent;
        }

        TableId tableId = dataChangeEvent.tableId();
        PreTransformProcessor processor = this.preTransformProcessorMap.get(tableId);
        Preconditions.checkArgument(processor != null, "Transform operator receives a data change event from table %s without a full schema view. This might happen if source with distributed tables doesn't emit CreateTableEvent first after fail-over. This is likely a bug, please consider filing an issue.", tableId);

        // Both records are read from the incoming event before either is replaced.
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
        if (before != null) {
            dataChangeEvent =
                    DataChangeEvent.projectBefore(
                            dataChangeEvent, processor.processFillDataField(before));
        }
        if (after != null) {
            dataChangeEvent =
                    DataChangeEvent.projectAfter(
                            dataChangeEvent, processor.processFillDataField(after));
        }
        return dataChangeEvent;
    }

    /** Drops the runtime transform objects and the processor cache. */
    private void clearOperator() {
        this.transforms = null;
        this.preTransformProcessorMap = null;
    }

    /** Fluent builder for {@link PreTransformOperator}. */
    public static class Builder {
        private final List<TransformRule> transformRules = new ArrayList<>();
        private boolean canContainDistributedTables;
        private final List<Tuple3<String, String, Map<String, String>>> udfFunctions =
                new ArrayList<>();

        /**
         * Adds a rule with only table selection, projection and filter; keys and table options are
         * passed as empty strings, the converter as {@code null} and the metadata columns as an
         * empty array.
         */
        public Builder addTransform(
                String tableInclusions, @Nullable String projection, @Nullable String filter) {
            this.transformRules.add(
                    new TransformRule(
                            tableInclusions,
                            projection,
                            filter,
                            "",
                            "",
                            "",
                            null,
                            new SupportedMetadataColumn[0]));
            return this;
        }

        /** Adds a fully specified transform rule. */
        public Builder addTransform(
                String tableInclusions,
                @Nullable String projection,
                @Nullable String filter,
                String primaryKey,
                String partitionKey,
                String tableOption,
                @Nullable String postTransformConverter,
                SupportedMetadataColumn[] supportedMetadataColumns) {
            this.transformRules.add(
                    new TransformRule(
                            tableInclusions,
                            projection,
                            filter,
                            primaryKey,
                            partitionKey,
                            tableOption,
                            postTransformConverter,
                            supportedMetadataColumns));
            return this;
        }

        /** Appends UDF definitions; the three tuple fields are passed as-is to the descriptor. */
        public Builder addUdfFunctions(
                List<Tuple3<String, String, Map<String, String>>> udfFunctions) {
            this.udfFunctions.addAll(udfFunctions);
            return this;
        }

        /** Sets the flag that disables schema state handling in the built operator. */
        public Builder canContainDistributedTables(boolean canContainDistributedTables) {
            this.canContainDistributedTables = canContainDistributedTables;
            return this;
        }

        /** Creates the operator from the rules, UDFs and flag collected so far. */
        public PreTransformOperator build() {
            return new PreTransformOperator(
                    this.transformRules, this.udfFunctions, this.canContainDistributedTables);
        }
    }
}

