/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.regular;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SchemaOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>,
Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private final String timezone;
    private final Duration rpcTimeout;
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final List<RouteRule> routingRules;
    private transient int subTaskId;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient SchemaOperatorMetrics schemaOperatorMetrics;
    private volatile transient Map<TableId, Schema> originalSchemaMap;
    private volatile transient Map<TableId, Schema> evolvedSchemaMap;
    private transient TableIdRouter router;
    private transient SchemaDerivator derivator;

    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules) {
        this(routingRules, PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
    }

    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
        this(routingRules, rpcTimeOut, SchemaChangeBehavior.EVOLVE);
    }

    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior) {
        this(routingRules, rpcTimeOut, schemaChangeBehavior, "UTC");
    }

    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior, String timezone) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeout = rpcTimeOut;
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.timezone = timezone;
        this.routingRules = routingRules;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
    }

    public void open() throws Exception {
        super.open();
        this.schemaOperatorMetrics = new SchemaOperatorMetrics((MetricGroup)this.getRuntimeContext().getMetricGroup(), this.schemaChangeBehavior);
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.originalSchemaMap = new HashMap<TableId, Schema>();
        this.evolvedSchemaMap = new HashMap<TableId, Schema>();
        this.router = new TableIdRouter(this.routingRules);
        this.derivator = new SchemaDerivator();
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = (Event)streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            this.handleSchemaChangeEvent((SchemaChangeEvent)event);
        } else if (event instanceof DataChangeEvent) {
            this.handleDataChangeEvent((DataChangeEvent)event);
        } else {
            throw new RuntimeException("Unknown event type in Stream record: " + event);
        }
    }

    private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) throws Exception {
        TableId tableId = originalEvent.tableId();
        this.originalSchemaMap.compute(tableId, (tId, schema) -> SchemaUtils.applySchemaChangeEvent(schema, originalEvent));
        this.schemaOperatorMetrics.increaseSchemaChangeEvents(1L);
        List<TableId> sinkTables = this.router.route(tableId);
        LOG.info("{}> Sending the FlushEvent.", (Object)this.subTaskId);
        this.output.collect((Object)new StreamRecord((Object)new FlushEvent(this.subTaskId, sinkTables, originalEvent.getType())));
        SchemaChangeResponse response = this.requestSchemaChange(tableId, originalEvent);
        if (response.isSuccess()) {
            LOG.info("{}> Successfully requested schema change.", (Object)this.subTaskId);
            LOG.info("{}> Finished schema change events: {}", (Object)this.subTaskId, (Object)response.getAppliedSchemaChangeEvents());
            LOG.info("{}> Refreshed evolved schemas: {}", (Object)this.subTaskId, (Object)response.getEvolvedSchemas());
            List<SchemaChangeEvent> finishedSchemaChangeEvents = response.getAppliedSchemaChangeEvents();
            this.evolvedSchemaMap.putAll(response.getEvolvedSchemas());
            for (SchemaChangeEvent finishedEvent : finishedSchemaChangeEvents) {
                this.output.collect((Object)new StreamRecord((Object)finishedEvent));
            }
            this.schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(finishedSchemaChangeEvents.size());
        } else if (response.isDuplicate()) {
            LOG.info("{}> Schema change event {} has been handled in another subTask already.", (Object)this.subTaskId, (Object)originalEvent);
            this.schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1L);
        } else if (response.isIgnored()) {
            LOG.info("{}> Schema change event {} has been ignored. No schema evolution needed.", (Object)this.subTaskId, (Object)originalEvent);
            this.schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1L);
        } else {
            throw new IllegalStateException("Unexpected response status: " + response);
        }
    }

    private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) {
        TableId tableId = dataChangeEvent.tableId();
        Schema originalSchema = this.originalSchemaMap.get(dataChangeEvent.tableId());
        for (TableId sinkTableId : this.router.route(tableId)) {
            Schema evolvedSchema = this.evolvedSchemaMap.get(sinkTableId);
            DataChangeEvent coercedDataRecord = this.derivator.coerceDataRecord(this.timezone, DataChangeEvent.route(dataChangeEvent, sinkTableId), originalSchema, evolvedSchema).orElseThrow(() -> new IllegalStateException(String.format("Unable to coerce data record from %s (schema: %s) to %s (schema: %s)", tableId, originalSchema, sinkTableId, evolvedSchema)));
            this.output.collect((Object)new StreamRecord((Object)coercedDataRecord));
        }
    }

    private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException {
        block2: {
            SchemaChangeResponse response;
            long deadline = System.currentTimeMillis() + this.rpcTimeout.toMillis();
            while (true) {
                response = (SchemaChangeResponse)this.sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent, this.subTaskId));
                if (System.currentTimeMillis() >= deadline) break block2;
                if (response.isRegistryBusy()) {
                    LOG.info("{}> Schema Registry is busy now, waiting for next request...", (Object)this.subTaskId);
                    Thread.sleep(1000L);
                    continue;
                }
                if (!response.isWaitingForFlush()) break;
                LOG.info("{}> Schema change event has not collected enough flush success events from writers, waiting...", (Object)this.subTaskId);
                Thread.sleep(1000L);
            }
            return response;
        }
        throw new TimeoutException("Timeout when requesting schema change.");
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            CompletableFuture responseFuture = this.toCoordinator.sendRequestToCoordinator(this.getOperatorID(), new SerializedValue(request));
            return (RESPONSE)CoordinationResponseUtils.unwrap((CoordinationResponse)responseFuture.get(this.rpcTimeout.toMillis(), TimeUnit.MILLISECONDS));
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }

    @VisibleForTesting
    protected int getCurrentTimestamp() {
        return (int)Instant.now().getEpochSecond();
    }

    @VisibleForTesting
    public void registerInitialSchema(TableId tableId, Schema schema) {
        this.originalSchemaMap.put(tableId, schema);
        this.evolvedSchemaMap.put(tableId, schema);
    }
}

