/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.runtime.operators.schema;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SchemaOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private transient TaskOperatorEventGateway toCoordinator;

    public SchemaOperator() {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
    }

    public void processElement(StreamRecord<Event> streamRecord) {
        Event event = (Event)streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            TableId tableId = ((SchemaChangeEvent)event).tableId();
            LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", (Object)tableId.toString());
            this.handleSchemaChangeEvent(tableId, (SchemaChangeEvent)event);
            return;
        }
        this.output.collect(streamRecord);
    }

    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        SchemaChangeResponse response = this.requestSchemaChange(tableId, schemaChangeEvent);
        if (response.isShouldSendFlushEvent()) {
            LOG.info("Sending the FlushEvent for table {} in subtask {}.", (Object)tableId, (Object)this.getRuntimeContext().getIndexOfThisSubtask());
            this.output.collect((Object)new StreamRecord((Object)new FlushEvent(tableId)));
            this.output.collect((Object)new StreamRecord((Object)schemaChangeEvent));
            this.requestReleaseUpstream();
        }
    }

    private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        return (SchemaChangeResponse)this.sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
    }

    private ReleaseUpstreamResponse requestReleaseUpstream() {
        return (ReleaseUpstreamResponse)this.sendRequestToCoordinator(new ReleaseUpstreamRequest());
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            CompletableFuture responseFuture = this.toCoordinator.sendRequestToCoordinator(this.getOperatorID(), new SerializedValue(request));
            return (RESPONSE)CoordinationResponseUtils.unwrap((CoordinationResponse)responseFuture.get());
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }
}

