/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.client.loader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.snowflake.client.jdbc.SnowflakeType;
import net.snowflake.client.jdbc.SnowflakeUtil;
import net.snowflake.client.loader.BufferStage;
import net.snowflake.client.loader.LoadResultListener;
import net.snowflake.client.loader.Loader;
import net.snowflake.client.loader.LoaderProperty;
import net.snowflake.client.loader.LoadingError;
import net.snowflake.client.loader.OnError;
import net.snowflake.client.loader.Operation;
import net.snowflake.client.loader.ProcessQueue;
import net.snowflake.client.loader.PutQueue;
import net.snowflake.client.loader.Utils;
import net.snowflake.client.log.SFLogger;
import net.snowflake.client.log.SFLoggerFactory;

public class StreamLoader
implements Loader,
Runnable {
    private static final SFLogger LOGGER = SFLoggerFactory.getLogger(StreamLoader.class);
    private static final String SYSTEM_PARAMETER_PREFIX = "net.snowflake.client.loader.";
    static final String FILE_PREFIX = "stream_";
    static final String FILE_SUFFIX = ".gz";
    private static final long DEFAULT_BATCH_ROW_SIZE = -1L;
    public static DatabaseMetaData metadata;
    private BufferStage _stage = null;
    private Operation _op = null;
    private boolean _startTransaction = false;
    private boolean _is_first_start_call = true;
    private boolean _is_last_finish_call = true;
    private boolean _oneBatch = false;
    private boolean _truncate = false;
    private String _before = null;
    private String _after = null;
    private ArrayBlockingQueue<byte[]> _queueData;
    private Thread _thread;
    private ArrayBlockingQueue<BufferStage> _queuePut;
    private PutQueue _put;
    private ArrayBlockingQueue<BufferStage> _queueProcess;
    private ProcessQueue _process;
    private String _remoteStage = "~";
    private String _table;
    private String _schema;
    private String _database;
    private List<String> _columns;
    private List<String> _keys;
    private long _batchRowSize = -1L;
    private long _csvFileBucketSize = 64L;
    private long _csvFileSize = 0x3200000L;
    boolean _testRemoteBadCSV = false;
    boolean _preserveStageFile = false;
    private boolean _useLocalTimezone = false;
    private boolean _mapTimeToTimestamp = false;
    boolean _compressDataBeforePut = true;
    boolean _compressFileByPut = false;
    long _compressLevel = 1L;
    String _onError = "CONTINUE";
    boolean _copyEmptyFieldAsEmpty = false;
    boolean _testMode = false;
    private final Connection _putConn;
    private final Connection _processConn;
    private final String _noise;
    private AtomicBoolean _active = new AtomicBoolean(false);
    private AtomicBoolean _aborted = new AtomicBoolean(false);
    private RuntimeException _abortCause = new Loader.ConnectionError("Unknown exception");
    private AtomicInteger _throttleCounter = new AtomicInteger(0);
    private final GregorianCalendar _calendarUTC = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
    private GregorianCalendar _calendarLocal;
    private DateFormat _dateFormat;
    private DateFormat _timeFormat;
    private DateFormat _timestampFormat;
    private DateFormat _timestampTzFormat;
    private LoadResultListener _listener = new LoadResultListener(){
        private final AtomicInteger errorCount = new AtomicInteger(0);
        private final AtomicInteger errorRecordCount = new AtomicInteger(0);
        private final AtomicInteger submittedRowCount = new AtomicInteger(0);

        @Override
        public boolean needErrors() {
            return false;
        }

        @Override
        public boolean needSuccessRecords() {
            return false;
        }

        @Override
        public void addError(LoadingError error) {
        }

        @Override
        public boolean throwOnError() {
            return false;
        }

        @Override
        public void recordProvided(Operation op, Object[] record) {
        }

        @Override
        public void addProcessedRecordCount(Operation op, int i) {
        }

        @Override
        public void addOperationRecordCount(Operation op, int i) {
        }

        @Override
        public int getErrorCount() {
            return this.errorCount.get();
        }

        @Override
        public int getErrorRecordCount() {
            return this.errorRecordCount.get();
        }

        @Override
        public void resetErrorCount() {
            this.errorCount.set(0);
        }

        @Override
        public void resetErrorRecordCount() {
            this.errorRecordCount.set(0);
        }

        @Override
        public void addErrorCount(int count) {
            this.errorCount.addAndGet(count);
        }

        @Override
        public void addErrorRecordCount(int count) {
            this.errorRecordCount.addAndGet(count);
        }

        @Override
        public void resetSubmittedRowCount() {
            this.submittedRowCount.set(0);
        }

        @Override
        public void addSubmittedRowCount(int count) {
            this.submittedRowCount.addAndGet(count);
        }

        @Override
        public int getSubmittedRowCount() {
            return this.submittedRowCount.get();
        }
    };

    private void resetCalendar() {
        this._calendarUTC.clear();
        this._calendarLocal = new GregorianCalendar(TimeZone.getDefault());
        this._calendarLocal.clear();
    }

    public StreamLoader(Map<LoaderProperty, Object> properties, Connection putConnection, Connection processConnection) {
        this._putConn = putConnection;
        this._processConn = processConnection;
        for (Map.Entry<LoaderProperty, Object> e : properties.entrySet()) {
            this.setProperty(e.getKey(), e.getValue());
        }
        this._noise = SnowflakeUtil.randomAlphaNumeric(6);
    }

    @Override
    public void setProperty(LoaderProperty property, Object value) {
        switch (property) {
            case tableName: {
                this._table = (String)value;
                break;
            }
            case schemaName: {
                this._schema = (String)value;
                break;
            }
            case databaseName: {
                this._database = (String)value;
                break;
            }
            case remoteStage: {
                this._remoteStage = (String)value;
                break;
            }
            case columns: {
                this._columns = (List)value;
                break;
            }
            case keys: {
                this._keys = (List)value;
                break;
            }
            case operation: {
                this._op = (Operation)((Object)value);
                break;
            }
            case startTransaction: {
                this._startTransaction = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case oneBatch: {
                this._oneBatch = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case truncateTable: {
                this._truncate = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case executeBefore: {
                this._before = String.valueOf(value);
                break;
            }
            case executeAfter: {
                this._after = String.valueOf(value);
                break;
            }
            case isFirstStartCall: {
                this._is_first_start_call = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case isLastFinishCall: {
                this._is_last_finish_call = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case batchRowSize: {
                this._batchRowSize = this.parseLongValue(LoaderProperty.batchRowSize, value);
                break;
            }
            case csvFileBucketSize: {
                this._csvFileBucketSize = this.parseLongValue(LoaderProperty.csvFileBucketSize, value);
                break;
            }
            case csvFileSize: {
                this._csvFileSize = this.parseLongValue(LoaderProperty.csvFileSize, value);
                break;
            }
            case preserveStageFile: {
                this._preserveStageFile = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case useLocalTimezone: {
                this._useLocalTimezone = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case copyEmptyFieldAsEmpty: {
                this._copyEmptyFieldAsEmpty = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case mapTimeToTimestamp: {
                this._mapTimeToTimestamp = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case compressDataBeforePut: {
                this._compressDataBeforePut = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case compressFileByPut: {
                this._compressFileByPut = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case compressLevel: {
                this._compressLevel = this.parseLongValue(LoaderProperty.compressLevel, value);
                if ((this._compressLevel < 1L || this._compressLevel > 9L) && this._compressLevel != -1L) {
                    throw new IllegalArgumentException("invalid compression level");
                }
            }
            case onError: {
                String v = String.valueOf(value);
                this._onError = OnError.validate(v) ? v : "CONTINUE";
                break;
            }
            case testRemoteBadCSV: {
                this._testRemoteBadCSV = Boolean.valueOf(String.valueOf(value));
                break;
            }
        }
    }

    private long parseLongValue(LoaderProperty name, Object value) {
        long ret;
        if (value instanceof String) {
            ret = new Long((String)value);
        } else if (value instanceof Long) {
            ret = (Long)value;
        } else if (value instanceof Integer) {
            ret = ((Integer)value).intValue();
        } else {
            throw new IllegalArgumentException(String.format("'%s' Must be a LONG value", name.toString()));
        }
        return ret;
    }

    private void setPropertyBySystemProperty() {
        String BATCH_ROW_SIZE_KEY = "net.snowflake.client.loader.batchRowSize";
        String CSV_FILE_BUCKET_SIZE = "net.snowflake.client.loader.csvFileBucketSize";
        String CSV_FILE_SIZE = "net.snowflake.client.loader.csvFileSize";
        String COMPRESS_DATA_BEFORE_PUT_KEY = "net.snowflake.client.loader.compressDataBeforePut";
        String COMPRESS_FILE_BY_PUT_KEY = "net.snowflake.client.loader.compressFileByPut";
        String COMPRESS_LEVEL = "net.snowflake.client.loader.compressLevel";
        Properties props = System.getProperties();
        for (String propKey : props.stringPropertyNames()) {
            String value = props.getProperty(propKey);
            if ("net.snowflake.client.loader.batchRowSize".equals(propKey)) {
                this._batchRowSize = this.parseLongValue(LoaderProperty.batchRowSize, value);
                continue;
            }
            if ("net.snowflake.client.loader.csvFileBucketSize".equals(propKey)) {
                this._csvFileBucketSize = this.parseLongValue(LoaderProperty.csvFileBucketSize, value);
                continue;
            }
            if ("net.snowflake.client.loader.csvFileSize".equals(propKey)) {
                this._csvFileSize = this.parseLongValue(LoaderProperty.csvFileSize, value);
                continue;
            }
            if ("net.snowflake.client.loader.compressDataBeforePut".equals(propKey)) {
                this._compressDataBeforePut = Boolean.valueOf(value);
                continue;
            }
            if ("net.snowflake.client.loader.compressFileByPut".equals(propKey)) {
                this._compressFileByPut = Boolean.valueOf(value);
                continue;
            }
            if (!"net.snowflake.client.loader.compressLevel".equals(propKey)) continue;
            this._compressLevel = Long.valueOf(value);
        }
    }

    private void initDateFormats() {
        this.resetCalendar();
        this._dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
        this._timeFormat = this._mapTimeToTimestamp ? this._dateFormat : new SimpleDateFormat("HH:mm:ss.SSS");
        this._timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.");
        this._timestampTzFormat = new SimpleDateFormat("XXX");
        GregorianCalendar cal = !this._useLocalTimezone ? this._calendarUTC : this._calendarLocal;
        this._dateFormat.setCalendar(cal);
        this._timeFormat.setCalendar(cal);
        this._timestampFormat.setCalendar(cal);
        this._timestampTzFormat.setCalendar(cal);
    }

    @Override
    public void start() {
        LOGGER.debug("Start Loading");
        this.validateParameters();
        if (this._op == null) {
            this.abort(new Loader.ConnectionError("Loader started with no operation"));
            return;
        }
        this.initDateFormats();
        this.initQueues();
        if (this._is_first_start_call) {
            try {
                if (this._startTransaction) {
                    LOGGER.debug("Begin Transaction");
                    this._processConn.createStatement().execute("begin transaction");
                } else {
                    LOGGER.debug("No Transaction started");
                }
            }
            catch (SQLException ex) {
                this.abort(new Loader.ConnectionError("Failed to start Transaction", Utils.getCause(ex)));
            }
            if (this._truncate) {
                this.truncateTargetTable();
            }
            try {
                if (this._before != null) {
                    LOGGER.debug("Running Execute Before SQL");
                    this._processConn.createStatement().execute(this._before);
                }
            }
            catch (SQLException ex) {
                this.abort(new Loader.ConnectionError(String.format("Execute Before SQL failed to run: %s", this._before), Utils.getCause(ex)));
            }
        }
    }

    private void validateParameters() {
        LOGGER.debug("Validate Parameters");
        if (Operation.INSERT != this._op && (this._keys == null || this._keys.isEmpty())) {
            throw new Loader.ConnectionError("Updating operations require keys");
        }
        this.setPropertyBySystemProperty();
        LOGGER.debug("Database Name: {}, Schema Name: {}, Table Name: {}, Remote Stage: {}, Columns: {}, Keys: {}, Operation: {}, Start Transaction: {}, OneBatch: {}, Truncate Table: {}, Execute Before: {}, Execute After: {}, Batch Row Size: {}, CSV File Bucket Size: {}, CSV File Size: {}, Preserve Stage File: {}, Use Local TimeZone: {}, Copy Empty Field As Empty: {}, MapTimeToTimestamp: {}, Compress Data before PUT: {}, Compress File By Put: {}, Compress Level: {}, OnError: {}", new Object[]{this._database, this._schema, this._table, this._remoteStage, this._columns, this._keys, this._op, this._startTransaction, this._oneBatch, this._truncate, this._before, this._after, this._batchRowSize, this._csvFileBucketSize, this._csvFileSize, this._preserveStageFile, this._useLocalTimezone, this._copyEmptyFieldAsEmpty, this._mapTimeToTimestamp, this._compressDataBeforePut, this._compressFileByPut, this._compressLevel, this._onError});
    }

    String getNoise() {
        return this._noise;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abort(RuntimeException t) {
        StreamLoader streamLoader = this;
        synchronized (streamLoader) {
            LOGGER.debug("Exception received. Aborting...", t);
            if (this._aborted.getAndSet(true)) {
                return;
            }
            if (t != null) {
                this._abortCause = t;
            }
            this.rollback();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean isAborted() {
        StreamLoader streamLoader = this;
        synchronized (streamLoader) {
            return this._aborted.get();
        }
    }

    @Override
    public void rollback() {
        LOGGER.debug("Rollback");
        try {
            this.terminate();
            LOGGER.debug("Rollback");
            this._processConn.createStatement().execute("rollback");
        }
        catch (SQLException ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    }

    @Override
    public void submitRow(Object[] row) {
        try {
            if (this._aborted.get()) {
                if (this._listener.throwOnError()) {
                    throw this._abortCause;
                }
                return;
            }
        }
        catch (Exception ex) {
            this.abort(new Loader.ConnectionError("Throwing Error", Utils.getCause(ex)));
        }
        byte[] data = null;
        try {
            if (!this._active.get()) {
                LOGGER.debug("Inactive loader. Row ignored");
                return;
            }
            data = this.createCSVRecord(row);
        }
        catch (Exception ex) {
            this.abort(new Loader.ConnectionError("Creating data set for CSV", Utils.getCause(ex)));
        }
        try {
            this.writeBytes(data);
            this._listener.addSubmittedRowCount(1);
            if (this._listener.needSuccessRecords()) {
                this._listener.recordProvided(this._op, row);
            }
        }
        catch (Exception ex) {
            this.abort(new Loader.ConnectionError("Writing Bytes to CSV files", Utils.getCause(ex)));
        }
        if (this._batchRowSize > 0L && this._listener.getSubmittedRowCount() > 0 && (long)this._listener.getSubmittedRowCount() % this._batchRowSize == 0L) {
            LOGGER.debug("Flushing Queue: Submitted Row Count: {}, Batch Row Size: {}", this._listener.getSubmittedRowCount(), this._batchRowSize);
            try {
                this.flushQueues();
            }
            catch (Exception ex) {
                this.abort(new Loader.ConnectionError("Flush Queues", Utils.getCause(ex)));
            }
            try {
                this.initQueues();
            }
            catch (Exception ex) {
                this.abort(new Loader.ConnectionError("Init Queues", Utils.getCause(ex)));
            }
        }
    }

    private void initQueues() {
        LOGGER.debug("Init Queues");
        if (this._active.getAndSet(true)) {
            return;
        }
        this._queuePut = new ArrayBlockingQueue(48);
        this._queueProcess = new ArrayBlockingQueue(48);
        this._put = new PutQueue(this);
        this._process = new ProcessQueue(this);
        this._queueData = new ArrayBlockingQueue(1024);
        this._thread = new Thread(this);
        this._thread.setName("StreamLoaderThread");
        this._thread.start();
        this._stage = new BufferStage(this, this._op, this._csvFileBucketSize, this._csvFileSize);
    }

    private void flushQueues() {
        LOGGER.debug("Flush Queues");
        try {
            this._queueData.put(new byte[0]);
            this._thread.join(10000L);
            if (this._thread.isAlive()) {
                this._thread.interrupt();
            }
        }
        catch (Exception ex) {
            String msg = "Failed to join StreamLoader queue: " + ex.getMessage();
            LOGGER.error(msg, ex);
            throw new Loader.DataError(msg, Utils.getCause(ex));
        }
        this.terminate();
        this._put.join();
        this._process.join();
        if (this._aborted.get()) {
            throw this._abortCause;
        }
    }

    private void writeBytes(byte[] data) throws IOException, InterruptedException {
        if (this._aborted.get()) {
            return;
        }
        boolean full = this._stage.stageData(data);
        if (full && !this._oneBatch) {
            this.queuePut(this._stage);
            this._stage = new BufferStage(this, this._op, this._csvFileBucketSize, this._csvFileSize);
        }
    }

    private void truncateTargetTable() {
        try {
            this._processConn.createStatement().execute("DELETE FROM " + this.getFullTableName());
        }
        catch (SQLException ex) {
            LOGGER.error(ex.getMessage(), ex);
            this.abort(new Loader.ConnectionError(Utils.getCause(ex)));
        }
    }

    @Override
    public void run() {
        try {
            byte[] data;
            while ((data = this._queueData.take()).length != 0) {
                this.writeBytes(data);
            }
        }
        catch (Exception ex) {
            LOGGER.error(ex.getMessage(), ex);
            this.abort(new Loader.ConnectionError(Utils.getCause(ex)));
        }
    }

    private byte[] createCSVRecord(Object[] data) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < data.length; ++i) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(SnowflakeType.escapeForCSV(SnowflakeType.lexicalValue(data[i], this._dateFormat, this._timeFormat, this._timestampFormat, this._timestampTzFormat)));
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void finish() throws Exception {
        LOGGER.debug("Finish Loading");
        this.flushQueues();
        if (this._is_last_finish_call) {
            try {
                if (this._after != null) {
                    LOGGER.debug("Running Execute After SQL");
                    this._processConn.createStatement().execute(this._after);
                }
                this._processConn.createStatement().execute("commit");
                LOGGER.debug("Committed");
            }
            catch (SQLException ex) {
                try {
                    this._processConn.createStatement().execute("rollback");
                }
                catch (SQLException ex0) {
                    LOGGER.debug("Failed to rollback");
                }
                LOGGER.debug(String.format("Execute After SQL failed to run: %s", this._after), ex);
                throw new Loader.ConnectionError(Utils.getCause(ex));
            }
        }
    }

    @Override
    public void close() {
        LOGGER.debug("Close Loader");
        try {
            this._processConn.close();
            this._putConn.close();
        }
        catch (SQLException ex) {
            LOGGER.error(ex.getMessage(), ex);
            throw new Loader.ConnectionError(Utils.getCause(ex));
        }
    }

    private void terminate() {
        LOGGER.debug("Terminate Loader");
        boolean active = this._active.getAndSet(false);
        if (!active) {
            return;
        }
        if (this._stage == null) {
            this._stage = new BufferStage(this, Operation.INSERT, this._csvFileBucketSize, this._csvFileSize);
        }
        this._stage.setTerminate(true);
        try {
            this.queuePut(this._stage);
        }
        catch (InterruptedException ex) {
            LOGGER.error("Unknown Error", ex);
        }
        LOGGER.debug("Snowflake loader terminating");
    }

    @Override
    public void resetOperation(Operation op) {
        LOGGER.debug("Reset Loader");
        if (op.equals((Object)this._op)) {
            return;
        }
        LOGGER.debug("Operation is changing from {} to {}", new Object[]{this._op, op});
        this._op = op;
        if (this._stage != null) {
            try {
                this.queuePut(this._stage);
            }
            catch (InterruptedException ex) {
                LOGGER.error(this._stage.getId(), ex);
            }
        }
        this._stage = new BufferStage(this, this._op, this._csvFileBucketSize, this._csvFileSize);
    }

    String getTable() {
        return this._table;
    }

    String getBase() {
        return BASE;
    }

    Connection getPutConnection() {
        return this._putConn;
    }

    Connection getProcessConnection() {
        return this._processConn;
    }

    String getRemoteStage() {
        return this._remoteStage;
    }

    List<String> getKeys() {
        return this._keys;
    }

    List<String> getColumns() {
        return this._columns;
    }

    String getColumnsAsString() {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < this._columns.size(); ++i) {
            if (i > 0) {
                sb.append("\",\"");
            }
            sb.append(this._columns.get(i));
        }
        sb.append("\"");
        return sb.toString();
    }

    String getFullTableName() {
        return (this._database == null ? "" : "\"" + this._database + "\".") + (this._schema == null ? "" : "\"" + this._schema + "\".") + "\"" + this._table + "\"";
    }

    @Override
    public LoadResultListener getListener() {
        return this._listener;
    }

    @Override
    public void setListener(LoadResultListener _listener) {
        this._listener = _listener;
    }

    private void queuePut(BufferStage stage) throws InterruptedException {
        this._queuePut.put(stage);
    }

    BufferStage takePut() throws InterruptedException {
        return this._queuePut.take();
    }

    void queueProcess(BufferStage stage) throws InterruptedException {
        this._queueProcess.put(stage);
    }

    BufferStage takeProcess() throws InterruptedException {
        return this._queueProcess.take();
    }

    void throttleUp() {
        int open = this._throttleCounter.incrementAndGet();
        LOGGER.debug("PUT Throttle Up: {}", open);
        if (open > 8) {
            LOGGER.debug("Will retry scheduling file for upload after {} seconds", Math.pow(2.0, open - 7));
            try {
                Thread.sleep(1000 * (int)Math.pow(2.0, open - 7));
            }
            catch (InterruptedException ex) {
                LOGGER.error("Exception occurs while waiting", ex);
            }
        }
    }

    void throttleDown() {
        int throttleLevel = this._throttleCounter.decrementAndGet();
        LOGGER.debug("PUT Throttle Down: {}", throttleLevel);
        if (throttleLevel < 0) {
            LOGGER.debug("Unbalanced throttle");
            this._throttleCounter.set(0);
        }
        LOGGER.debug("Connector throttle {}", throttleLevel);
    }

    void setTestMode(boolean mode) {
        this._testMode = mode;
    }
}

