/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.client.jdbc;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.RestRequest;
import net.snowflake.client.jdbc.SnowflakeResultChunk;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.SnowflakeUtil;
import net.snowflake.client.jdbc.internal.apache.http.Header;
import net.snowflake.client.jdbc.internal.apache.http.HttpEntity;
import net.snowflake.client.jdbc.internal.apache.http.HttpResponse;
import net.snowflake.client.jdbc.internal.apache.http.client.HttpClient;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpGet;
import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonFactory;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonParser;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonToken;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.MappingJsonFactory;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;

/**
 * Downloads the result chunks of a query on a pool of background threads and hands them out,
 * in order, through {@link #getNextChunkToConsume()}.
 *
 * <p>Chunks are submitted for download ahead of consumption, bounded both by a number of
 * prefetch slots (twice the requested thread count) and by a memory budget. Each chunk carries
 * its own lock/condition pair which the download task and the consumer use to hand over the
 * download state.
 *
 * <p>The consumer-side bookkeeping ({@code nextChunkToConsume}, {@code nextChunkToDownload},
 * {@code currentMemoryUsage}) is not synchronized.
 * NOTE(review): presumably this class is driven by a single consumer thread - confirm with callers.
 */
public class SnowflakeChunkDownloader {
    /** Header names and value used to request server-side decryption with a customer key. */
    private static final String SSE_C_ALGORITHM = "x-amz-server-side-encryption-customer-algorithm";
    private static final String SSE_C_KEY = "x-amz-server-side-encryption-customer-key";
    private static final String SSE_C_AES = "AES256";
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final JsonFactory jsonFactory = new MappingJsonFactory();
    private static final Logger logger = Logger.getLogger(SnowflakeChunkDownloader.class.getName());

    /** Data buffers of consumed chunks, offered to chunks about to be downloaded for reuse. */
    private final SnowflakeResultChunk.ResultChunkDataCache chunkDataCache = new SnowflakeResultChunk.ResultChunkDataCache();
    /** All chunks of the result in consumption order; set to {@code null} by {@link #terminate()}. */
    private List<SnowflakeResultChunk> chunks = null;
    /** Index of the chunk the next call to {@link #getNextChunkToConsume()} will return. */
    private int nextChunkToConsume = 0;
    /** Index of the first chunk that has not been submitted for download yet. */
    private int nextChunkToDownload = 0;
    /** Maximum number of chunks that may be submitted ahead of the consumer. */
    private final int prefetchSlots;
    /** When true chunks are parsed row by row with a streaming parser instead of into a tree. */
    private final boolean useJsonParser;
    /** Download pool; {@code null} when there is nothing to download or after termination. */
    private ThreadPoolExecutor executor;
    private long numberMillisWaitingForChunks = 0L;
    private boolean terminated = false;
    private final AtomicLong totalMillisDownloadingChunks = new AtomicLong(0L);
    private final AtomicLong totalMillisParsingChunks = new AtomicLong(0L);
    /** Key sent as SSE-C customer key when the server supplied no explicit chunk headers. */
    private final String qrmk;
    /** Headers to send with every chunk request, or {@code null} when none were supplied. */
    private final Map<String, String> chunkHeadersMap;
    private final int networkTimeoutInMilli;
    /** Upper bound for the summed memory need of all chunks that are downloaded but not freed. */
    private final long memoryLimit;
    private long currentMemoryUsage = 0L;

