/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.transform;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.SchemaMetadataTransform;
import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjection;
import org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class TransformSchemaOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
    private final List<Tuple5<String, String, String, String, String>> transformRules;
    private transient List<Tuple2<Selectors, Optional<TransformProjection>>> transforms;
    private final Map<TableId, TableChangeInfo> tableChangeInfoMap;
    private transient Map<TableId, TransformProjectionProcessor> processorMap;
    private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;
    private transient ListState<byte[]> state;

    public static Builder newBuilder() {
        return new Builder();
    }

    private TransformSchemaOperator(List<Tuple5<String, String, String, String, String>> transformRules) {
        this.transformRules = transformRules;
        this.tableChangeInfoMap = new ConcurrentHashMap<TableId, TableChangeInfo>();
        this.processorMap = new ConcurrentHashMap<TableId, TransformProjectionProcessor>();
        this.schemaMetadataTransformers = new ArrayList<Tuple2<Selectors, SchemaMetadataTransform>>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        super.open();
        this.transforms = new ArrayList<Tuple2<Selectors, Optional<TransformProjection>>>();
        for (Tuple5<String, String, String, String, String> transformRule : this.transformRules) {
            String tableInclusions = (String)transformRule.f0;
            String projection = (String)transformRule.f1;
            String primaryKeys = (String)transformRule.f2;
            String partitionKeys = (String)transformRule.f3;
            String tableOptions = (String)transformRule.f4;
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
            this.transforms.add((Tuple2<Selectors, Optional<TransformProjection>>)new Tuple2((Object)selectors, TransformProjection.of(projection)));
            this.schemaMetadataTransformers.add((Tuple2<Selectors, SchemaMetadataTransform>)new Tuple2((Object)selectors, (Object)new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
        }
        this.processorMap = new ConcurrentHashMap<TableId, TransformProjectionProcessor>();
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OperatorStateStore stateStore = context.getOperatorStateStore();
        ListStateDescriptor descriptor = new ListStateDescriptor("originalSchemaState", byte[].class);
        this.state = stateStore.getUnionListState(descriptor);
        if (context.isRestored()) {
            for (byte[] serializedTableInfo : (Iterable)this.state.get()) {
                TableChangeInfo stateTableChangeInfo = TableChangeInfo.SERIALIZER.deserialize(TableChangeInfo.SERIALIZER.getVersion(), serializedTableInfo);
                this.tableChangeInfoMap.put(stateTableChangeInfo.getTableId(), stateTableChangeInfo);
            }
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.state.update(new ArrayList(this.tableChangeInfoMap.values().stream().map(tableChangeInfo -> {
            try {
                return TableChangeInfo.SERIALIZER.serialize((TableChangeInfo)tableChangeInfo);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList())));
    }

    public void finish() throws Exception {
        super.finish();
        this.clearOperator();
    }

    public void close() throws Exception {
        super.close();
        this.clearOperator();
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = (Event)element.getValue();
        if (event instanceof CreateTableEvent) {
            event = this.cacheCreateTable((CreateTableEvent)event);
            this.output.collect((Object)new StreamRecord((Object)event));
        } else if (event instanceof SchemaChangeEvent) {
            event = this.cacheChangeSchema((SchemaChangeEvent)event);
            this.output.collect((Object)new StreamRecord((Object)event));
        } else if (event instanceof DataChangeEvent) {
            this.output.collect((Object)new StreamRecord((Object)this.processDataChangeEvent((DataChangeEvent)event)));
        }
    }

    private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
        TableId tableId = event.tableId();
        Schema originalSchema = event.getSchema();
        event = this.transformCreateTableEvent(event);
        Schema newSchema = event.getSchema();
        this.tableChangeInfoMap.put(tableId, TableChangeInfo.of(tableId, originalSchema, newSchema));
        return event;
    }

    private SchemaChangeEvent cacheChangeSchema(SchemaChangeEvent event) {
        TableId tableId = event.tableId();
        TableChangeInfo tableChangeInfo = this.tableChangeInfoMap.get(tableId);
        Schema originalSchema = SchemaUtils.applySchemaChangeEvent((Schema)tableChangeInfo.getOriginalSchema(), (SchemaChangeEvent)event);
        Schema newSchema = SchemaUtils.applySchemaChangeEvent((Schema)tableChangeInfo.getTransformedSchema(), (SchemaChangeEvent)event);
        this.tableChangeInfoMap.put(tableId, TableChangeInfo.of(tableId, originalSchema, newSchema));
        return event;
    }

    private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
        Selectors selectors;
        TableId tableId = createTableEvent.tableId();
        for (Tuple2<Selectors, SchemaMetadataTransform> tuple2 : this.schemaMetadataTransformers) {
            selectors = (Selectors)tuple2.f0;
            if (!selectors.isMatch(tableId)) continue;
            createTableEvent = new CreateTableEvent(tableId, this.transformSchemaMetaData(createTableEvent.getSchema(), (SchemaMetadataTransform)tuple2.f1));
        }
        for (Tuple2 tuple2 : this.transforms) {
            TransformProjection transformProjection;
            selectors = (Selectors)tuple2.f0;
            if (!selectors.isMatch(tableId) || !((Optional)tuple2.f1).isPresent() || !(transformProjection = (TransformProjection)((Optional)tuple2.f1).get()).isValid()) continue;
            if (!this.processorMap.containsKey(tableId)) {
                this.processorMap.put(tableId, TransformProjectionProcessor.of(transformProjection));
            }
            TransformProjectionProcessor transformProjectionProcessor = this.processorMap.get(tableId);
            return transformProjectionProcessor.processCreateTableEvent(createTableEvent);
        }
        return createTableEvent;
    }

    private Schema transformSchemaMetaData(Schema schema, SchemaMetadataTransform schemaMetadataTransform) {
        Schema.Builder schemaBuilder = Schema.newBuilder().setColumns(schema.getColumns());
        if (!schemaMetadataTransform.getPrimaryKeys().isEmpty()) {
            schemaBuilder.primaryKey(schemaMetadataTransform.getPrimaryKeys());
        } else {
            schemaBuilder.primaryKey(schema.primaryKeys());
        }
        if (!schemaMetadataTransform.getPartitionKeys().isEmpty()) {
            schemaBuilder.partitionKey(schemaMetadataTransform.getPartitionKeys());
        } else {
            schemaBuilder.partitionKey(schema.partitionKeys());
        }
        if (!schemaMetadataTransform.getOptions().isEmpty()) {
            schemaBuilder.options(schemaMetadataTransform.getOptions());
        } else {
            schemaBuilder.options(schema.options());
        }
        return schemaBuilder.build();
    }

    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        for (Tuple2<Selectors, Optional<TransformProjection>> transform : this.transforms) {
            TransformProjection transformProjection;
            Selectors selectors = (Selectors)transform.f0;
            if (!selectors.isMatch(tableId) || !((Optional)transform.f1).isPresent() || !(transformProjection = (TransformProjection)((Optional)transform.f1).get()).isValid()) continue;
            return this.processProjection(transformProjection, dataChangeEvent);
        }
        return dataChangeEvent;
    }

    private DataChangeEvent processProjection(TransformProjection transformProjection, DataChangeEvent dataChangeEvent) throws Exception {
        TableId tableId = dataChangeEvent.tableId();
        TableChangeInfo tableChangeInfo = this.tableChangeInfoMap.get(tableId);
        if (!this.processorMap.containsKey(tableId) || !this.processorMap.get(tableId).hasTableChangeInfo()) {
            this.processorMap.put(tableId, TransformProjectionProcessor.of(tableChangeInfo, transformProjection));
        }
        TransformProjectionProcessor transformProjectionProcessor = this.processorMap.get(tableId);
        BinaryRecordData before = (BinaryRecordData)dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData)dataChangeEvent.after();
        if (before != null) {
            BinaryRecordData projectedBefore = transformProjectionProcessor.processFillDataField(before);
            dataChangeEvent = DataChangeEvent.projectBefore((DataChangeEvent)dataChangeEvent, (RecordData)projectedBefore);
        }
        if (after != null) {
            BinaryRecordData projectedAfter = transformProjectionProcessor.processFillDataField(after);
            dataChangeEvent = DataChangeEvent.projectAfter((DataChangeEvent)dataChangeEvent, (RecordData)projectedAfter);
        }
        return dataChangeEvent;
    }

    private void clearOperator() {
        this.transforms = null;
        this.processorMap = null;
        this.state = null;
    }

    public static class Builder {
        private final List<Tuple5<String, String, String, String, String>> transformRules = new ArrayList<Tuple5<String, String, String, String, String>>();

        public Builder addTransform(String tableInclusions, @Nullable String projection, String primaryKey, String partitionKey, String tableOption) {
            this.transformRules.add((Tuple5<String, String, String, String, String>)Tuple5.of((Object)tableInclusions, (Object)projection, (Object)primaryKey, (Object)partitionKey, (Object)tableOption));
            return this;
        }

        public TransformSchemaOperator build() {
            return new TransformSchemaOperator(this.transformRules);
        }
    }
}

