/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.translator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.schema.regular.BatchSchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperatorFactory;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/**
 * Appends a schema operator to a {@link DataStream} of pipeline events.
 *
 * <p>Three flavours are supported: the regular operator ({@code "SchemaOperator"}), the batch
 * operator ({@code "SchemaBatchOperator"}) and the distributed operator ({@code "SchemaMapper"}).
 * Every flavour receives the configured uid and the requested parallelism.
 */
@Internal
public class SchemaOperatorTranslator {
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final String schemaOperatorUid;
    private final Duration rpcTimeOut;
    private final String timezone;

    /**
     * Creates a translator.
     *
     * @param schemaChangeBehavior behavior forwarded to the regular and distributed operator factories
     * @param schemaOperatorUid uid assigned to every operator created by this translator
     * @param rpcTimeOut timeout forwarded to the regular and distributed operator factories
     * @param timezone timezone identifier forwarded to every created operator
     */
    public SchemaOperatorTranslator(SchemaChangeBehavior schemaChangeBehavior, String schemaOperatorUid, Duration rpcTimeOut, String timezone) {
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.schemaOperatorUid = schemaOperatorUid;
        this.rpcTimeOut = rpcTimeOut;
        this.timezone = timezone;
    }

    /**
     * Appends the regular (non-batch) schema operator to {@code input}.
     *
     * <p>Equivalent to calling the five-argument overload with {@code isBatchMode == false}.
     *
     * @param input upstream event stream
     * @param parallelism parallelism to set on the created operator
     * @param metadataApplier applier handed to the operator factory
     * @param routes route definitions converted to {@link RouteRule}s for the operator
     * @return the stream produced by the schema operator
     */
    public DataStream<Event> translateRegular(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes) {
        return this.translateRegular(input, parallelism, false, metadataApplier, routes);
    }

    /**
     * Appends either the batch or the regular schema operator to {@code input}.
     *
     * @param input upstream event stream
     * @param parallelism parallelism to set on the created operator
     * @param isBatchMode {@code true} to add the batch operator, {@code false} for the regular one
     * @param metadataApplier applier handed to the operator
     * @param routes route definitions converted to {@link RouteRule}s for the operator
     * @return the stream produced by the schema operator
     */
    public DataStream<Event> translateRegular(DataStream<Event> input, int parallelism, boolean isBatchMode, MetadataApplier metadataApplier, List<RouteDef> routes) {
        if (isBatchMode) {
            return this.addRegularSchemaBatchOperator(input, parallelism, metadataApplier, routes, this.timezone);
        }
        return this.addRegularSchemaOperator(input, parallelism, metadataApplier, routes, this.schemaChangeBehavior, this.timezone);
    }

    /**
     * Appends the distributed schema operator to {@code input}.
     *
     * @param input upstream stream of partitioning events
     * @param parallelism parallelism to set on the created operator
     * @param metadataApplier applier handed to the operator factory
     * @param routes route definitions converted to {@link RouteRule}s for the operator
     * @return the stream produced by the schema operator
     * @throws IllegalArgumentException presumably, when the configured schema change behavior is
     *     not {@code LENIENT}, {@code IGNORE} or {@code EXCEPTION} (raised by
     *     {@code Preconditions.checkArgument}; exact exception type to be confirmed there)
     */
    public DataStream<Event> translateDistributed(DataStream<PartitioningEvent> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes) {
        return this.addDistributedSchemaOperator(input, parallelism, metadataApplier, routes, this.schemaChangeBehavior, this.timezone);
    }

    /**
     * Returns the uid that this translator assigns to the schema operators it creates.
     *
     * @return the configured schema operator uid
     */
    @Deprecated
    public String getSchemaOperatorUid() {
        return this.schemaOperatorUid;
    }

    /** Adds the regular schema operator, named {@code "SchemaOperator"}, after {@code input}. */
    private DataStream<Event> addRegularSchemaOperator(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes, SchemaChangeBehavior schemaChangeBehavior, String timezone) {
        List<RouteRule> routingRules = toRouteRules(routes);
        SchemaOperatorFactory factory = new SchemaOperatorFactory(metadataApplier, routingRules, this.rpcTimeOut, schemaChangeBehavior, timezone);
        return input.transform("SchemaOperator", new EventTypeInfo(), factory)
                .uid(this.schemaOperatorUid)
                .setParallelism(parallelism);
    }

    /** Adds the batch schema operator, named {@code "SchemaBatchOperator"}, after {@code input}. */
    private DataStream<Event> addRegularSchemaBatchOperator(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes, String timezone) {
        List<RouteRule> routingRules = toRouteRules(routes);
        BatchSchemaOperator operator = new BatchSchemaOperator(routingRules, metadataApplier, timezone);
        return input.transform("SchemaBatchOperator", new EventTypeInfo(), operator)
                .uid(this.schemaOperatorUid)
                .setParallelism(parallelism);
    }

    /**
     * Adds the distributed schema operator, named {@code "SchemaMapper"}, after {@code input}.
     *
     * <p>Only {@code LENIENT}, {@code IGNORE} and {@code EXCEPTION} pass the precondition below;
     * any other behavior is rejected before the operator is created.
     */
    private DataStream<Event> addDistributedSchemaOperator(DataStream<PartitioningEvent> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes, SchemaChangeBehavior schemaChangeBehavior, String timezone) {
        boolean supportedBehavior = schemaChangeBehavior == SchemaChangeBehavior.LENIENT
                || schemaChangeBehavior == SchemaChangeBehavior.IGNORE
                || schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION;
        Preconditions.checkArgument(
                supportedBehavior,
                "Schema change behavior %s is not supported because you're trying to compose a pipeline with distributed topology, where data records from different partitions needs to be combined together.\n"
                        + "Use `LENIENT` mode to evolve downstream schema while keep all upstream data fields intact.\n"
                        + "Use `IGNORE` to get a static schema view and ignore any upstream schema changes.\n"
                        + "Use `EXCEPTION` to report error immediately as upstream schema changes are unacceptable.",
                schemaChangeBehavior);
        List<RouteRule> routingRules = toRouteRules(routes);
        // Fully qualified: the regular-topology factory of the same simple name is imported above.
        org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaOperatorFactory factory =
                new org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaOperatorFactory(metadataApplier, routingRules, this.rpcTimeOut, schemaChangeBehavior, timezone);
        return input.transform("SchemaMapper", new EventTypeInfo(), factory)
                .uid(this.schemaOperatorUid)
                .setParallelism(parallelism);
    }

    /**
     * Converts route definitions into routing rules, preserving order.
     *
     * <p>A missing replace symbol is passed to {@link RouteRule} as {@code null}.
     */
    private static List<RouteRule> toRouteRules(List<RouteDef> routes) {
        List<RouteRule> routingRules = new ArrayList<>(routes.size());
        for (RouteDef route : routes) {
            routingRules.add(new RouteRule(route.getSourceTable(), route.getSinkTable(), route.getReplaceSymbol().orElse(null)));
        }
        return routingRules;
    }
}

