/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.databend.sink;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.databend.catalog.DatabendCatalog;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException;
import org.apache.seatunnel.connectors.seatunnel.databend.sink.DatabendSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.databend.util.DatabendUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatabendSink
implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>,
SupportSaveMode {
    private static final Logger log = LoggerFactory.getLogger(DatabendSink.class);
    private final CatalogTable catalogTable;
    private final SchemaSaveMode schemaSaveMode;
    private final DataSaveMode dataSaveMode;
    private final String database;
    private final String table;
    private final String customSql;
    private final boolean autoCommit;
    private final int batchSize;
    private final int executeTimeoutSec;
    private final DatabendSinkConfig databendSinkConfig;
    private ReadonlyConfig readonlyConfig;

    public DatabendSink(CatalogTable catalogTable, ReadonlyConfig options) {
        this.catalogTable = catalogTable;
        this.databendSinkConfig = DatabendSinkConfig.of(options);
        this.schemaSaveMode = (SchemaSaveMode)options.get(DatabendSinkOptions.SCHEMA_SAVE_MODE);
        this.dataSaveMode = (DataSaveMode)options.get(DatabendSinkOptions.DATA_SAVE_MODE);
        this.customSql = options.getOptional(DatabendSinkOptions.CUSTOM_SQL).orElse(null);
        this.database = options.getOptional(DatabendOptions.DATABASE).orElse(catalogTable.getTableId().getDatabaseName());
        String configuredTable = (String)options.get(DatabendOptions.TABLE);
        if (configuredTable == null || configuredTable.isEmpty()) {
            log.warn("Table name not specified in options, using table name from catalog: {}", (Object)catalogTable.getTableId().getTableName());
            this.table = catalogTable.getTableId().getTableName();
        } else {
            this.table = configuredTable;
        }
        this.autoCommit = (Boolean)options.get(DatabendOptions.AUTO_COMMIT);
        this.batchSize = (Integer)options.get(DatabendOptions.BATCH_SIZE);
        this.executeTimeoutSec = (Integer)options.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC);
        this.readonlyConfig = options;
        log.info("DatabendSink initialized with catalog table: {}", (Object)catalogTable);
        log.info("Catalog table ID: {}", (Object)catalogTable.getTableId());
        log.info("Catalog table schema: {}", (Object)catalogTable.getTableSchema());
        log.info("Catalog table row type: {}", (Object)catalogTable.getSeaTunnelRowType());
        if (catalogTable.getSeaTunnelRowType() != null) {
            log.info("Field names: {}", (Object)String.join((CharSequence)", ", catalogTable.getSeaTunnelRowType().getFieldNames()));
            log.info("Field types: {}", (Object)String.join((CharSequence)", ", Arrays.stream(catalogTable.getSeaTunnelRowType().getFieldTypes()).map(type -> type.getSqlType().name()).collect(Collectors.toList())));
        }
        log.info("Target table path: {}.{}", (Object)this.database, (Object)this.table);
        log.info("Schema save mode: {}", (Object)this.schemaSaveMode);
        log.info("Data save mode: {}", (Object)this.dataSaveMode);
        log.info("Custom SQL: {}", (Object)this.customSql);
        log.info("Auto commit: {}", (Object)this.autoCommit);
        log.info("Batch size: {}", (Object)this.batchSize);
        log.info("Execute timeout: {} seconds", (Object)this.executeTimeoutSec);
    }

    public String getPluginName() {
        return "Databend";
    }

    public DatabendSinkWriter createWriter(@NonNull SinkWriter.Context context) throws IOException {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        try {
            Connection connection = DatabendUtil.createConnection(this.databendSinkConfig);
            connection.setAutoCommit(this.autoCommit);
            return new DatabendSinkWriter(context, connection, this.catalogTable, this.databendSinkConfig, this.customSql, this.database, this.table, this.batchSize, this.executeTimeoutSec);
        }
        catch (SQLException e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.CONNECT_FAILED, "Failed to connect to Databend: " + e.getMessage(), e);
        }
    }

    public Optional<CatalogTable> getWriteCatalogTable() {
        return Optional.of(this.catalogTable);
    }

    public Optional<SaveModeHandler> getSaveModeHandler() {
        try {
            TablePath tablePath = TablePath.of((String)this.database, (String)this.table);
            DatabendCatalog databendCatalog = new DatabendCatalog(this.readonlyConfig, "databend");
            return Optional.of(new DefaultSaveModeHandler(this.schemaSaveMode, this.dataSaveMode, (Catalog)databendCatalog, tablePath, this.catalogTable, this.customSql));
        }
        catch (Exception e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.CONNECT_FAILED, "Failed to create SaveModeHandler: " + e.getMessage(), e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private boolean executeSql(Connection connection, String sql) {
        try (Statement statement = connection.createStatement();){
            log.info("Executing SQL: {}", (Object)sql);
            statement.execute(sql);
            boolean bl = true;
            return bl;
        }
        catch (SQLException e) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Failed to execute SQL: " + sql + ", error: " + e.getMessage(), e);
        }
    }

    private String convertToDatabendType(SeaTunnelDataType<?> dataType) {
        switch (dataType.getSqlType()) {
            case STRING: {
                return "STRING";
            }
            case BOOLEAN: {
                return "BOOLEAN";
            }
            case TINYINT: {
                return "TINYINT";
            }
            case SMALLINT: {
                return "SMALLINT";
            }
            case INT: {
                return "INT";
            }
            case BIGINT: {
                return "BIGINT";
            }
            case FLOAT: {
                return "FLOAT";
            }
            case DOUBLE: {
                return "DOUBLE";
            }
            case DECIMAL: {
                return "DECIMAL";
            }
            case BYTES: {
                return "VARCHAR";
            }
            case DATE: {
                return "DATE";
            }
            case TIME: {
                return "TIMESTAMP";
            }
            case TIMESTAMP: {
                return "TIMESTAMP";
            }
        }
        return "STRING";
    }
}

