/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.regular;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class BatchSchemaOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>,
Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(BatchSchemaOperator.class);
    private final String timezone;
    private final List<RouteRule> routingRules;
    private volatile transient Map<TableId, Schema> originalSchemaMap;
    private volatile transient Map<TableId, Schema> evolvedSchemaMap;
    private transient TableIdRouter router;
    private transient SchemaDerivator derivator;
    protected transient SchemaManager schemaManager;
    protected MetadataApplier metadataApplier;
    private boolean alreadyMergedCreateTableTables = false;

    public BatchSchemaOperator(List<RouteRule> routingRules, MetadataApplier metadataApplier, String timezone) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.timezone = timezone;
        this.routingRules = routingRules;
        this.metadataApplier = metadataApplier;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
    }

    public void open() throws Exception {
        super.open();
        this.originalSchemaMap = new HashMap<TableId, Schema>();
        this.evolvedSchemaMap = new HashMap<TableId, Schema>();
        this.router = new TableIdRouter(this.routingRules);
        this.derivator = new SchemaDerivator();
        this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE);
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = (Event)streamRecord.getValue();
        if (event instanceof CreateTableEvent) {
            this.handleCreateTableEvent((CreateTableEvent)event);
        } else if (event instanceof DataChangeEvent) {
            if (!this.alreadyMergedCreateTableTables) {
                this.handleFirstDataChangeEvent();
                this.alreadyMergedCreateTableTables = true;
            }
            this.handleDataChangeEvent((DataChangeEvent)event);
        } else {
            throw new RuntimeException("Unknown event type in Batch record: " + event);
        }
    }

    private void handleCreateTableEvent(CreateTableEvent originalEvent) throws Exception {
        this.originalSchemaMap.put(originalEvent.tableId(), originalEvent.getSchema());
    }

    private void handleFirstDataChangeEvent() {
        List<CreateTableEvent> originalCreateTableEvents = this.originalSchemaMap.entrySet().stream().map(entry -> new CreateTableEvent((TableId)entry.getKey(), (Schema)entry.getValue())).collect(Collectors.toList());
        List<CreateTableEvent> deducedCreateTableEvents = SchemaDerivator.deduceMergedCreateTableEvent(this.router, originalCreateTableEvents);
        deducedCreateTableEvents.forEach(createTableEvent -> {
            this.evolvedSchemaMap.put(createTableEvent.tableId(), createTableEvent.getSchema());
            this.applyAndUpdateEvolvedSchemaChange((SchemaChangeEvent)createTableEvent);
            this.output.collect((Object)new StreamRecord(createTableEvent));
        });
    }

    private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) {
        TableId tableId = dataChangeEvent.tableId();
        Schema originalSchema = this.originalSchemaMap.get(dataChangeEvent.tableId());
        for (TableId sinkTableId : this.router.route(tableId)) {
            Schema evolvedSchema = this.evolvedSchemaMap.get(sinkTableId);
            DataChangeEvent coercedDataRecord = this.derivator.coerceDataRecord(this.timezone, DataChangeEvent.route(dataChangeEvent, sinkTableId), originalSchema, evolvedSchema).orElseThrow(() -> new IllegalStateException(String.format("Unable to coerce data record from %s (schema: %s) to %s (schema: %s)", tableId, originalSchema, sinkTableId, evolvedSchema)));
            this.output.collect((Object)new StreamRecord((Object)coercedDataRecord));
        }
    }

    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
        this.metadataApplier.applySchemaChange(schemaChangeEvent);
        this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
        LOG.info("Successfully applied schema change event {} to external system.", (Object)schemaChangeEvent);
        return true;
    }
}

