/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import java.io.IOException;
import java.sql.Connection;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared base for JDBC sink writers.
 *
 * <p>Holds the state the concrete writers work with (dialect, target table path, schemas,
 * connection provider, sink configuration and the current output format) and implements
 * schema-change handling: the in-memory {@link #tableSchema} is updated from the event, the
 * change is applied to the physical table through the dialect, and the output format is rebuilt
 * against the new schema.
 *
 * @param <ResourceT> resource type passed through to {@link SupportMultiTableSinkWriter}
 */
public abstract class AbstractJdbcSinkWriter<ResourceT>
        implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState>,
                SupportMultiTableSinkWriter<ResourceT>,
                SupportSchemaEvolutionSinkWriter {
    private static final Logger log = LoggerFactory.getLogger(AbstractJdbcSinkWriter.class);

    /** Dialect used to obtain connection providers and to apply schema changes. */
    protected JdbcDialect dialect;

    /** Path of the table that schema changes are applied to. */
    protected TablePath sinkTablePath;

    /** Current logical schema; replaced on every {@link #applySchemaChange(SchemaChangeEvent)}. */
    protected TableSchema tableSchema;

    /**
     * Second schema handed to the output format builder. It is not modified anywhere in this
     * class; presumably it mirrors the physical table's schema (verify against subclasses).
     */
    protected TableSchema databaseTableSchema;

    /** Open flag; not read or written in this class (presumably maintained by subclasses). */
    protected transient boolean isOpen;

    /** Connection provider passed to the output format builder. */
    protected JdbcConnectionProvider connectionProvider;

    /** Sink configuration; also the source of the JDBC connection configuration. */
    protected JdbcSinkConfig jdbcSinkConfig;

    /** Output format currently in use; replaced after each schema change. */
    protected JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;

    /** Dispatcher that derives the new {@link TableSchema} from a schema change event. */
    protected TableSchemaChangeEventDispatcher tableSchemaChanger =
            new TableSchemaChangeEventDispatcher();

    /**
     * Applies a schema change: updates {@link #tableSchema} from the event, then applies the
     * change to the target table and re-creates the output format.
     *
     * @param event the schema change to apply
     * @throws IOException if committing pending work or opening the new output format fails
     * @throws JdbcConnectorException if the change cannot be applied to the physical table
     */
    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
        // The in-memory schema is updated first so the rebuilt output format sees the new shape.
        this.tableSchema = this.tableSchemaChanger.reset(this.tableSchema).apply(event);
        this.reOpenOutputFormat(event);
    }

    /**
     * Applies {@code event} to the physical table and replaces {@link #outputFormat} with a newly
     * built and opened instance.
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>{@code prepareCommit()} is invoked (its result is discarded) - presumably to flush
     *       rows buffered under the previous schema.
     *   <li>A separate connection provider is obtained from the dialect, and the schema change is
     *       applied over a connection that is closed when the try block exits.
     *   <li>A new output format is built from the current fields and opened.
     * </ol>
     *
     * @param event the schema change to apply to the table at {@link #sinkTablePath}
     * @throws IOException if {@code prepareCommit()} or opening the new output format fails
     * @throws JdbcConnectorException with code {@code
     *     REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT} if connecting, applying the change
     *     or closing the connection fails
     */
    protected void reOpenOutputFormat(SchemaChangeEvent event) throws IOException {
        this.prepareCommit();

        JdbcConnectionProvider refreshTableSchemaConnectionProvider =
                this.dialect.getJdbcConnectionProvider(
                        this.jdbcSinkConfig.getJdbcConnectionConfig());
        try (Connection connection =
                refreshTableSchemaConnectionProvider.getOrEstablishConnection()) {
            this.dialect.applySchemaChange(connection, this.sinkTablePath, event);
        } catch (Exception e) {
            // Only Exception is caught: JVM Errors (OutOfMemoryError, StackOverflowError, ...)
            // must propagate unchanged instead of being reported as a schema-refresh failure.
            throw new JdbcConnectorException(
                    JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
        }

        // NOTE(review): the previous outputFormat is replaced without being closed here; confirm
        // that prepareCommit() leaves nothing in it that still needs releasing.
        this.outputFormat =
                new JdbcOutputFormatBuilder(
                                this.dialect,
                                this.connectionProvider,
                                this.jdbcSinkConfig,
                                this.tableSchema,
                                this.databaseTableSchema)
                        .build();
        this.outputFormat.open();
    }
}

