/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.distributed;

import java.io.Serializable;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<PartitioningEvent, Event>,
Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private final Duration rpcTimeOut;
    private final String timezone;
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final List<RouteRule> routingRules;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient int subTaskId;
    private volatile transient Table<TableId, Integer, Schema> upstreamSchemaTable;
    private volatile transient Map<TableId, Schema> evolvedSchemaMap;
    private transient TableIdRouter tableIdRouter;
    private transient SchemaDerivator derivator;
    private transient SchemaOperatorMetrics schemaOperatorMetrics;

    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior, String timezone) {
        this.routingRules = routingRules;
        this.rpcTimeOut = rpcTimeOut;
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.timezone = timezone;
    }

    public void open() throws Exception {
        super.open();
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.upstreamSchemaTable = HashBasedTable.create();
        this.evolvedSchemaMap = new HashMap<TableId, Schema>();
        this.tableIdRouter = new TableIdRouter(this.routingRules);
        this.derivator = new SchemaDerivator();
        this.schemaOperatorMetrics = new SchemaOperatorMetrics((MetricGroup)this.getRuntimeContext().getMetricGroup(), this.schemaChangeBehavior);
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
    }

    public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws Exception {
        PartitioningEvent partitioningEvent = (PartitioningEvent)streamRecord.getValue();
        Event event = partitioningEvent.getPayload();
        int sourcePartition = partitioningEvent.getSourcePartition();
        if (event instanceof SchemaChangeEvent) {
            this.schemaOperatorMetrics.increaseSchemaChangeEvents(1L);
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent)event;
            TableId tableId = schemaChangeEvent.tableId();
            Schema beforeSchema = (Schema)this.upstreamSchemaTable.get((Object)tableId, (Object)sourcePartition);
            Schema afterSchema = SchemaUtils.applySchemaChangeEvent((Schema)beforeSchema, (SchemaChangeEvent)schemaChangeEvent);
            this.upstreamSchemaTable.put((Object)tableId, (Object)sourcePartition, (Object)afterSchema);
            if (!(schemaChangeEvent instanceof CreateTableEvent)) {
                if (this.schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                    LOG.info("{}> Schema change event {} has been ignored.", (Object)this.subTaskId, (Object)event);
                    this.schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1L);
                    return;
                }
                if (this.schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) {
                    throw new SchemaEvolveException(schemaChangeEvent, "Unexpected schema change events occurred in EXCEPTION mode. Job will fail now.");
                }
            }
            this.requestSchemaChange(tableId, new SchemaChangeRequest(sourcePartition, this.subTaskId, schemaChangeEvent));
            this.schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(1L);
        } else if (event instanceof DataChangeEvent) {
            DataChangeEvent dataChangeEvent = (DataChangeEvent)event;
            TableId tableId = dataChangeEvent.tableId();
            Schema upstreamSchema = (Schema)this.upstreamSchemaTable.get((Object)dataChangeEvent.tableId(), (Object)sourcePartition);
            for (TableId sinkTableId : this.tableIdRouter.route(tableId)) {
                Schema evolvedSchema = this.evolvedSchemaMap.get(sinkTableId);
                DataChangeEvent coercedDataRecord = this.derivator.coerceDataRecord(this.timezone, DataChangeEvent.route((DataChangeEvent)dataChangeEvent, (TableId)sinkTableId), upstreamSchema, evolvedSchema).orElseThrow(() -> new IllegalStateException(String.format("Unable to coerce data record from %s (schema: %s) to %s (schema: %s)", tableId, upstreamSchema, sinkTableId, evolvedSchema)));
                this.output.collect((Object)new StreamRecord((Object)coercedDataRecord));
            }
        } else {
            throw new IllegalStateException(this.subTaskId + "> SchemaOperator received an unexpected event: " + event);
        }
    }

    private void requestSchemaChange(TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
        LOG.info("{}> Sent FlushEvent to downstream...", (Object)this.subTaskId);
        this.output.collect((Object)new StreamRecord((Object)new FlushEvent(this.subTaskId, this.tableIdRouter.route(sourceTableId), schemaChangeRequest.getSchemaChangeEvent().getType())));
        LOG.info("{}> Sending evolve request...", (Object)this.subTaskId);
        SchemaChangeResponse response = (SchemaChangeResponse)this.sendRequestToCoordinator(schemaChangeRequest);
        LOG.info("{}> Evolve request response: {}", (Object)this.subTaskId, (Object)response);
        this.evolvedSchemaMap.putAll(response.getEvolvedSchemas());
        response.getEvolvedSchemaChangeEvents().forEach(evt -> this.output.collect((Object)new StreamRecord(evt)));
        LOG.info("{}> Successfully updated evolved schema cache. Current state: {}", (Object)this.subTaskId, this.evolvedSchemaMap);
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            CompletableFuture responseFuture = this.toCoordinator.sendRequestToCoordinator(this.getOperatorID(), new SerializedValue(request));
            return (RESPONSE)CoordinationResponseUtils.unwrap((CoordinationResponse)responseFuture.get(this.rpcTimeOut.toMillis(), TimeUnit.MILLISECONDS));
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }
}

