/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.sink;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

@Internal
public class DataSinkWriterOperator<CommT>
extends AbstractStreamOperator<CommittableMessage<CommT>>
implements OneInputStreamOperator<Event, CommittableMessage<CommT>>,
BoundedOneInput {
    private SchemaEvolutionClient schemaEvolutionClient;
    private final OperatorID schemaOperatorID;
    private final Sink<Event> sink;
    private final ProcessingTimeService processingTimeService;
    private final MailboxExecutor mailboxExecutor;
    private Object flinkWriterOperator;
    private SinkWriter<Event> copySinkWriter;
    private final Set<TableId> processedTableIds;

    public DataSinkWriterOperator(Sink<Event> sink, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, OperatorID schemaOperatorID) {
        this.sink = sink;
        this.processingTimeService = processingTimeService;
        this.mailboxExecutor = mailboxExecutor;
        this.schemaOperatorID = schemaOperatorID;
        this.processedTableIds = new HashSet<TableId>();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<CommittableMessage<CommT>>> output) {
        super.setup(containingTask, config, output);
        this.flinkWriterOperator = this.createFlinkWriterOperator();
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).setup(containingTask, config, output);
        this.schemaEvolutionClient = new SchemaEvolutionClient(containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorID);
    }

    public void open() throws Exception {
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).open();
        this.copySinkWriter = (SinkWriter)this.getFieldValue("sinkWriter");
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        this.schemaEvolutionClient.registerSubtask(this.getRuntimeContext().getIndexOfThisSubtask());
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).initializeState(context);
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).snapshotState(context);
    }

    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).processWatermark(mark);
    }

    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        super.processWatermarkStatus(watermarkStatus);
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).processWatermarkStatus(watermarkStatus);
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = (Event)element.getValue();
        try {
            if (event instanceof FlushEvent) {
                this.handleFlushEvent((FlushEvent)event);
                return;
            }
            if (event instanceof CreateTableEvent) {
                this.processedTableIds.add(((CreateTableEvent)event).tableId());
                ((OneInputStreamOperator)this.getFlinkWriterOperator()).processElement(element);
                return;
            }
            ChangeEvent changeEvent = (ChangeEvent)event;
            if (!this.processedTableIds.contains(changeEvent.tableId())) {
                this.emitLatestSchema(changeEvent.tableId());
                this.processedTableIds.add(changeEvent.tableId());
            }
            this.processedTableIds.add(changeEvent.tableId());
            ((OneInputStreamOperator)this.getFlinkWriterOperator()).processElement(element);
        }
        catch (Exception e) {
            throw new SinkWrapperException(event, (Throwable)e);
        }
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        ((AbstractStreamOperator)this.getFlinkWriterOperator()).prepareSnapshotPreBarrier(checkpointId);
    }

    public void close() throws Exception {
        ((OneInputStreamOperator)this.getFlinkWriterOperator()).close();
    }

    public void endInput() throws Exception {
        ((BoundedOneInput)this.getFlinkWriterOperator()).endInput();
    }

    private void handleFlushEvent(FlushEvent event) throws Exception {
        this.copySinkWriter.flush(false);
        if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE && event.getSchemaChangeEventType() != SchemaChangeEventType.DROP_TABLE) {
            event.getTableIds().stream().filter(tableId -> !this.processedTableIds.contains(tableId)).forEach(tableId -> {
                LOG.info("Table {} has not been processed", tableId);
                try {
                    this.emitLatestSchema((TableId)tableId);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
                this.processedTableIds.add((TableId)tableId);
            });
        }
        this.schemaEvolutionClient.notifyFlushSuccess(this.getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
    }

    private void emitLatestSchema(TableId tableId) throws Exception {
        Optional<Schema> schema = this.schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        if (!schema.isPresent()) {
            throw new RuntimeException("Could not find schema message from SchemaRegistry for " + tableId);
        }
        ((OneInputStreamOperator)this.getFlinkWriterOperator()).processElement(new StreamRecord((Object)new CreateTableEvent(tableId, schema.get())));
        this.processedTableIds.add(tableId);
    }

    private Object createFlinkWriterOperator() {
        try {
            Class<?> flinkWriterClass = this.getRuntimeContext().getUserCodeClassLoader().loadClass("org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
            Constructor<?> constructor = flinkWriterClass.getDeclaredConstructor(Sink.class, ProcessingTimeService.class, MailboxExecutor.class);
            constructor.setAccessible(true);
            return constructor.newInstance(this.sink, this.processingTimeService, this.mailboxExecutor);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
        }
    }

    private <T> T getFieldValue(String fieldName) throws IllegalAccessException {
        for (Class<?> clazz = this.flinkWriterOperator.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                Field field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return (T)field.get(this.flinkWriterOperator);
            }
            catch (NoSuchFieldException e) {
                continue;
            }
        }
        throw new RuntimeException("failed to get sinkWriter");
    }

    private <T> T getFlinkWriterOperator() {
        return (T)this.flinkWriterOperator;
    }
}

