/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.regular;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaCoordinator
extends SchemaRegistry {
    /** Logger used by this coordinator. */
    private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class);
    /**
     * Single-threaded executor on which schema change evolution tasks are submitted (see
     * {@code startSchemaChangesEvolve}); it is shut down in {@code close()}.
     */
    private final ExecutorService schemaChangeThreadPool = Executors.newSingleThreadExecutor();
    /**
     * Maps a source subtask ID to the set of sink subtask IDs that have reported a successful
     * flush for it. Created in {@code start()}.
     */
    private transient ConcurrentHashMap<Integer, Set<Integer>> flushedSinkWriters;
    /**
     * Maps a source subtask ID to its pending schema change request together with the future that
     * will receive the response. Created in {@code start()}.
     */
    private transient Map<Integer, Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> pendingRequests;

    /**
     * Creates a coordinator by forwarding every argument to the {@link SchemaRegistry}
     * constructor. Note that the parent constructor takes {@code context} before
     * {@code operatorName}, i.e. in the opposite order to this constructor.
     *
     * @param operatorName name of the operator this coordinator belongs to
     * @param context operator coordinator context
     * @param coordinatorExecutor executor handed to the parent registry
     * @param metadataApplier applier used to push schema changes to the external system
     * @param routes routing rules
     * @param schemaChangeBehavior schema change behavior mode
     * @param rpcTimeout timeout handed to the parent registry; also used by this class when
     *     waiting for a schema change request
     */
    public SchemaCoordinator(String operatorName, OperatorCoordinator.Context context, ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List<RouteRule> routes, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) {
        super(context, operatorName, coordinatorExecutor, metadataApplier, routes, schemaChangeBehavior, rpcTimeout);
    }

    /**
     * Starts the coordinator: runs the parent start-up logic first, then creates fresh, empty
     * bookkeeping maps for flushed sink writers and pending schema change requests.
     *
     * @throws Exception if the parent start-up fails
     */
    @Override
    public void start() throws Exception {
        super.start();
        // Both maps are transient fields, so they are (re)built on every start.
        flushedSinkWriters = new ConcurrentHashMap<>();
        pendingRequests = new ConcurrentHashMap<>();
    }

    /**
     * Closes the coordinator and stops the schema change executor.
     *
     * <p>The executor is shut down in a {@code finally} block so that its thread is released even
     * when the parent {@code close()} throws; the parent's exception still propagates.
     *
     * @throws Exception if the parent close fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // The field is final and initialized at declaration, so it can never be null here.
            if (!this.schemaChangeThreadPool.isShutdown()) {
                this.schemaChangeThreadPool.shutdownNow();
            }
        }
    }

    /**
     * Serializes the coordinator state and completes {@code resultFuture} with the bytes.
     *
     * <p>Layout written, in order: the schema manager serializer version (int), the length of the
     * serialized schema manager (int), the serialized schema manager bytes, and a trailing int
     * {@code 0}.
     *
     * @param resultFuture future completed with the serialized checkpoint bytes
     * @throws Exception if serialization fails
     */
    @Override
    protected void snapshot(CompletableFuture<byte[]> resultFuture) throws Exception {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                DataOutputStream output = new DataOutputStream(buffer)) {
            output.writeInt(SchemaManager.SERIALIZER.getVersion());

            byte[] managerBytes = SchemaManager.SERIALIZER.serialize(this.schemaManager);
            output.writeInt(managerBytes.length);
            output.write(managerBytes);

            // NOTE(review): restore() reads an int "derivation mapping size" after the schema
            // manager bytes for versions 1 and 2; this zero presumably stands for an empty
            // mapping so that such readers stay compatible - confirm.
            output.writeInt(0);

            resultFuture.complete(buffer.toByteArray());
        }
    }

    /**
     * Restores the schema manager from checkpoint bytes.
     *
     * <p>The data starts with the serializer version (int). Versions 0, 1 and 2 are accepted; any
     * other value is rejected before anything else is read. The version is followed by the length
     * of the serialized schema manager (int) and the serialized bytes themselves. For versions 1
     * and 2 a trailing schema derivation mapping is additionally read and discarded.
     *
     * @param checkpointData bytes to restore from
     * @throws IOException if the serialization version is not recognized
     * @throws Exception if reading or deserialization fails
     */
    @Override
    protected void restore(byte[] checkpointData) throws Exception {
        try (ByteArrayInputStream rawInput = new ByteArrayInputStream(checkpointData);
                DataInputStream input = new DataInputStream(rawInput)) {
            int version = input.readInt();
            if (version < 0 || version > 2) {
                throw new IOException("Unrecognized serialization version " + version);
            }

            byte[] managerBytes = new byte[input.readInt()];
            input.readFully(managerBytes);
            this.schemaManager = SchemaManager.SERIALIZER.deserialize(version, managerBytes);

            // Only versions after 0 carry the (now unused) derivation mapping section.
            if (version != 0) {
                this.consumeUnusedSchemaDerivationBytes(input);
            }
        }
    }

    /**
     * Dispatches a coordination request. Only {@link SchemaChangeRequest} is supported; it is
     * forwarded to {@link #handleSchemaChangeRequest}.
     *
     * @param request incoming coordination request
     * @param responseFuture future that will eventually carry the response
     * @throws UnsupportedOperationException if the request is of any other type
     */
    @Override
    protected void handleCustomCoordinationRequest(CoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
        if (request instanceof SchemaChangeRequest) {
            this.handleSchemaChangeRequest((SchemaChangeRequest) request, responseFuture);
            return;
        }
        throw new UnsupportedOperationException("Unknown coordination request type: " + request);
    }

    /**
     * Records that a sink subtask has finished flushing on behalf of a source subtask and, once
     * the number of distinct sink subtasks recorded for that source subtask reaches
     * {@code currentParallelism}, clears the record and starts schema evolution for it.
     *
     * <p>The per-source set is obtained atomically with {@code computeIfAbsent} and kept in a
     * local variable, so the method never dereferences a map lookup that might have been removed
     * in the meantime.
     *
     * @param event flush success notification carrying the sink and source subtask IDs
     * @throws TimeoutException declared by the overridden method; not thrown by this body directly
     */
    @Override
    protected void handleFlushSuccessEvent(FlushSuccessEvent event) throws TimeoutException {
        int sinkSubtask = event.getSinkSubTaskId();
        int sourceSubtask = event.getSourceSubTaskId();
        LOG.info("Sink subtask {} succeed flushing from source subTask {}.", sinkSubtask, sourceSubtask);

        Set<Integer> flushedSinks = this.flushedSinkWriters.computeIfAbsent(sourceSubtask, key -> ConcurrentHashMap.newKeySet());
        flushedSinks.add(sinkSubtask);
        LOG.info("Currently flushed sink writers for source task {} are: {}", sourceSubtask, flushedSinks);

        if (flushedSinks.size() >= this.currentParallelism) {
            LOG.info("Source SubTask {} have collected enough flush success event. Will start evolving schema changes...", sourceSubtask);
            // Reset the record before kicking off evolution so a later round starts from empty.
            this.flushedSinkWriters.remove(sourceSubtask);
            this.startSchemaChangesEvolve(sourceSubtask);
        }
    }

    /**
     * Delegates to the parent error handling and then fails every pending schema change request
     * future with the same throwable.
     *
     * @param taskDescription description of the task that failed
     * @param t the failure cause
     */
    @Override
    protected void handleUnrecoverableError(String taskDescription, Throwable t) {
        super.handleUnrecoverableError(taskDescription, t);
        for (Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>> pending : this.pendingRequests.values()) {
            pending.f1.completeExceptionally(t);
        }
    }

    /**
     * Registers a schema change request as pending for its source subtask. The request is only
     * stored here; it is picked up later by the schema change task for that subtask. A request
     * already stored under the same subtask ID is replaced.
     *
     * @param request the schema change request; its subtask ID is used as the map key
     * @param responseFuture future to complete once the request has been handled
     */
    public void handleSchemaChangeRequest(SchemaChangeRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
        // Let Tuple2.of infer its type arguments from the target type; no casts are needed.
        this.pendingRequests.put(request.getSubTaskId(), Tuple2.of(request, responseFuture));
    }

    /**
     * Submits a task to the schema change executor that applies the pending schema change of the
     * given source subtask. Any throwable raised by the task is reported through {@code failJob}
     * (wrapped in a {@link FlinkRuntimeException}) and then rethrown inside the task.
     *
     * @param sourceSubTaskId ID of the source subtask whose request should be processed
     */
    private void startSchemaChangesEvolve(int sourceSubTaskId) {
        Runnable evolveTask = () -> {
            try {
                this.applySchemaChange(sourceSubTaskId);
            } catch (Throwable failure) {
                FlinkRuntimeException wrapped = new FlinkRuntimeException("Failed to apply schema change event.", failure);
                this.failJob("Schema change applying task", wrapped);
                throw failure;
            }
        };
        this.schemaChangeThreadPool.submit(evolveTask);
    }

    /**
     * Derives the schema change events to apply to evolved (routed) tables from one upstream
     * schema change event.
     *
     * <p>For every evolved table affected by the event's table:
     * <ul>
     *   <li>if exactly one upstream table maps to it, the event is copied onto the evolved table;
     *   <li>otherwise the current evolved schema is merged with all upstream schemas, and the
     *       difference between the current evolved schema and the merged schema becomes the raw
     *       change list.
     * </ul>
     * The raw events are then normalized against the configured behavior and metadata applier and
     * appended to the result.
     *
     * @param event the upstream schema change event
     * @return normalized schema change events for all affected evolved tables, in iteration order
     */
    private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent event) {
        LOG.info("Step 1 - Start deducing evolved schema change for {}", event);
        TableId sourceTableId = event.tableId();
        List<SchemaChangeEvent> result = new ArrayList<>();
        Set<TableId> knownUpstreamTables = this.schemaManager.getAllOriginalTables();
        Set<TableId> sinkTableIds = SchemaDerivator.getAffectedEvolvedTables(this.router, Collections.singleton(sourceTableId));
        LOG.info("Step 2 - Affected downstream tables are: {}", sinkTableIds);

        for (TableId sinkTableId : sinkTableIds) {
            // May be null when the evolved table has no schema recorded yet.
            Schema sinkSchema = this.schemaManager.getLatestEvolvedSchema(sinkTableId).orElse(null);
            LOG.info("Step 3.1 - For to-be-evolved table {} with schema {}...", sinkTableId, sinkSchema);

            Set<TableId> upstreams = SchemaDerivator.reverseLookupDependingUpstreamTables(this.router, sinkTableId, knownUpstreamTables);
            Preconditions.checkArgument(!upstreams.isEmpty(), "An affected sink table's upstream dependency cannot be empty.", new Object[0]);
            LOG.info("Step 3.2 - upstream dependency tables are: {}", upstreams);

            List<SchemaChangeEvent> rawEvents = new ArrayList<>();
            if (upstreams.size() != 1) {
                // Many-to-one routing: widen the evolved schema to cover every upstream schema.
                Set<Schema> upstreamSchemas = SchemaDerivator.reverseLookupDependingUpstreamSchemas(this.router, sinkTableId, this.schemaManager);
                LOG.info("Step 3.3 - Upstream dependency schemas are: {}.", upstreamSchemas);
                Schema widest = sinkSchema;
                for (Schema upstreamSchema : upstreamSchemas) {
                    widest = SchemaMergingUtils.getLeastCommonSchema(widest, upstreamSchema);
                }
                LOG.info("Step 3.4 - Deduced widest schema is: {}.", widest);
                List<SchemaChangeEvent> difference = SchemaMergingUtils.getSchemaDifference(sinkTableId, sinkSchema, widest);
                LOG.info("Step 3.5 - It's an many-to-one routing and causes schema changes: {}.", difference);
                rawEvents.addAll(difference);
            } else {
                // One-to-one routing: the upstream event can be re-targeted as is.
                SchemaChangeEvent forwarded = event.copy(sinkTableId);
                rawEvents.add(forwarded);
                LOG.info("Step 3.3 - It's an one-by-one routing and could be forwarded as {}.", forwarded);
            }

            List<SchemaChangeEvent> normalized = SchemaDerivator.normalizeSchemaChangeEvents(sinkSchema, rawEvents, this.behavior, this.metadataApplier);
            LOG.info("Step 4 - After being normalized with {} behavior, final schema change events are: {}", this.behavior, normalized);
            result.addAll(normalized);
        }
        return result;
    }

    /**
     * Processes the pending schema change request of one source subtask.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Poll (every 100 ms, up to {@code rpcTimeout}) until a request for the subtask is
     *       present in {@code pendingRequests}.
     *   <li>Unless the event is redundant for the current upstream schema, apply it to the
     *       upstream schema and deduce the evolved schema changes.
     *   <li>In {@code EXCEPTION} behavior, fail if any deduced event is not a
     *       {@link CreateTableEvent}.
     *   <li>Apply each deduced event, keeping those that were actually applied.
     *   <li>Collect the latest evolved schema of every table the original table routes to.
     *   <li>Remove the pending request and complete its future with the response.
     * </ol>
     *
     * @param sourceSubTaskId ID of the source subtask whose request is processed
     * @throws RuntimeException if no request arrives before the timeout
     * @throws SchemaEvolveException if a non-create-table change is deduced in EXCEPTION mode
     */
    private void applySchemaChange(int sourceSubTaskId) {
        try {
            this.loopUntil(
                    () -> this.pendingRequests.containsKey(sourceSubTaskId),
                    () -> LOG.info("SchemaOperator {} has not submitted schema change request yet. Waiting...", sourceSubTaskId),
                    this.rpcTimeout,
                    Duration.ofMillis(100L));
        } catch (TimeoutException e) {
            throw new RuntimeException("Timeout waiting for schema change request from SchemaOperator.", e);
        }

        Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>> pending = this.pendingRequests.get(sourceSubTaskId);
        SchemaChangeRequest request = pending.f0;
        CompletableFuture<CoordinationResponse> responseFuture = pending.f1;

        SchemaChangeEvent originalEvent = request.getSchemaChangeEvent();
        TableId originalTableId = originalEvent.tableId();
        Schema currentUpstreamSchema = this.schemaManager.getLatestOriginalSchema(originalTableId).orElse(null);

        List<SchemaChangeEvent> deducedEvents = new ArrayList<>();
        if (SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) {
            LOG.info("Schema change event {} is redundant for current schema {}, just skip it.", originalEvent, currentUpstreamSchema);
        } else {
            this.schemaManager.applyOriginalSchemaChange(originalEvent);
            deducedEvents.addAll(this.deduceEvolvedSchemaChanges(originalEvent));
        }

        LOG.info("All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", request.getTableId().toString(), request, deducedEvents.stream().map(Object::toString).collect(Collectors.joining("\n\t")));

        // EXCEPTION mode tolerates table creation only; report the first other kind of change.
        if (SchemaChangeBehavior.EXCEPTION.equals(this.behavior)) {
            for (SchemaChangeEvent candidate : deducedEvents) {
                if (!(candidate instanceof CreateTableEvent)) {
                    throw new SchemaEvolveException(candidate, "Unexpected schema change events occurred in EXCEPTION mode. Job will fail now.");
                }
            }
        }

        List<SchemaChangeEvent> appliedEvents = new ArrayList<>();
        for (SchemaChangeEvent deduced : deducedEvents) {
            if (this.applyAndUpdateEvolvedSchemaChange(deduced)) {
                appliedEvents.add(deduced);
            }
        }

        Map<TableId, Schema> refreshedEvolvedSchemas = new HashMap<>();
        for (TableId routedTableId : this.router.route(originalEvent.tableId())) {
            refreshedEvolvedSchemas.put(routedTableId, this.schemaManager.getLatestEvolvedSchema(routedTableId).orElse(null));
        }

        // Drop the request before answering, matching the order in which the state is logged.
        this.pendingRequests.remove(sourceSubTaskId);
        LOG.info("Finished handling schema change request from {}. Pending requests: {}", sourceSubTaskId, this.pendingRequests);
        responseFuture.complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(appliedEvents, refreshedEvolvedSchemas)));
    }

    /**
     * Applies one schema change event through the metadata applier and, on success, records it in
     * the schema manager as an evolved schema change.
     *
     * @param schemaChangeEvent the event to apply
     * @return {@code true} if the event was applied; {@code false} if applying failed with an
     *     exception that {@link #shouldIgnoreException} allows to be tolerated
     */
    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
        try {
            this.metadataApplier.applySchemaChange(schemaChangeEvent);
            this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
            LOG.info("Successfully applied schema change event {} to external system.", schemaChangeEvent);
            return true;
        } catch (Throwable failure) {
            if (!this.shouldIgnoreException(failure)) {
                throw failure;
            }
            LOG.warn("Failed to apply schema change {}, but keeps running in tolerant mode. Caused by: {}", schemaChangeEvent, failure);
            return false;
        }
    }

    /**
     * Tells whether a failure raised while applying a schema change may be tolerated.
     *
     * @param throwable the failure to classify
     * @return {@code true} only when the failure is an
     *     {@link UnsupportedSchemaChangeEventException} and the configured behavior is
     *     {@code TRY_EVOLVE}
     */
    private boolean shouldIgnoreException(Throwable throwable) {
        if (!(throwable instanceof UnsupportedSchemaChangeEventException)) {
            return false;
        }
        return SchemaChangeBehavior.TRY_EVOLVE.equals(this.behavior);
    }

    /**
     * Reads and discards the schema derivation mapping section of a checkpoint so that the stream
     * is advanced past it.
     *
     * <p>Section layout: mapping size (int), then for each entry a routed table ID, the number of
     * original tables (int), and that many original table IDs. Nothing is retained, so no
     * collections are allocated; in particular no allocation is sized from counts read out of the
     * checkpoint bytes. Negative counts are rejected as corrupted data.
     *
     * @param in stream positioned at the start of the derivation mapping section
     * @throws IOException if reading fails or a negative count is encountered
     */
    private void consumeUnusedSchemaDerivationBytes(DataInputStream in) throws IOException {
        TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
        // One view over the same stream is enough; it is reused for every table ID.
        DataInputView view = new DataInputViewStreamWrapper(in);

        int derivationMappingSize = in.readInt();
        if (derivationMappingSize < 0) {
            throw new IOException("Corrupted checkpoint data: negative schema derivation mapping size " + derivationMappingSize);
        }
        for (int i = 0; i < derivationMappingSize; ++i) {
            // Routed table ID - read only to advance the stream.
            tableIdSerializer.deserialize(view);
            int numOriginalTables = in.readInt();
            if (numOriginalTables < 0) {
                throw new IOException("Corrupted checkpoint data: negative original table count " + numOriginalTables);
            }
            for (int j = 0; j < numOriginalTables; ++j) {
                // Original table ID - read only to advance the stream.
                tableIdSerializer.deserialize(view);
            }
        }
    }
}

