/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream operator that handles schema change events and re-targets data change events according
 * to the configured routing rules.
 *
 * <p>For every incoming {@link SchemaChangeEvent} the operator sends a {@link SchemaChangeRequest}
 * to its coordinator. If the response carries schema change events, it emits a {@link FlushEvent}
 * followed by those events, then polls the coordinator until it no longer answers with a {@link
 * SchemaChangeProcessingResponse} (or the RPC timeout elapses).
 *
 * <p>Every other incoming event is cast to {@link DataChangeEvent}. If its table matches a routing
 * rule, the event is rewritten for the routed table (missing columns become {@code null}, a small
 * set of type widenings is applied); otherwise the original record is forwarded untouched.
 */
@Internal
public class SchemaOperator extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    /** Cached schemas are evicted after this long without being accessed. */
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1L);

    /** Routing rules as (table inclusion expression, replacement table id) pairs. */
    private final List<Tuple2<String, TableId>> routingRules;
    /** Routing rules with the inclusion expression compiled into {@link Selectors}; built in setup. */
    private transient List<Tuple2<Selectors, TableId>> routes;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient SchemaEvolutionClient schemaEvolutionClient;
    /** Schema cache keyed by table id; misses are loaded through {@link #getLatestSchema}. */
    private transient LoadingCache<TableId, Schema> cachedSchemas;
    /** Upper bound, in milliseconds, for waiting on the coordinator in requestReleaseUpstream. */
    private final long rpcTimeOutInMillis;

    /**
     * Creates an operator that uses {@link PipelineOptions#DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT}.
     *
     * @param routingRules (table inclusion expression, replacement table id) pairs
     */
    public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
        this(routingRules, PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
    }

    /**
     * Creates an operator with an explicit RPC timeout.
     *
     * @param routingRules (table inclusion expression, replacement table id) pairs
     * @param rpcTimeOut how long to keep polling the coordinator before giving up
     */
    public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration rpcTimeOut) {
        this.routingRules = routingRules;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
    }

    /**
     * Initializes the transient runtime members: the coordinator gateway, the compiled routes, the
     * schema evolution client and the schema cache.
     */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();

        List<Tuple2<Selectors, TableId>> compiledRoutes = new ArrayList<>();
        for (Tuple2<String, TableId> rule : this.routingRules) {
            Selectors selectors =
                    new Selectors.SelectorsBuilder().includeTables(rule.f0).build();
            compiledRoutes.add(new Tuple2<>(selectors, rule.f1));
        }
        this.routes = compiledRoutes;

        this.schemaEvolutionClient =
                new SchemaEvolutionClient(this.toCoordinator, this.getOperatorID());
        this.cachedSchemas =
                CacheBuilder.newBuilder()
                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
                        .build(
                                new CacheLoader<TableId, Schema>() {
                                    @Override
                                    public Schema load(TableId tableId) {
                                        return getLatestSchema(tableId);
                                    }
                                });
    }

    /**
     * On restore, subtask 0 (and only subtask 0) sends a {@link RefreshPendingListsRequest} to the
     * coordinator.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        if (context.isRestored() && this.getRuntimeContext().getIndexOfThisSubtask() == 0) {
            this.sendRequestToCoordinator(new RefreshPendingListsRequest());
        }
    }

    /**
     * Dispatches one incoming record.
     *
     * <p>Schema change events are negotiated with the coordinator and are not forwarded as-is;
     * afterwards the cached schemas of the table and of its routed table (if any) are refreshed.
     * Any other event is cast to {@link DataChangeEvent} and forwarded, rewritten first when a
     * routing rule matches its table.
     *
     * @throws InterruptedException if interrupted while waiting between coordinator polls
     * @throws TimeoutException if the coordinator keeps reporting an in-progress schema change past
     *     the RPC timeout
     */
    @Override
    public void processElement(StreamRecord<Event> streamRecord)
            throws InterruptedException, TimeoutException {
        Event event = streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
            TableId tableId = schemaChangeEvent.tableId();
            LOG.info(
                    "Table {} received SchemaChangeEvent and start to be blocked.",
                    tableId.toString());
            this.handleSchemaChangeEvent(tableId, schemaChangeEvent);
            // Refresh the cache so later data change events see the post-change schemas.
            this.cachedSchemas.put(tableId, this.getLatestSchema(tableId));
            this.getRoutedTable(tableId)
                    .ifPresent(
                            routed -> this.cachedSchemas.put(routed, this.getLatestSchema(routed)));
            return;
        }

        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
        Optional<TableId> optionalRoutedTable = this.getRoutedTable(dataChangeEvent.tableId());
        if (optionalRoutedTable.isPresent()) {
            this.output.collect(
                    new StreamRecord<>(
                            this.maybeFillInNullForEmptyColumns(
                                    dataChangeEvent, optionalRoutedTable.get())));
        } else {
            this.output.collect(streamRecord);
        }
    }

    /**
     * Rewrites a data change event so that it targets {@code routedTableId}.
     *
     * <p>When the cached schemas of the source and routed tables are equal the event is simply
     * re-created for the routed table; otherwise each carried record is regenerated against the
     * routed table schema.
     *
     * @throws IllegalStateException wrapping any failure, including schema lookup errors and an
     *     unrecognized operation type
     */
    private DataChangeEvent maybeFillInNullForEmptyColumns(
            DataChangeEvent originalEvent, TableId routedTableId) {
        try {
            Schema originalSchema = this.cachedSchemas.get(originalEvent.tableId());
            Schema routedTableSchema = this.cachedSchemas.get(routedTableId);
            if (originalSchema.equals(routedTableSchema)) {
                return ChangeEventUtils.recreateDataChangeEvent(originalEvent, routedTableId);
            }
            switch (originalEvent.op()) {
                case INSERT:
                    return DataChangeEvent.insertEvent(
                            routedTableId,
                            this.regenerateRecordData(
                                    originalEvent.after(), originalSchema, routedTableSchema),
                            originalEvent.meta());
                case UPDATE:
                    return DataChangeEvent.updateEvent(
                            routedTableId,
                            this.regenerateRecordData(
                                    originalEvent.before(), originalSchema, routedTableSchema),
                            this.regenerateRecordData(
                                    originalEvent.after(), originalSchema, routedTableSchema),
                            originalEvent.meta());
                case DELETE:
                    return DataChangeEvent.deleteEvent(
                            routedTableId,
                            this.regenerateRecordData(
                                    originalEvent.before(), originalSchema, routedTableSchema),
                            originalEvent.meta());
                case REPLACE:
                    return DataChangeEvent.replaceEvent(
                            routedTableId,
                            this.regenerateRecordData(
                                    originalEvent.after(), originalSchema, routedTableSchema),
                            originalEvent.meta());
                default:
                    throw new IllegalArgumentException(
                            String.format(
                                    "Unrecognized operation type \"%s\"", originalEvent.op()));
            }
        } catch (Exception e) {
            throw new IllegalStateException("Unable to fill null for empty columns", e);
        }
    }

    /**
     * Builds a record laid out according to {@code routedTableSchema} from a record laid out
     * according to {@code originalSchema}.
     *
     * <p>Columns are matched by name. A routed column absent from the original schema yields
     * {@code null}; a column whose type differs is passed through {@link TypeCoercionFieldGetter}.
     */
    private RecordData regenerateRecordData(
            RecordData recordData, Schema originalSchema, Schema routedTableSchema) {
        // Loop-invariant: fetch the original column names once instead of once per routed column.
        List<String> originalColumnNames = originalSchema.getColumnNames();
        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>();
        for (Column routedColumn : routedTableSchema.getColumns()) {
            String columnName = routedColumn.getName();
            int columnIndex = originalColumnNames.indexOf(columnName);
            if (columnIndex == -1) {
                fieldGetters.add(new NullFieldGetter());
                continue;
            }
            // The name was found in the original schema's column names just above, so the
            // by-name lookup is expected to be present here.
            DataType originalType = originalSchema.getColumn(columnName).get().getType();
            RecordData.FieldGetter originalGetter =
                    RecordData.createFieldGetter(originalType, columnIndex);
            if (originalType.equals(routedColumn.getType())) {
                fieldGetters.add(originalGetter);
            } else {
                fieldGetters.add(
                        new TypeCoercionFieldGetter(routedColumn.getType(), originalGetter));
            }
        }
        BinaryRecordDataGenerator recordDataGenerator =
                new BinaryRecordDataGenerator(
                        routedTableSchema.getColumnDataTypes().toArray(new DataType[0]));
        return recordDataGenerator.generate(
                fieldGetters.stream()
                        .map(fieldGetter -> fieldGetter.getFieldOrNull(recordData))
                        .toArray());
    }

    /**
     * Returns the replacement table of the first route whose selectors match {@code
     * originalTableId}, or an empty optional when no route matches.
     */
    private Optional<TableId> getRoutedTable(TableId originalTableId) {
        for (Tuple2<Selectors, TableId> route : this.routes) {
            if (route.f0.isMatch(originalTableId)) {
                return Optional.of(route.f1);
            }
        }
        return Optional.empty();
    }

    /**
     * Requests the schema change from the coordinator. When the response contains schema change
     * events, emits a {@link FlushEvent} for the table followed by those events, then waits for the
     * coordinator via {@link #requestReleaseUpstream()}. An empty response emits nothing.
     */
    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
            throws InterruptedException, TimeoutException {
        SchemaChangeResponse response = this.requestSchemaChange(tableId, schemaChangeEvent);
        if (response.getSchemaChangeEvents().isEmpty()) {
            return;
        }
        LOG.info(
                "Sending the FlushEvent for table {} in subtask {}.",
                tableId,
                this.getRuntimeContext().getIndexOfThisSubtask());
        this.output.collect(new StreamRecord<>(new FlushEvent(tableId)));
        response.getSchemaChangeEvents()
                .forEach(e -> this.output.collect(new StreamRecord<>(e)));
        this.requestReleaseUpstream();
    }

    /** Sends a {@link SchemaChangeRequest} for the given table and event to the coordinator. */
    private SchemaChangeResponse requestSchemaChange(
            TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        return this.sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
    }

    /**
     * Sends a {@link ReleaseUpstreamRequest} and, while the coordinator answers with a {@link
     * SchemaChangeProcessingResponse}, sleeps one second and polls again with a {@link
     * SchemaChangeResultRequest}.
     *
     * @throws InterruptedException if interrupted while sleeping between polls
     * @throws TimeoutException if the deadline (first response time plus the RPC timeout) passes
     *     while the coordinator still reports the change as in progress
     */
    private void requestReleaseUpstream() throws InterruptedException, TimeoutException {
        CoordinationResponse coordinationResponse =
                this.sendRequestToCoordinator(new ReleaseUpstreamRequest());
        long nextRpcTimeOutMillis = System.currentTimeMillis() + this.rpcTimeOutInMillis;
        while (coordinationResponse instanceof SchemaChangeProcessingResponse) {
            if (System.currentTimeMillis() >= nextRpcTimeOutMillis) {
                throw new TimeoutException("TimeOut when requesting release upstream");
            }
            Thread.sleep(1000L);
            coordinationResponse = this.sendRequestToCoordinator(new SchemaChangeResultRequest());
        }
    }

    /**
     * Serializes {@code request}, sends it to the operator coordinator and blocks until the
     * response arrives, returning it unwrapped through {@link CoordinationResponseUtils#unwrap}.
     *
     * @throws IllegalStateException if serialization, sending or waiting fails; if the wait was
     *     interrupted, the thread's interrupt flag is restored before throwing
     */
    @SuppressWarnings("unchecked") // response type is dictated by the request type sent
    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
            RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            CompletableFuture<CoordinationResponse> responseFuture =
                    this.toCoordinator.sendRequestToCoordinator(
                            this.getOperatorID(), new SerializedValue<>(request));
            return (RESPONSE) CoordinationResponseUtils.unwrap(responseFuture.get());
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                    "Failed to send request to coordinator: " + request.toString(), e);
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Failed to send request to coordinator: " + request.toString(), e);
        }
    }

    /**
     * Fetches the latest schema of {@code tableId} through the schema evolution client.
     *
     * @throws IllegalStateException if the lookup fails or no schema exists for the table; the
     *     underlying failure is attached as the cause
     */
    private Schema getLatestSchema(TableId tableId) {
        try {
            Optional<Schema> optionalSchema = this.schemaEvolutionClient.getLatestSchema(tableId);
            if (!optionalSchema.isPresent()) {
                throw new IllegalStateException(
                        String.format("Schema doesn't exist for table \"%s\"", tableId));
            }
            return optionalSchema.get();
        } catch (Exception e) {
            // Attach the cause so the root failure (including "schema doesn't exist") is not lost.
            throw new IllegalStateException(
                    String.format("Unable to get latest schema for table \"%s\"", tableId), e);
        }
    }

    /**
     * Field getter that reads a value with a delegate getter and widens it to the destination
     * column type. Supported: Byte/Short/Integer to BIGINT, Float to an approximate-numeric
     * destination (as a double), and StringData passed through to VARCHAR. {@code null} stays
     * {@code null}; anything else is rejected with {@link IllegalArgumentException}.
     */
    private static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
        private final DataType destinationType;
        private final RecordData.FieldGetter originalFieldGetter;

        public TypeCoercionFieldGetter(
                DataType destinationType, RecordData.FieldGetter originalFieldGetter) {
            this.destinationType = destinationType;
            this.originalFieldGetter = originalFieldGetter;
        }

        @Nullable
        @Override
        public Object getFieldOrNull(RecordData recordData) {
            Object originalField = this.originalFieldGetter.getFieldOrNull(recordData);
            if (originalField == null) {
                return null;
            }
            if (this.destinationType.is(DataTypeRoot.BIGINT)) {
                if (originalField instanceof Byte) {
                    return ((Byte) originalField).longValue();
                }
                if (originalField instanceof Short) {
                    return ((Short) originalField).longValue();
                }
                if (originalField instanceof Integer) {
                    return ((Integer) originalField).longValue();
                }
                throw new IllegalArgumentException(
                        String.format(
                                "Cannot fit type \"%s\" into a BIGINT column. Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
                                originalField.getClass()));
            }
            if (this.destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
                if (originalField instanceof Float) {
                    return ((Float) originalField).doubleValue();
                }
                throw new IllegalArgumentException(
                        String.format(
                                "Cannot fit type \"%s\" into a DOUBLE column. Currently only FLOAT can be accepted by a DOUBLE column",
                                originalField.getClass()));
            }
            if (this.destinationType.is(DataTypeRoot.VARCHAR)) {
                if (originalField instanceof StringData) {
                    return originalField;
                }
                throw new IllegalArgumentException(
                        String.format(
                                "Cannot fit type \"%s\" into a STRING column. Currently only CHAR / VARCHAR can be accepted by a STRING column",
                                originalField.getClass()));
            }
            throw new IllegalArgumentException(
                    String.format(
                            "Column type \"%s\" doesn't support type coercion",
                            this.destinationType));
        }
    }

    /** Field getter that always yields {@code null}; used for columns missing upstream. */
    private static class NullFieldGetter implements RecordData.FieldGetter {
        private NullFieldGetter() {
        }

        @Nullable
        @Override
        public Object getFieldOrNull(RecordData recordData) {
            return null;
        }
    }
}

