/*
 * Decompiled with CFR 0.152.
 */
package io.milvus.bulkwriter;

import com.google.gson.Gson;
import io.milvus.bulkwriter.StageFileManagerParam;
import io.milvus.bulkwriter.common.clientenum.ConnectType;
import io.milvus.bulkwriter.common.utils.FileUtils;
import io.milvus.bulkwriter.model.UploadFilesResult;
import io.milvus.bulkwriter.request.stage.ApplyStageRequest;
import io.milvus.bulkwriter.request.stage.BaseStageRequest;
import io.milvus.bulkwriter.request.stage.UploadFilesRequest;
import io.milvus.bulkwriter.resolver.EndpointResolver;
import io.milvus.bulkwriter.response.ApplyStageResponse;
import io.milvus.bulkwriter.restful.DataStageUtils;
import io.milvus.bulkwriter.storage.StorageClient;
import io.milvus.bulkwriter.storage.client.MinioStorageClient;
import io.milvus.exception.ParamException;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Uploads a local file, or every file found under a local directory, to a stage.
 *
 * <p>Stage information and temporary credentials are obtained through
 * {@link DataStageUtils#applyStage} and are refreshed when the credential expiry time has
 * passed or when an upload attempt fails with a checked exception.
 *
 * <p>Uploads run on a fixed-size thread pool owned by this instance; call
 * {@link #shutdownGracefully()} once the manager is no longer needed to stop that pool.
 */
public class StageFileManager {
    private static final Logger logger = LoggerFactory.getLogger(StageFileManager.class);

    /** Number of worker threads used to upload files concurrently. */
    private static final int UPLOAD_THREAD_COUNT = 10;
    /** How long {@link #shutdownGracefully()} waits before forcing the pool to stop. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;
    /** Maximum number of attempts made by {@link #withRetry}. */
    private static final int MAX_RETRIES = 5;
    /** Pause between two consecutive attempts in {@link #withRetry}. */
    private static final long RETRY_INTERVAL_MILLIS = 5000L;

    private final String cloudEndpoint;
    private final String apiKey;
    private final String stageName;
    private final ConnectType connectType;
    private final ExecutorService executor;
    // Both fields are replaced by refreshStageAndClient() and read by pool threads without
    // holding a lock, so they are volatile to make a refresh visible to every worker.
    private volatile StorageClient storageClient;
    private volatile ApplyStageResponse applyStageResponse;

    /**
     * Creates a manager from the given parameters and starts its upload thread pool.
     *
     * @param stageWriterParam supplies the cloud endpoint, API key, stage name and connect type
     */
    public StageFileManager(StageFileManagerParam stageWriterParam) {
        this.cloudEndpoint = stageWriterParam.getCloudEndpoint();
        this.apiKey = stageWriterParam.getApiKey();
        this.stageName = stageWriterParam.getStageName();
        this.connectType = stageWriterParam.getConnectType();
        this.executor = Executors.newFixedThreadPool(UPLOAD_THREAD_COUNT);
    }

    /**
     * Uploads the file or directory named by the request to the requested stage path.
     *
     * <p>Resolving the local path, refreshing the stage and validating the total size all
     * happen on the calling thread before any upload is scheduled, so failures in those steps
     * are thrown directly from this method. The per-file uploads then run on this manager's
     * thread pool.
     *
     * @param request carries the local source path and the target stage path
     * @return a future that completes with the stage name and stage path once every file has
     *         been uploaded, or completes exceptionally if any single upload fails
     * @throws ParamException if the total local size exceeds the limit returned for the stage
     */
    public CompletableFuture<UploadFilesResult> uploadFilesAsync(UploadFilesRequest request) {
        String localDirOrFilePath = request.getSourceFilePath();
        Pair<List<String>, Long> localPathPair = FileUtils.processLocalPath(localDirOrFilePath);
        String stagePath = this.convertDirPath(request.getTargetStagePath());
        this.refreshStageAndClient(stagePath);
        this.initValidator(localPathPair);

        List<String> localFilePaths = localPathPair.getKey();
        long totalBytes = localPathPair.getValue();
        long totalFilesCount = localFilePaths.size();
        AtomicInteger currentFileCount = new AtomicInteger(0);
        AtomicLong processedBytes = new AtomicLong(0L);
        long startTime = System.currentTimeMillis();

        CompletableFuture<?>[] uploads = localFilePaths.stream()
                .map(localFilePath -> CompletableFuture.runAsync(() -> {
                    File file = new File(localFilePath);
                    long fileStartTime = System.currentTimeMillis();
                    try {
                        this.uploadLocalFileToStage(localFilePath, localDirOrFilePath, stagePath);
                        long fileSize = file.length();
                        long bytes = processedBytes.addAndGet(fileSize);
                        int completeCount = currentFileCount.incrementAndGet();
                        long elapsed = System.currentTimeMillis() - fileStartTime;
                        // An empty upload set is reported as fully done instead of dividing by zero.
                        double percent = totalBytes == 0L ? 100.0 : (double) bytes * 100.0 / (double) totalBytes;
                        logger.info("Uploaded file {}/{}: {} ({} bytes) elapsed:{} ms, progress(total bytes): {}/{} bytes, progress(total percentage):{}%",
                                completeCount, totalFilesCount, localFilePath, fileSize, elapsed, bytes, totalBytes, String.format("%.2f", percent));
                    } catch (Exception e) {
                        logger.error("Upload failed: {}", localFilePath, e);
                        throw new CompletionException(e);
                    }
                }, this.executor))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(uploads).thenApply(ignored -> {
            long totalElapsed = (System.currentTimeMillis() - startTime) / 1000L;
            logger.info("all files in {} has been async uploaded to stage, stageName:{}, stagePath:{}, totalFileCount:{}, totalFileSize:{}, cost times:{} s",
                    localDirOrFilePath, this.applyStageResponse.getStageName(), stagePath, localFilePaths.size(), localPathPair.getValue(), totalElapsed);
            return UploadFilesResult.builder().stageName(this.applyStageResponse.getStageName()).path(stagePath).build();
        });
    }

    /**
     * Stops the upload thread pool: no new tasks are accepted, running tasks get a short grace
     * period, and anything still running after that is interrupted.
     */
    public void shutdownGracefully() {
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn("Executor didn't terminate in time, forcing shutdown...");
                this.executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.error("Interrupted while waiting for executor to shutdown", e);
            this.executor.shutdownNow();
            // Preserve the interrupt for whoever called us.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rejects the upload when the total local size exceeds the maximum content length of the
     * current stage condition.
     *
     * @param localPathPair the local file list paired with its total size in bytes
     * @throws ParamException if the total size is above the limit
     */
    private void initValidator(Pair<List<String>, Long> localPathPair) {
        if (localPathPair.getValue() > this.applyStageResponse.getCondition().getMaxContentLength()) {
            String msg = String.format("localFileTotalSize %s exceeds the maximum contentLength limit %s defined in the condition. If you want to upload larger files, please contact us to lift the restriction", localPathPair.getValue(), this.applyStageResponse.getCondition().getMaxContentLength());
            logger.error(msg);
            throw new ParamException(msg);
        }
    }

    /**
     * Applies for the stage again and rebuilds the storage client from the returned endpoint
     * and temporary credentials.
     *
     * <p>Synchronized so that concurrent refreshes (expiry check, retry path, new upload
     * request) replace the response and the client one refresh at a time.
     *
     * @param path the stage path sent with the apply request
     */
    private synchronized void refreshStageAndClient(String path) {
        logger.info("refreshing Stage info...");
        // The cast chain is kept as produced by the decompiler; the builder's generic
        // signature is not visible from this file.
        BaseStageRequest applyStageRequest = ((ApplyStageRequest.ApplyStageRequestBuilder)((ApplyStageRequest.ApplyStageRequestBuilder)((ApplyStageRequest.ApplyStageRequestBuilder)ApplyStageRequest.builder().apiKey(this.apiKey)).stageName(this.stageName)).path(path)).build();
        String result = DataStageUtils.applyStage(this.cloudEndpoint, applyStageRequest);
        ApplyStageResponse response = new Gson().fromJson(result, ApplyStageResponse.class);
        this.applyStageResponse = response;
        logger.info("stage info refreshed");
        String endpoint = EndpointResolver.resolveEndpoint(response.getEndpoint(), response.getCloud(), response.getRegion(), this.connectType);
        this.storageClient = MinioStorageClient.getStorageClient(response.getCloud(), endpoint, response.getCredentials().getTmpAK(), response.getCredentials().getTmpSK(), response.getCredentials().getSessionToken(), response.getRegion(), null);
        logger.info("storage client refreshed");
    }

    /**
     * Normalizes a target stage path into a directory prefix.
     *
     * @param inputPath the requested path, possibly null or empty
     * @return an empty string for null, empty or {@code "/"} input; otherwise the input with
     *         exactly one trailing {@code '/'} guaranteed
     */
    private String convertDirPath(String inputPath) {
        if (StringUtils.isEmpty(inputPath) || "/".equals(inputPath)) {
            return "";
        }
        return inputPath.endsWith("/") ? inputPath : inputPath + "/";
    }

    /**
     * Computes the remote object key of one local file and uploads it.
     *
     * <p>When {@code rootPath} is a single file the key uses just the file name; when it is a
     * directory the key keeps the file's path relative to that directory, with backslashes
     * converted to forward slashes.
     *
     * @param localFilePath the file to upload
     * @param rootPath the file or directory originally requested for upload
     * @param stagePath the normalized target directory inside the stage
     */
    private void uploadLocalFileToStage(String localFilePath, String rootPath, String stagePath) {
        File file = new File(localFilePath);
        Path filePath = file.toPath().toAbsolutePath();
        Path root = Paths.get(rootPath).toAbsolutePath();
        String relativePath = root.toFile().isFile()
                ? file.getName()
                : root.relativize(filePath).toString().replace("\\", "/");
        String remoteFilePath = this.applyStageResponse.getStagePrefix() + stagePath + relativePath;
        this.putObjectWithRetry(file, remoteFilePath, stagePath);
    }

    /**
     * Uploads one file, refreshing expired credentials first and retrying failed attempts.
     *
     * <p>The upload is handed to {@link #withRetry} as a {@link Callable} so that checked
     * exceptions thrown by the storage client reach the retry loop unwrapped. Wrapping them in
     * a {@link RuntimeException} here would make {@code withRetry} rethrow immediately and the
     * upload would never be retried.
     */
    private void putObjectWithRetry(File file, String remoteFilePath, String stagePath) {
        this.refreshIfExpire(stagePath);
        String msg = "upload " + file.getAbsolutePath();
        this.withRetry(msg, () -> {
            this.storageClient.putObject(file, this.applyStageResponse.getBucketName(), remoteFilePath);
            return null;
        }, stagePath);
    }

    /**
     * Refreshes the stage and storage client if the current credentials have expired.
     *
     * <p>The expiry is evaluated again after the lock is acquired, against the response that is
     * current at that moment, so threads that were waiting while another thread refreshed do
     * not refresh a second time.
     */
    private void refreshIfExpire(String stagePath) {
        if (!this.isCredentialExpired()) {
            return;
        }
        synchronized (this) {
            if (this.isCredentialExpired()) {
                this.refreshStageAndClient(stagePath);
            }
        }
    }

    /** Returns whether the expire time of the current credentials lies in the past. */
    private boolean isCredentialExpired() {
        Instant expireTime = Instant.parse(this.applyStageResponse.getCredentials().getExpireTime());
        return Instant.now().isAfter(expireTime);
    }

    /**
     * Runs an action, retrying it after a checked exception.
     *
     * <p>Unchecked exceptions are rethrown immediately without a retry. After a checked
     * exception the stage and storage client are refreshed and the thread pauses before the
     * next attempt. An interrupt, either raised by the action or received during the pause,
     * restores the thread's interrupt flag and aborts the retries.
     *
     * @param actionName short description used in log and error messages
     * @param callable the action to run
     * @param stagePath the stage path used when refreshing between attempts
     * @return the value produced by the action
     * @throws RuntimeException if every attempt fails or the thread is interrupted
     */
    private <T> T withRetry(String actionName, Callable<T> callable, String stagePath) {
        int attempt = 0;
        while (attempt < MAX_RETRIES) {
            try {
                return callable.call();
            } catch (RuntimeException e) {
                throw e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(actionName + " was interrupted", e);
            } catch (Exception e) {
                attempt++;
                logger.warn("Attempt {} failed to {}", attempt, actionName, e);
                if (attempt == MAX_RETRIES) {
                    throw new RuntimeException(actionName + " failed after " + MAX_RETRIES + " attempts", e);
                }
                // Refresh only when another attempt follows, so a refresh failure cannot
                // hide the final upload error.
                this.refreshStageAndClient(stagePath);
                try {
                    Thread.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(actionName + " was interrupted while waiting to retry", interrupted);
                }
            }
        }
        throw new RuntimeException(actionName + " failed unexpectedly.");
    }
}