    /**
     * Creates the fixed-size thread pool that runs the download tasks.
     *
     * @param threadNamePrefix prefix of the worker thread names; a running number is appended
     * @param parallel number of worker threads, must be positive
     * @return the new pool
     */
    private static ThreadPoolExecutor createChunkDownloaderExecutorService(final String threadNamePrefix, int parallel) {
        ThreadFactory threadFactory = new ThreadFactory(){
            // Atomic because the pool may create replacement workers from its own threads.
            private final AtomicLong threadCount = new AtomicLong(1L);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(threadNamePrefix + this.threadCount.getAndIncrement());
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        // The Throwable overload does not substitute placeholders, so the
                        // message carries none and the exception is attached as the cause.
                        logger.log(Level.SEVERE, "uncaughtException in thread: " + t, e);
                    }
                });
                return thread;
            }
        };
        return (ThreadPoolExecutor)Executors.newFixedThreadPool(parallel, threadFactory);
    }

    /**
     * Builds the chunk list from the server response and starts pre-fetching.
     *
     * @param colCount column count handed to every {@link SnowflakeResultChunk}
     * @param chunksData JSON array describing the chunks ({@code url}, {@code rowCount},
     *     {@code uncompressedSize}); {@code null} or empty means there is nothing to download
     * @param prefetchThreads requested number of download threads; the pool uses at most one
     *     thread per chunk, and the number of prefetch slots is twice this value
     * @param qrmk key sent as SSE-C customer key when {@code chunkHeaders} is absent or empty
     * @param chunkHeaders JSON object of headers to send with every chunk request, may be
     *     {@code null} or a missing node
     * @param networkTimeoutInMilli timeout for a chunk request in milliseconds
     * @param useJsonParser whether to parse chunks with the streaming parser
     * @param memoryLimit memory budget for pre-fetched chunks; capped at 80% of the maximum heap
     * @param efficientChunkStorage storage flag handed to every {@link SnowflakeResultChunk}
     */
    public SnowflakeChunkDownloader(int colCount, JsonNode chunksData, int prefetchThreads, String qrmk, JsonNode chunkHeaders, int networkTimeoutInMilli, boolean useJsonParser, long memoryLimit, boolean efficientChunkStorage) {
        this.qrmk = qrmk;
        this.networkTimeoutInMilli = networkTimeoutInMilli;
        this.prefetchSlots = prefetchThreads * 2;
        this.useJsonParser = useJsonParser;
        long heapBudget = (long)((double)Runtime.getRuntime().maxMemory() * 0.8);
        this.memoryLimit = Math.min(memoryLimit, heapBudget);
        // The key itself is a secret and must not end up in log files.
        logger.log(Level.FINE, "qrmk is {0}", qrmk != null ? "present" : "absent");
        this.chunkHeadersMap = SnowflakeChunkDownloader.readChunkHeaders(chunkHeaders);
        if (chunksData == null) {
            logger.log(Level.INFO, "no chunk data");
            // An empty list keeps getNextChunkToConsume() and terminate() usable.
            this.chunks = new ArrayList<SnowflakeResultChunk>(0);
            return;
        }
        int numChunks = chunksData.size();
        this.chunks = new ArrayList<SnowflakeResultChunk>(numChunks);
        for (int idx = 0; idx < numChunks; ++idx) {
            JsonNode chunkNode = chunksData.get(idx);
            SnowflakeResultChunk chunk = new SnowflakeResultChunk(chunkNode.path("url").asText(), chunkNode.path("rowCount").asInt(), colCount, chunkNode.path("uncompressedSize").asInt(), efficientChunkStorage);
            logger.log(Level.INFO, "add chunk, url={0} rowCount={1}", new Object[]{chunk.getUrl(), chunk.getRowCount()});
            this.chunks.add(chunk);
        }
        if (numChunks == 0) {
            // A fixed thread pool rejects a size of zero, and there is nothing to fetch anyway.
            logger.log(Level.INFO, "chunk list is empty, no downloader threads started");
            return;
        }
        int effectiveThreads = Math.min(prefetchThreads, numChunks);
        logger.log(Level.INFO, "#chunks: {0} #threads:{1} #slots:{2} -> pool:{3}", new Object[]{numChunks, prefetchThreads, this.prefetchSlots, effectiveThreads});
        this.executor = SnowflakeChunkDownloader.createChunkDownloaderExecutorService("result-chunk-downloader-", effectiveThreads);
        this.startNextDownloaders();
    }

    /**
     * Copies the header fields of the given JSON object into a map.
     *
     * @param chunkHeaders JSON object with header names and values, may be {@code null}
     * @return the headers, or {@code null} when the node is {@code null} or missing
     */
    private static Map<String, String> readChunkHeaders(JsonNode chunkHeaders) {
        if (chunkHeaders == null || chunkHeaders.isMissingNode()) {
            return null;
        }
        Map<String, String> headers = new HashMap<String, String>(2);
        Iterator<Map.Entry<String, JsonNode>> headerIter = chunkHeaders.fields();
        while (headerIter.hasNext()) {
            Map.Entry<String, JsonNode> header = headerIter.next();
            // Only the name is logged; the values may contain encryption keys.
            logger.log(Level.FINE, "add header key={0}", header.getKey());
            headers.put(header.getKey(), header.getValue().asText());
        }
        return headers;
    }

    /**
     * Submits download tasks for as many further chunks as the prefetch slots and the memory
     * budget allow, then clears the data cache.
     */
    private void startNextDownloaders() {
        logger.log(Level.INFO, "Submit {0} chunks to be pre-fetched", Math.min(this.prefetchSlots, this.chunks.size()));
        while (this.nextChunkToDownload - this.nextChunkToConsume < this.prefetchSlots && this.nextChunkToDownload < this.chunks.size()) {
            SnowflakeResultChunk nextChunk = this.chunks.get(this.nextChunkToDownload);
            long neededChunkMemory = nextChunk.computeNeededChunkMemory();
            boolean fitsIntoBudget = this.currentMemoryUsage + neededChunkMemory <= this.memoryLimit;
            // The chunk the consumer needs next is always submitted, even over budget,
            // because otherwise the consumer would wait for it forever.
            boolean consumerNeedsIt = this.nextChunkToDownload - this.nextChunkToConsume <= 0;
            if (!fitsIntoBudget && !consumerNeedsIt) {
                break;
            }
            nextChunk.tryReuse(this.chunkDataCache);
            this.currentMemoryUsage += neededChunkMemory;
            logger.log(Level.INFO, "submit chunk #{0} for downloading, url={1}", new Object[]{this.nextChunkToDownload, nextChunk.getUrl()});
            this.executor.submit(SnowflakeChunkDownloader.getDownloadChunkCallable(this, nextChunk, this.qrmk, this.nextChunkToDownload, this.chunkHeadersMap, this.networkTimeoutInMilli));
            ++this.nextChunkToDownload;
        }
        this.chunkDataCache.clear();
    }

    /**
     * Frees the previously returned chunk, schedules further downloads and returns the next
     * chunk, blocking until its download has either succeeded or failed.
     *
     * @return the next chunk, or {@code null} when all chunks have been consumed or the
     *     downloader has been terminated
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws SnowflakeSQLException if the download of the next chunk failed
     */
    public SnowflakeResultChunk getNextChunkToConsume() throws InterruptedException, SnowflakeSQLException {
        if (this.chunks == null) {
            logger.log(Level.INFO, "chunk downloader is terminated, no more chunk");
            return null;
        }
        if (this.nextChunkToConsume > 0) {
            int prevChunk = this.nextChunkToConsume - 1;
            SnowflakeResultChunk consumed = this.chunks.get(prevChunk);
            logger.log(Level.INFO, "free chunk data for chunk #{0}", prevChunk);
            this.currentMemoryUsage -= consumed.computeNeededChunkMemory();
            if (this.nextChunkToDownload < this.chunks.size()) {
                // More downloads will follow, so offer the buffers for reuse.
                this.chunkDataCache.add(consumed);
            } else {
                this.chunkDataCache.clear();
            }
            consumed.freeData();
        }
        if (this.nextChunkToConsume >= this.chunks.size()) {
            logger.log(Level.INFO, "no more chunk");
            return null;
        }
        this.startNextDownloaders();
        SnowflakeResultChunk currentChunk = this.chunks.get(this.nextChunkToConsume);
        if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.SUCCESS) {
            logger.log(Level.INFO, "chunk #{0} is ready to consume", this.nextChunkToConsume);
            ++this.nextChunkToConsume;
            return currentChunk;
        }
        logger.log(Level.INFO, "chunk #{0} is not ready to consume", this.nextChunkToConsume);
        // Acquire before the try so that the finally block never unlocks a lock not held.
        currentChunk.getLock().lock();
        try {
            logger.log(Level.INFO, "consumer get lock to check chunk state");
            while (currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.SUCCESS && currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.FAILURE) {
                logger.log(Level.INFO, "wait for chunk #{0} to be ready, currentchunk state is: {1}", new Object[]{this.nextChunkToConsume, currentChunk.getDownloadState()});
                long startTime = System.currentTimeMillis();
                currentChunk.getDownloadCondition().await();
                this.numberMillisWaitingForChunks += System.currentTimeMillis() - startTime;
                logger.log(Level.INFO, "woken up from waiting for chunk #{0} to be ready", this.nextChunkToConsume);
            }
            if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE) {
                logger.log(Level.SEVERE, "downloader encountered error: {0}", currentChunk.getDownloadError());
                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), currentChunk.getDownloadError());
            }
            logger.log(Level.INFO, "chunk #{0} is ready to consume", this.nextChunkToConsume);
            ++this.nextChunkToConsume;
            return currentChunk;
        }
        finally {
            logger.log(Level.INFO, "consumer free lock");
            currentChunk.getLock().unlock();
        }
    }

    /**
     * Logs the collected statistics, shuts the download pool down and drops the chunk list.
     * Calls after the first one have no effect.
     */
    public void terminate() {
        if (this.terminated) {
            return;
        }
        int totalChunks = this.chunks != null ? this.chunks.size() : 0;
        logger.log(Level.INFO, "Total milliseconds waiting for chunks: {0}, Total memory used: {1}, total download time: {2} millisec, total parsing time: {3} milliseconds, total chunks: {4}", new Object[]{this.numberMillisWaitingForChunks, Runtime.getRuntime().totalMemory(), this.totalMillisDownloadingChunks.get(), this.totalMillisParsingChunks.get(), totalChunks});
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
        this.chunks = null;
        this.chunkDataCache.clear();
        this.terminated = true;
    }

    /** Adds the download time of one chunk, in milliseconds, to the running total. */
    private void addDownloadTime(long downloadTime) {
        this.totalMillisDownloadingChunks.addAndGet(downloadTime);
    }

    /** Adds the parsing time of one chunk, in milliseconds, to the running total. */
    private void addParsingTime(long parsingTime) {
        this.totalMillisParsingChunks.addAndGet(parsingTime);
    }

    /**
     * Closes a stream on an error path; a failure to close is logged and otherwise ignored so
     * that it does not mask the error being handled.
     */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        }
        catch (IOException closeError) {
            logger.log(Level.FINE, "Failed to close chunk stream after an error", closeError);
        }
    }

    /**
     * Creates the task that downloads and parses one chunk.
     *
     * <p>The task never propagates a failure: any {@link Throwable} is recorded on the chunk as
     * {@code FAILURE} together with a message, and the waiting consumer is signalled.
     *
     * @param downloader owner of the chunk, used for the parser mode and the time statistics
     * @param resultChunk chunk to fill
     * @param qrmk key sent as SSE-C customer key when no chunk headers are given
     * @param chunkIndex position of the chunk, used for logging only
     * @param chunkHeadersMap headers to send with the request, may be {@code null} or empty
     * @param networkTimeoutInMilli request timeout in milliseconds
     * @return the download task; its result is always {@code null}
     */
    private static Callable<Void> getDownloadChunkCallable(final SnowflakeChunkDownloader downloader, final SnowflakeResultChunk resultChunk, final String qrmk, final int chunkIndex, final Map<String, String> chunkHeadersMap, final int networkTimeoutInMilli) {
        return new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                try {
                    resultChunk.getLock().lock();
                    try {
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.IN_PROGRESS);
                    }
                    finally {
                        resultChunk.getLock().unlock();
                    }
                    logger.log(Level.INFO, "Downloading chunk {0}, url={1}", new Object[]{chunkIndex, resultChunk.getUrl()});
                    long startTime = System.currentTimeMillis();
                    HttpResponse response = this.getResultChunk(resultChunk.getUrl());
                    if (response == null || response.getStatusLine().getStatusCode() != 200) {
                        logger.log(Level.SEVERE, "Error fetching chunk from: {0}", resultChunk.getUrl());
                        SnowflakeUtil.logResponseDetails(response, logger);
                        // NOTE(review): the entity of a non-200 response is not released here -
                        // confirm whether logResponseDetails consumes it.
                        String status = response != null ? String.valueOf(response.getStatusLine().getStatusCode()) : "null response";
                        throw new SnowflakeSQLException("58030", ErrorCode.NETWORK_ERROR.getMessageCode(), "Error encountered when downloading a result chunk: HTTP status=" + status);
                    }
                    InputStream jsonInputStream = this.openJsonStream(response);
                    resultChunk.setDownloadTime(System.currentTimeMillis() - startTime);
                    downloader.addDownloadTime(resultChunk.getDownloadTime());
                    startTime = System.currentTimeMillis();
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Time: {0} Json response: {1}", new Object[]{System.currentTimeMillis(), response});
                    }
                    JsonNode resultData = null;
                    try {
                        if (downloader.useJsonParser) {
                            this.parseJsonToChunk(jsonInputStream, resultChunk);
                        } else {
                            resultData = mapper.readTree(jsonInputStream);
                        }
                    }
                    catch (Exception ex) {
                        logger.log(Level.SEVERE, "Exception when parsing result", ex);
                        throw new SnowflakeSQLException(ex, "XX000", (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: " + ex.getLocalizedMessage() + "\nBad result json: " + response.toString());
                    }
                    finally {
                        jsonInputStream.close();
                    }
                    resultChunk.setParseTime(System.currentTimeMillis() - startTime);
                    downloader.addParsingTime(resultChunk.getParseTime());
                    resultChunk.setResultData(resultData);
                    if (logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, "Finished preparing chunk data for {0}, total download time={1}ms, total parse time={2}ms", new Object[]{resultChunk.getUrl(), resultChunk.getDownloadTime(), resultChunk.getParseTime()});
                    }
                    resultChunk.getLock().lock();
                    try {
                        logger.log(Level.FINE, "get lock to change the chunk to be ready to consume");
                        logger.log(Level.FINE, "wake up consumer if it is waiting for a chunk to be ready");
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.SUCCESS);
                        resultChunk.getDownloadCondition().signal();
                    }
                    finally {
                        logger.log(Level.FINE, "Downloaded chunk {0}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                    }
                }
                catch (Throwable ex) {
                    // Some throwables carry no message; fall back to their string form so the
                    // consumer never reports a null error.
                    String reason = ex.getLocalizedMessage();
                    if (reason == null) {
                        reason = ex.toString();
                    }
                    logger.log(Level.INFO, "get lock to set chunk download error");
                    resultChunk.getLock().lock();
                    try {
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                        resultChunk.setDownloadError(reason);
                        logger.log(Level.INFO, "wake up consumer if it is waiting for a chunk to be ready");
                        resultChunk.getDownloadCondition().signal();
                    }
                    finally {
                        logger.log(Level.INFO, "Failed to download chunk {0}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                    }
                    logger.log(Level.SEVERE, "Exception encountered ({0}:{1}) fetching chunk from: {2}", new Object[]{ex.getClass().getName(), ex.getLocalizedMessage(), resultChunk.getUrl()});
                    logger.log(Level.SEVERE, "Exception: ", ex);
                }
                return null;
            }

            /**
             * Wraps the response body, un-gzipping it when the response says so, and surrounds
             * it with '[' and ']' so that the body is read as one JSON array. The body stream
             * is closed when the wrapping fails.
             */
            private InputStream openJsonStream(HttpResponse response) throws Exception {
                InputStream body = null;
                try {
                    HttpEntity entity = response.getEntity();
                    body = new HttpUtil.HttpInputStream(entity.getContent());
                    Header encoding = response.getFirstHeader("Content-Encoding");
                    if (encoding != null) {
                        if (!encoding.getValue().equalsIgnoreCase("gzip")) {
                            throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: unexpected compression got " + encoding.getValue());
                        }
                        // body keeps pointing at the raw stream if this constructor throws.
                        body = new GZIPInputStream(body, 65536);
                    }
                    InputStream open = new ByteArrayInputStream("[".getBytes(StandardCharsets.UTF_8));
                    InputStream close = new ByteArrayInputStream("]".getBytes(StandardCharsets.UTF_8));
                    return new SequenceInputStream(Collections.enumeration(Arrays.<InputStream>asList(open, body, close)));
                }
                catch (Exception ex) {
                    logger.log(Level.SEVERE, "Failed to uncompress data: {0}", response);
                    SnowflakeChunkDownloader.closeQuietly(body);
                    throw ex;
                }
            }

            /**
             * Streams a JSON array of rows into the chunk, one row at a time, and verifies
             * afterwards that the chunk received all of its rows.
             */
            private void parseJsonToChunk(InputStream jsonInputStream, SnowflakeResultChunk resultChunk2) throws IOException, SnowflakeSQLException {
                try (JsonParser jp = jsonFactory.createParser(jsonInputStream);){
                    JsonToken currentToken = jp.nextToken();
                    if (currentToken != JsonToken.START_ARRAY) {
                        String found = currentToken != null ? currentToken.asString() : null;
                        throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception1: expected '[' got " + found);
                    }
                    while (jp.nextToken() != JsonToken.END_ARRAY) {
                        resultChunk2.addRow(mapper.readValue(jp, Object[].class));
                    }
                    resultChunk2.ensureRowsComplete();
                }
            }

            /**
             * Issues the GET request for the chunk. The supplied chunk headers are sent when
             * there are any; otherwise the SSE-C headers are built from {@code qrmk}, if set.
             */
            private HttpResponse getResultChunk(String chunkUrl) throws URISyntaxException, IOException, SnowflakeSQLException {
                URIBuilder uriBuilder = new URIBuilder(chunkUrl);
                HttpGet httpRequest = new HttpGet(uriBuilder.build());
                if (chunkHeadersMap != null && chunkHeadersMap.size() != 0) {
                    for (Map.Entry<String, String> entry : chunkHeadersMap.entrySet()) {
                        // Only the name is logged; the values may contain encryption keys.
                        logger.log(Level.FINE, "Adding header key={0}", entry.getKey());
                        httpRequest.addHeader(entry.getKey(), entry.getValue());
                    }
                } else if (qrmk != null) {
                    httpRequest.addHeader(SnowflakeChunkDownloader.SSE_C_ALGORITHM, SnowflakeChunkDownloader.SSE_C_AES);
                    httpRequest.addHeader(SnowflakeChunkDownloader.SSE_C_KEY, qrmk);
                    logger.log(Level.FINE, "Adding SSE-C headers");
                }
                logger.log(Level.FINE, "Fetching result: {0}", resultChunk.getUrl());
                HttpClient httpClient = HttpUtil.getHttpClient();
                HttpResponse response = RestRequest.execute(httpClient, httpRequest, networkTimeoutInMilli / 1000, 0, null);
                logger.log(Level.INFO, "Call returned for URL: {0}", chunkUrl);
                return response;
            }
        };
    }
}

