/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaRegistry
implements OperatorCoordinator,
CoordinationRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class);
    private final OperatorCoordinator.Context context;
    private final String operatorName;
    private final ExecutorService coordinatorExecutor;
    private final Map<Integer, Throwable> failedReasons;
    private final MetadataApplier metadataApplier;
    private final List<RouteRule> routes;
    private SchemaRegistryRequestHandler requestHandler;
    private SchemaManager schemaManager;
    private SchemaDerivation schemaDerivation;
    private SchemaChangeBehavior schemaChangeBehavior;
    private int currentParallelism;

    public SchemaRegistry(String operatorName, OperatorCoordinator.Context context, ExecutorService executorService, MetadataApplier metadataApplier, List<RouteRule> routes) {
        this(operatorName, context, executorService, metadataApplier, routes, SchemaChangeBehavior.LENIENT);
    }

    public SchemaRegistry(String operatorName, OperatorCoordinator.Context context, ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List<RouteRule> routes, SchemaChangeBehavior schemaChangeBehavior) {
        this.context = context;
        this.coordinatorExecutor = coordinatorExecutor;
        this.operatorName = operatorName;
        this.failedReasons = new HashMap<Integer, Throwable>();
        this.metadataApplier = metadataApplier;
        this.routes = routes;
        this.schemaManager = new SchemaManager(schemaChangeBehavior);
        this.schemaDerivation = new SchemaDerivation(this.schemaManager, routes, new HashMap<TableId, Set<TableId>>());
        this.requestHandler = new SchemaRegistryRequestHandler(metadataApplier, this.schemaManager, this.schemaDerivation, schemaChangeBehavior, context);
        this.schemaChangeBehavior = schemaChangeBehavior;
    }

    public void start() throws Exception {
        LOG.info("Starting SchemaRegistry for {}.", (Object)this.operatorName);
        this.failedReasons.clear();
        this.currentParallelism = this.context.currentParallelism();
        LOG.info("Started SchemaRegistry for {}. Parallelism: {}", (Object)this.operatorName, (Object)this.currentParallelism);
    }

    public void close() throws Exception {
        LOG.info("SchemaRegistry for {} closed.", (Object)this.operatorName);
        this.coordinatorExecutor.shutdown();
        this.requestHandler.close();
    }

    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            block4: {
                try {
                    if (event instanceof FlushSuccessEvent) {
                        FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent)event;
                        LOG.info("Sink subtask {} succeed flushing for table {}.", (Object)flushSuccessEvent.getSubtask(), (Object)flushSuccessEvent.getTableId().toString());
                        this.requestHandler.flushSuccess(flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask(), this.currentParallelism);
                        break block4;
                    }
                    if (event instanceof SinkWriterRegisterEvent) {
                        this.requestHandler.registerSinkWriter(((SinkWriterRegisterEvent)event).getSubtask());
                        break block4;
                    }
                    throw new FlinkException("Unrecognized Operator Event: " + event);
                }
                catch (Throwable t) {
                    this.context.failJob(t);
                    throw t;
                }
            }
        }), "handling event %s from subTask %d", event, subtask);
    }

    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos);){
                int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
                out.writeInt(schemaManagerSerializerVersion);
                byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(this.schemaManager);
                out.writeInt(serializedSchemaManager.length);
                out.write(serializedSchemaManager);
                SchemaDerivation.serializeDerivationMapping(this.schemaDerivation, out);
                resultFuture.complete(baos.toByteArray());
            }
            catch (Throwable t) {
                this.context.failJob(t);
                throw t;
            }
        }), "taking checkpoint %d", checkpointId);
    }

    private void runInEventLoop(ThrowingRunnable<Throwable> action, String actionName, Object ... actionNameFormatParameters) {
        this.coordinatorExecutor.execute(() -> {
            try {
                action.run();
            }
            catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
                String actionString = String.format(actionName, actionNameFormatParameters);
                LOG.error("Uncaught exception in the SchemaEvolutionCoordinator for {} while {}. Triggering job failover.", new Object[]{this.operatorName, actionString, t});
                this.context.failJob(t);
            }
        });
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
        CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<CoordinationResponse>();
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            block6: {
                try {
                    if (request instanceof SchemaChangeRequest) {
                        SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest)request;
                        this.requestHandler.handleSchemaChangeRequest(schemaChangeRequest, responseFuture);
                        break block6;
                    }
                    if (request instanceof SchemaChangeResultRequest) {
                        this.requestHandler.getSchemaChangeResult(responseFuture);
                        break block6;
                    }
                    if (request instanceof GetEvolvedSchemaRequest) {
                        this.handleGetEvolvedSchemaRequest((GetEvolvedSchemaRequest)request, responseFuture);
                        break block6;
                    }
                    if (request instanceof GetOriginalSchemaRequest) {
                        this.handleGetOriginalSchemaRequest((GetOriginalSchemaRequest)request, responseFuture);
                        break block6;
                    }
                    throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request);
                }
                catch (Throwable t) {
                    this.context.failJob(t);
                    throw t;
                }
            }
        }), "handling coordination request %s", request);
        return responseFuture;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        if (checkpointData == null) {
            return;
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
             DataInputStream in = new DataInputStream(bais);){
            int schemaManagerSerializerVersion = in.readInt();
            switch (schemaManagerSerializerVersion) {
                case 0: {
                    int length = in.readInt();
                    byte[] serializedSchemaManager = new byte[length];
                    in.readFully(serializedSchemaManager);
                    this.schemaManager = SchemaManager.SERIALIZER.deserialize(schemaManagerSerializerVersion, serializedSchemaManager);
                    this.schemaDerivation = new SchemaDerivation(this.schemaManager, this.routes, Collections.emptyMap());
                    this.requestHandler = new SchemaRegistryRequestHandler(this.metadataApplier, this.schemaManager, this.schemaDerivation, this.schemaManager.getBehavior(), this.context);
                    return;
                }
                case 1: 
                case 2: {
                    int length = in.readInt();
                    byte[] serializedSchemaManager = new byte[length];
                    in.readFully(serializedSchemaManager);
                    this.schemaManager = SchemaManager.SERIALIZER.deserialize(schemaManagerSerializerVersion, serializedSchemaManager);
                    Map<TableId, Set<TableId>> derivationMapping = SchemaDerivation.deserializerDerivationMapping(in);
                    this.schemaDerivation = new SchemaDerivation(this.schemaManager, this.routes, derivationMapping);
                    this.requestHandler = new SchemaRegistryRequestHandler(this.metadataApplier, this.schemaManager, this.schemaDerivation, this.schemaChangeBehavior, this.context);
                    return;
                }
                default: {
                    throw new IOException("Unrecognized serialization version " + schemaManagerSerializerVersion);
                }
            }
        }
        catch (Throwable t) {
            this.context.failJob(t);
            throw t;
        }
    }

    public void subtaskReset(int subtask, long checkpointId) {
        Throwable rootCause = this.failedReasons.get(subtask);
        LOG.error(String.format("Subtask %d reset at checkpoint %d.", subtask, checkpointId), rootCause);
    }

    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable throwable) {
        this.failedReasons.put(subtask, throwable);
    }

    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway subtaskGateway) {
    }

    private void handleGetEvolvedSchemaRequest(GetEvolvedSchemaRequest getEvolvedSchemaRequest, CompletableFuture<CoordinationResponse> response) {
        LOG.info("Handling evolved schema request: {}", (Object)getEvolvedSchemaRequest);
        int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
        TableId tableId = getEvolvedSchemaRequest.getTableId();
        if (schemaVersion == -1) {
            response.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(this.schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
        } else {
            try {
                response.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(this.schemaManager.getEvolvedSchema(tableId, schemaVersion))));
            }
            catch (IllegalArgumentException iae) {
                LOG.warn("Some client is requesting an non-existed evolved schema for table {} with version {}", (Object)tableId, (Object)schemaVersion);
                response.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(null)));
            }
        }
    }

    private void handleGetOriginalSchemaRequest(GetOriginalSchemaRequest getOriginalSchemaRequest, CompletableFuture<CoordinationResponse> response) {
        LOG.info("Handling original schema request: {}", (Object)getOriginalSchemaRequest);
        int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
        TableId tableId = getOriginalSchemaRequest.getTableId();
        if (schemaVersion == -1) {
            response.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(this.schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
        } else {
            try {
                response.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(this.schemaManager.getOriginalSchema(tableId, schemaVersion))));
            }
            catch (IllegalArgumentException iae) {
                LOG.warn("Some client is requesting an non-existed original schema for table {} with version {}", (Object)tableId, (Object)schemaVersion);
                response.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(null)));
            }
        }
    }

    @VisibleForTesting
    public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
        this.schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
    }

    @VisibleForTesting
    public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent schemaChangeEvent) {
        this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
    }
}

