/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.snowflake.services;

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceConfig;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class SnowflakeBatchServiceImpl
implements SnowflakeServices.BatchService {
    private static final @UnknownKeyFor @NonNull @Initialized String SNOWFLAKE_GCS_PREFIX = "gcs://";
    private static final @UnknownKeyFor @NonNull @Initialized String GCS_PREFIX = "gs://";

    @Override
    public void write(@UnknownKeyFor @NonNull @Initialized SnowflakeBatchServiceConfig config) throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.copyToTable(config);
    }

    @Override
    public @UnknownKeyFor @NonNull @Initialized String read(@UnknownKeyFor @NonNull @Initialized SnowflakeBatchServiceConfig config) throws @UnknownKeyFor @NonNull @Initialized Exception {
        return this.copyIntoStage(config);
    }

    private @UnknownKeyFor @NonNull @Initialized String copyIntoStage(@UnknownKeyFor @NonNull @Initialized SnowflakeBatchServiceConfig config) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        SerializableFunction<Void, DataSource> dataSourceProviderFn = config.getDataSourceProviderFn();
        String database = config.getDatabase();
        String schema = config.getSchema();
        String table = config.getTable();
        String query = config.getQuery();
        String storageIntegrationName = config.getStorageIntegrationName();
        String stagingBucketDir = config.getStagingBucketDir();
        String source = query != null ? String.format("(%s)", query) : this.getTablePath(database, schema, table);
        String copyQuery = String.format("COPY INTO '%s' FROM %s STORAGE_INTEGRATION=%s FILE_FORMAT=(TYPE=CSV COMPRESSION=GZIP FIELD_OPTIONALLY_ENCLOSED_BY='%s' ESCAPE='\\\\');", this.getProperBucketDir(stagingBucketDir), source, storageIntegrationName, this.getASCIICharRepresentation(config.getQuotationMark()));
        SnowflakeBatchServiceImpl.runStatement(copyQuery, this.getConnection(dataSourceProviderFn), null);
        return stagingBucketDir.concat("*");
    }

    private @UnknownKeyFor @NonNull @Initialized String getASCIICharRepresentation(@UnknownKeyFor @NonNull @Initialized String input) {
        return String.format("0x%x", new BigInteger(1, input.getBytes(StandardCharsets.UTF_8)));
    }

    private void copyToTable(@UnknownKeyFor @NonNull @Initialized SnowflakeBatchServiceConfig config) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        SerializableFunction<Void, DataSource> dataSourceProviderFn = config.getDataSourceProviderFn();
        List<String> filesList = config.getFilesList();
        String database = config.getDatabase();
        String schema = config.getSchema();
        String table = config.getTable();
        String query = config.getQuery();
        SnowflakeTableSchema tableSchema = config.getTableSchema();
        CreateDisposition createDisposition = config.getCreateDisposition();
        WriteDisposition writeDisposition = config.getWriteDisposition();
        String storageIntegrationName = config.getStorageIntegrationName();
        String stagingBucketDir = config.getStagingBucketDir();
        String source = query != null ? String.format("(%s)", query) : String.format("'%s'", stagingBucketDir);
        filesList = filesList.stream().map(e -> String.format("'%s'", e)).collect(Collectors.toList());
        String files = String.join((CharSequence)", ", filesList);
        files = files.replaceAll(stagingBucketDir, "");
        DataSource dataSource = (DataSource)dataSourceProviderFn.apply(null);
        this.prepareTableAccordingCreateDisposition(dataSource, table, tableSchema, createDisposition);
        this.prepareTableAccordingWriteDisposition(dataSource, table, writeDisposition);
        query = !storageIntegrationName.isEmpty() ? String.format("COPY INTO %s FROM %s FILES=(%s) FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='%s' ESCAPE='\\\\' COMPRESSION=GZIP) STORAGE_INTEGRATION=%s;", this.getTablePath(database, schema, table), this.getProperBucketDir(source), files, this.getASCIICharRepresentation(config.getQuotationMark()), storageIntegrationName) : String.format("COPY INTO %s FROM %s FILES=(%s) FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='%s' ESCAPE='\\\\' COMPRESSION=GZIP);", table, source, files, this.getASCIICharRepresentation(config.getQuotationMark()));
        SnowflakeBatchServiceImpl.runStatement(query, dataSource.getConnection(), null);
    }

    private void truncateTable(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String tablePath) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        String query = String.format("TRUNCATE %s;", tablePath);
        SnowflakeBatchServiceImpl.runConnectionWithStatement(dataSource, query, null);
    }

    private static void checkIfTableIsEmpty(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String tablePath) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        String selectQuery = String.format("SELECT count(*) FROM %s LIMIT 1;", tablePath);
        SnowflakeBatchServiceImpl.runConnectionWithStatement(dataSource, selectQuery, resultSet -> {
            assert (resultSet != null);
            SnowflakeBatchServiceImpl.checkIfTableIsEmpty(resultSet);
        });
    }

    private static void checkIfTableIsEmpty(@UnknownKeyFor @NonNull @Initialized ResultSet resultSet) {
        int columnId = 1;
        try {
            if (!resultSet.next() || !SnowflakeBatchServiceImpl.checkIfTableIsEmpty(resultSet, columnId)) {
                throw new RuntimeException("Table is not empty. Aborting COPY with disposition EMPTY");
            }
        }
        catch (SQLException e) {
            throw new RuntimeException("Unable run pipeline with EMPTY disposition.", e);
        }
    }

    private static @UnknownKeyFor @NonNull @Initialized boolean checkIfTableIsEmpty(@UnknownKeyFor @NonNull @Initialized ResultSet resultSet, @UnknownKeyFor @NonNull @Initialized int columnId) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        int rowCount = resultSet.getInt(columnId);
        return rowCount < 1;
    }

    private void prepareTableAccordingCreateDisposition(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String table, @UnknownKeyFor @NonNull @Initialized SnowflakeTableSchema tableSchema, @UnknownKeyFor @NonNull @Initialized CreateDisposition createDisposition) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        switch (createDisposition) {
            case CREATE_NEVER: {
                break;
            }
            case CREATE_IF_NEEDED: {
                this.createTableIfNotExists(dataSource, table, tableSchema);
            }
        }
    }

    private void prepareTableAccordingWriteDisposition(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String table, @UnknownKeyFor @NonNull @Initialized WriteDisposition writeDisposition) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        switch (writeDisposition) {
            case TRUNCATE: {
                this.truncateTable(dataSource, table);
                break;
            }
            case EMPTY: {
                SnowflakeBatchServiceImpl.checkIfTableIsEmpty(dataSource, table);
                break;
            }
        }
    }

    private void createTableIfNotExists(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String table, @UnknownKeyFor @NonNull @Initialized SnowflakeTableSchema tableSchema) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        String query = String.format("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '%s');", table.toUpperCase());
        SnowflakeBatchServiceImpl.runConnectionWithStatement(dataSource, query, resultSet -> {
            assert (resultSet != null);
            if (!SnowflakeBatchServiceImpl.checkResultIfTableExists(resultSet)) {
                try {
                    this.createTable(dataSource, table, tableSchema);
                }
                catch (SQLException e) {
                    throw new RuntimeException("Unable to create table.", e);
                }
            }
        });
    }

    private static @UnknownKeyFor @NonNull @Initialized boolean checkResultIfTableExists(@UnknownKeyFor @NonNull @Initialized ResultSet resultSet) {
        try {
            if (resultSet.next()) {
                return SnowflakeBatchServiceImpl.checkIfResultIsTrue(resultSet);
            }
            throw new RuntimeException("Unable run pipeline with CREATE IF NEEDED - no response.");
        }
        catch (SQLException e) {
            throw new RuntimeException("Unable run pipeline with CREATE IF NEEDED disposition.", e);
        }
    }

    private void createTable(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String table, @UnknownKeyFor @NonNull @Initialized SnowflakeTableSchema tableSchema) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        Preconditions.checkArgument((tableSchema != null ? 1 : 0) != 0, (Object)"The CREATE_IF_NEEDED disposition requires schema if table doesn't exists");
        String query = String.format("CREATE TABLE %s (%s);", table, tableSchema.sql());
        SnowflakeBatchServiceImpl.runConnectionWithStatement(dataSource, query, null);
    }

    private static @UnknownKeyFor @NonNull @Initialized boolean checkIfResultIsTrue(@UnknownKeyFor @NonNull @Initialized ResultSet resultSet) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        int columnId = 1;
        return resultSet.getBoolean(columnId);
    }

    private static void runConnectionWithStatement(@UnknownKeyFor @NonNull @Initialized DataSource dataSource, @UnknownKeyFor @NonNull @Initialized String query, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized ResultSet> resultSetMethod) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        Connection connection = dataSource.getConnection();
        SnowflakeBatchServiceImpl.runStatement(query, connection, resultSetMethod);
        connection.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void runStatement(@UnknownKeyFor @NonNull @Initialized String query, @UnknownKeyFor @NonNull @Initialized Connection connection, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized ResultSet> resultSetMethod) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        PreparedStatement statement = connection.prepareStatement(query);
        try {
            if (resultSetMethod != null) {
                ResultSet resultSet = statement.executeQuery();
                resultSetMethod.accept(resultSet);
            } else {
                statement.execute();
            }
        }
        finally {
            statement.close();
            connection.close();
        }
    }

    private @UnknownKeyFor @NonNull @Initialized Connection getConnection(@UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @Nullable @Initialized Void, @UnknownKeyFor @NonNull @Initialized DataSource> dataSourceProviderFn) throws @UnknownKeyFor @NonNull @Initialized SQLException {
        DataSource dataSource = (DataSource)dataSourceProviderFn.apply(null);
        return dataSource.getConnection();
    }

    private @UnknownKeyFor @NonNull @Initialized String getProperBucketDir(@UnknownKeyFor @NonNull @Initialized String bucketDir) {
        if (bucketDir.contains(GCS_PREFIX)) {
            return bucketDir.replace(GCS_PREFIX, SNOWFLAKE_GCS_PREFIX);
        }
        return bucketDir;
    }

    private @UnknownKeyFor @NonNull @Initialized String getTablePath(@UnknownKeyFor @NonNull @Initialized String database, @UnknownKeyFor @NonNull @Initialized String schema, @UnknownKeyFor @NonNull @Initialized String table) {
        return String.format("%s.%s.%s", database, schema, table);
    }
}

