/*
 * Decompiled with CFR 0.152.
 */
package com.databricks.jdbc.api.impl.volume;

import com.databricks.internal.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import com.databricks.internal.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import com.databricks.internal.apache.hc.core5.concurrent.FutureCallback;
import com.databricks.internal.apache.hc.core5.http.ContentType;
import com.databricks.internal.apache.hc.core5.http.nio.AsyncRequestProducer;
import com.databricks.internal.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
import com.databricks.internal.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import com.databricks.internal.apache.http.HttpEntity;
import com.databricks.internal.apache.http.entity.InputStreamEntity;
import com.databricks.internal.google.common.annotations.VisibleForTesting;
import com.databricks.internal.sdk.WorkspaceClient;
import com.databricks.internal.sdk.core.ApiClient;
import com.databricks.internal.sdk.core.DatabricksException;
import com.databricks.internal.sdk.core.error.platform.NotFound;
import com.databricks.internal.sdk.core.http.Request;
import com.databricks.jdbc.api.IDatabricksVolumeClient;
import com.databricks.jdbc.api.impl.VolumeOperationStatus;
import com.databricks.jdbc.api.impl.volume.DatabricksUCVolumeClient;
import com.databricks.jdbc.api.impl.volume.InputStreamFixedLenProducer;
import com.databricks.jdbc.api.impl.volume.VolumeInputStream;
import com.databricks.jdbc.api.impl.volume.VolumeOperationProcessor;
import com.databricks.jdbc.api.impl.volume.VolumeUploadCallback;
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.DatabricksClientConfiguratorManager;
import com.databricks.jdbc.common.DatabricksJdbcConstants;
import com.databricks.jdbc.common.HttpClientType;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.common.util.JsonUtil;
import com.databricks.jdbc.common.util.StringUtil;
import com.databricks.jdbc.common.util.VolumeRetryUtil;
import com.databricks.jdbc.common.util.VolumeUtil;
import com.databricks.jdbc.common.util.WildcardUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.exception.DatabricksVolumeOperationException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.client.filesystem.CreateDeleteUrlRequest;
import com.databricks.jdbc.model.client.filesystem.CreateDeleteUrlResponse;
import com.databricks.jdbc.model.client.filesystem.CreateDownloadUrlRequest;
import com.databricks.jdbc.model.client.filesystem.CreateDownloadUrlResponse;
import com.databricks.jdbc.model.client.filesystem.CreateUploadUrlRequest;
import com.databricks.jdbc.model.client.filesystem.CreateUploadUrlResponse;
import com.databricks.jdbc.model.client.filesystem.FileInfo;
import com.databricks.jdbc.model.client.filesystem.ListRequest;
import com.databricks.jdbc.model.client.filesystem.ListResponse;
import com.databricks.jdbc.model.client.filesystem.VolumePutResult;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DBFSVolumeClient
implements IDatabricksVolumeClient,
Closeable {
    /** Logger shared by all instances of this client. */
    private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DBFSVolumeClient.class);
    /** Connection context given at construction; {@code null} when the WorkspaceClient-only constructor is used. */
    private final IDatabricksConnectionContext connectionContext;
    /** HTTP client used for volume transfers; {@code null} when the WorkspaceClient-only constructor is used. */
    private final IDatabricksHttpClient databricksHttpClient;
    /** Stream recorded by the most recent {@code setVolumeOperationEntityStream} call; {@code null} until then. */
    private VolumeInputStream volumeInputStream = null;
    /** Content length recorded together with {@code volumeInputStream}; stays at -1 until a stream has been set. */
    private long volumeStreamContentLength = -1L;
    /** SDK workspace client backing this volume client. */
    final WorkspaceClient workspaceClient;
    /** API client obtained from {@code workspaceClient.apiClient()} in both constructors. */
    final ApiClient apiClient;
    /** Allowed local ingestion paths handed to the volume operation processor for file-based GET/PUT. */
    private final String allowedVolumeIngestionPaths;
    /** Flag forwarded to the volume operation processor; false for the WorkspaceClient-only constructor. */
    private final boolean enableVolumeOperations;
    // Retry delay bounds in milliseconds; presumably consumed by the retry-delay
    // calculation that lies outside this view -- TODO confirm.
    private static final long INITIAL_RETRY_DELAY_MS = 200L;
    private static final long MAX_RETRY_DELAY_MS = 10000L;
    /** Semaphore acquired before each asynchronous presigned-URL request and handed to the upload callback. */
    private final Semaphore presignedUrlSemaphore;
    // NOTE(review): this captures the ThreadLocalRandom of the constructing thread. The JDK
    // documentation advises calling ThreadLocalRandom.current() at the point of use instead of
    // caching the instance in a field -- confirm how this field is used before relying on it.
    private final ThreadLocalRandom random = ThreadLocalRandom.current();

    /**
     * Test-oriented constructor that wires the client around an existing workspace client.
     *
     * <p>No connection context or HTTP client is attached, volume operations are flagged as
     * disabled, the allowed ingestion path string is empty, and the presigned-URL semaphore is
     * created with 50 permits.
     *
     * @param workspaceClient workspace client whose {@code apiClient()} is used for API calls
     */
    @VisibleForTesting
    public DBFSVolumeClient(WorkspaceClient workspaceClient) {
        // Collaborators that only exist for context-backed instances.
        this.connectionContext = null;
        this.databricksHttpClient = null;

        // SDK handles.
        this.workspaceClient = workspaceClient;
        this.apiClient = workspaceClient.apiClient();

        // Fixed defaults.
        this.allowedVolumeIngestionPaths = "";
        this.enableVolumeOperations = false;
        this.presignedUrlSemaphore = new Semaphore(50);
    }

    /**
     * Builds a client from a connection context.
     *
     * <p>The workspace client is resolved through
     * {@link #getWorkspaceClientFromConnectionContext}, the HTTP client is fetched from the
     * {@code DatabricksHttpClientFactory} for the {@code VOLUME} client type, and the
     * presigned-URL semaphore is sized from
     * {@code connectionContext.getMaxDBFSConcurrentPresignedRequests()}.
     *
     * @param connectionContext connection context supplying configuration for this client
     */
    public DBFSVolumeClient(IDatabricksConnectionContext connectionContext) {
        this.connectionContext = connectionContext;
        WorkspaceClient resolvedClient = this.getWorkspaceClientFromConnectionContext(connectionContext);
        this.workspaceClient = resolvedClient;
        this.apiClient = resolvedClient.apiClient();
        this.databricksHttpClient =
            DatabricksHttpClientFactory.getInstance().getClient(connectionContext, HttpClientType.VOLUME);
        this.allowedVolumeIngestionPaths = connectionContext.getVolumeOperationAllowedPaths();
        this.enableVolumeOperations = true;
        this.presignedUrlSemaphore = new Semaphore(connectionContext.getMaxDBFSConcurrentPresignedRequests());
    }

    /**
     * Reports whether at least one object in the volume matches the given prefix.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param prefix prefix to look for; a null or empty prefix yields {@code false}
     * @param caseSensitive whether matching is case sensitive
     * @return {@code true} when {@link #listObjects} returns a non-empty list
     * @throws SQLException a {@code DatabricksVolumeOperationException} wrapping any failure
     *     raised while listing
     */
    @Override
    public boolean prefixExists(String catalog, String schema, String volume, String prefix, boolean caseSensitive) throws SQLException {
        LOGGER.debug("Entering prefixExists method with parameters: catalog = {}, schema = {}, volume = {}, prefix = {}, caseSensitive = {}", catalog, schema, volume, prefix, caseSensitive);
        if (WildcardUtil.isNullOrEmpty(prefix)) {
            return false;
        }
        try {
            boolean nothingMatched = this.listObjects(catalog, schema, volume, prefix, caseSensitive).isEmpty();
            return !nothingMatched;
        } catch (Exception failure) {
            LOGGER.error(failure, "Error checking prefix existence: catalog = {}, schema = {}, volume = {}, prefix = {}, caseSensitive = {}", catalog, schema, volume, prefix, caseSensitive);
            throw new DatabricksVolumeOperationException(
                "Error checking prefix existence: " + failure.getMessage(),
                (Throwable) failure,
                DatabricksDriverErrorCode.VOLUME_OPERATION_INVALID_STATE);
        }
    }

    /**
     * Reports whether an object with the same base name as {@code objectPath} is listed in the
     * volume.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPath path of the object; a null or empty path yields {@code false}
     * @param caseSensitive whether base names are compared case sensitively
     * @return {@code true} when a listed file has a matching base name; {@code false} when the
     *     listing or its file collection is null, or nothing matches
     * @throws SQLException a {@code DatabricksVolumeOperationException} wrapping any failure
     *     raised during the lookup
     */
    @Override
    public boolean objectExists(String catalog, String schema, String volume, String objectPath, boolean caseSensitive) throws SQLException {
        LOGGER.debug("Entering objectExists method with parameters: catalog = {}, schema = {}, volume = {}, objectPath = {}, caseSensitive = {}", catalog, schema, volume, objectPath, caseSensitive);
        if (WildcardUtil.isNullOrEmpty(objectPath)) {
            return false;
        }
        try {
            String wantedName = StringUtil.getBaseNameFromPath(objectPath);
            ListResponse listing = this.getListResponse(VolumeUtil.VolumeOperationType.constructListPath(catalog, schema, volume, objectPath));
            // Nothing to scan when the service returned no listing at all.
            if (listing == null || listing.getFiles() == null) {
                return false;
            }
            for (FileInfo entry : listing.getFiles()) {
                String candidate = StringUtil.getBaseNameFromPath(entry.getPath());
                boolean matches = caseSensitive ? candidate.equals(wantedName) : candidate.equalsIgnoreCase(wantedName);
                if (matches) {
                    return true;
                }
            }
            return false;
        } catch (Exception failure) {
            LOGGER.error(failure, "Error checking object existence: catalog = {}, schema = {}, volume = {}, objectPath = {}, caseSensitive = {}", catalog, schema, volume, objectPath, caseSensitive);
            throw new DatabricksVolumeOperationException(
                "Error checking object existence: " + failure.getMessage(),
                (Throwable) failure,
                DatabricksDriverErrorCode.VOLUME_OPERATION_INVALID_STATE);
        }
    }

    /**
     * Reports whether the volume can be listed.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volumeName volume name; a null or empty name yields {@code false}
     * @param caseSensitive only logged by this method
     * @return {@code true} when listing the volume path succeeds; {@code false} when the listing
     *     fails with a cause of type {@code NotFound}
     * @throws SQLException a {@code DatabricksVolumeOperationException} wrapping any other
     *     listing failure
     */
    @Override
    public boolean volumeExists(String catalog, String schema, String volumeName, boolean caseSensitive) throws SQLException {
        LOGGER.debug("Entering volumeExists method with parameters: catalog = {}, schema = {}, volumeName = {}, caseSensitive = {}", catalog, schema, volumeName, caseSensitive);
        if (WildcardUtil.isNullOrEmpty(volumeName)) {
            return false;
        }
        try {
            this.getListResponse(StringUtil.getVolumePath(catalog, schema, volumeName));
            return true;
        } catch (DatabricksVolumeOperationException listFailure) {
            boolean volumeMissing = listFailure.getCause() instanceof NotFound;
            if (!volumeMissing) {
                LOGGER.error(listFailure, "Error checking volume existence: catalog = {}, schema = {}, volumeName = {}, caseSensitive = {}", catalog, schema, volumeName, caseSensitive);
                throw new DatabricksVolumeOperationException(
                    "Error checking volume existence: " + listFailure.getMessage(),
                    (Throwable) listFailure,
                    DatabricksDriverErrorCode.VOLUME_OPERATION_INVALID_STATE);
            }
            return false;
        }
    }

    /**
     * Lists the names of the objects whose base name matches the base name of {@code prefix}.
     *
     * <p>The directory to list is derived from {@code prefix} via
     * {@code VolumeUtil.VolumeOperationType.constructListPath}. Each returned entry is the part
     * of the listed path after its last {@code '/'}.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param prefix prefix (possibly including folders) to match
     * @param caseSensitive whether prefix matching is case sensitive
     * @return matching object names; an empty list when the service returns no listing or a
     *     listing without a file collection
     * @throws SQLException if the list request fails
     */
    @Override
    public List<String> listObjects(String catalog, String schema, String volume, String prefix, boolean caseSensitive) throws SQLException {
        LOGGER.debug("Entering listObjects method with parameters: catalog={}, schema={}, volume={}, prefix={}, caseSensitive={}", catalog, schema, volume, prefix, caseSensitive);
        String basename = StringUtil.getBaseNameFromPath(prefix);
        ListResponse listResponse = this.getListResponse(VolumeUtil.VolumeOperationType.constructListPath(catalog, schema, volume, prefix));
        // Same guard objectExists applies: a missing listing or file collection means
        // "no objects", not a NullPointerException.
        if (listResponse == null || listResponse.getFiles() == null) {
            return new ArrayList<String>();
        }
        return listResponse.getFiles().stream()
            .map(FileInfo::getPath)
            .map(path -> path.substring(path.lastIndexOf('/') + 1))
            .filter(fileName -> StringUtil.checkPrefixMatch(basename, fileName, caseSensitive))
            .collect(Collectors.toList());
    }

    /**
     * Downloads a volume object into a local file.
     *
     * <p>A download URL is requested through {@link #getCreateDownloadUrlResponse} and handed to
     * a {@code VolumeOperationProcessor} configured for a GET into {@code localPath}.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPath path of the object inside the volume
     * @param localPath local destination path
     * @return {@code true} when the operation finishes without a reported failure
     * @throws DatabricksVolumeOperationException when URL creation or the processor reports a
     *     failure
     */
    @Override
    public boolean getObject(String catalog, String schema, String volume, String objectPath, String localPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering getObject method with parameters: catalog={}, schema={}, volume={}, objectPath={}, localPath={}", catalog, schema, volume, objectPath, localPath);
        try {
            CreateDownloadUrlResponse downloadUrl =
                this.getCreateDownloadUrlResponse(DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objectPath));
            VolumeOperationProcessor processor = VolumeOperationProcessor.Builder.createBuilder()
                .operationType(VolumeUtil.VolumeOperationType.GET)
                .operationUrl(downloadUrl.getUrl())
                .localFilePath(localPath)
                .allowedVolumeIngestionPathString(this.allowedVolumeIngestionPaths)
                .databricksHttpClient(this.databricksHttpClient)
                .build();
            processor.process();
            this.checkVolumeOperationError(processor);
            return true;
        } catch (DatabricksSQLException failure) {
            String message = String.format("Failed to get object - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_FILE_DOWNLOAD_ERROR);
        }
    }

    /**
     * Downloads a volume object and exposes it as a stream entity.
     *
     * <p>The processor's stream receiver stores the response entity on this client via
     * {@link #setVolumeOperationEntityStream}; the stored stream is then returned through
     * {@link #getVolumeOperationInputStream}.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPath path of the object inside the volume
     * @return entity wrapping the stream and content length recorded by the receiver
     * @throws DatabricksVolumeOperationException when URL creation or the processor reports a
     *     failure
     */
    @Override
    public InputStreamEntity getObject(String catalog, String schema, String volume, String objectPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering getObject method with parameters: catalog={}, schema={}, volume={}, objectPath={}", catalog, schema, volume, objectPath);
        try {
            CreateDownloadUrlResponse downloadUrl =
                this.getCreateDownloadUrlResponse(DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objectPath));
            VolumeOperationProcessor processor = VolumeOperationProcessor.Builder.createBuilder()
                .operationType(VolumeUtil.VolumeOperationType.GET)
                .operationUrl(downloadUrl.getUrl())
                .isAllowedInputStreamForVolumeOperation(true)
                .isEnableVolumeOperations(this.enableVolumeOperations)
                .databricksHttpClient(this.databricksHttpClient)
                .getStreamReceiver(entity -> {
                    try {
                        this.setVolumeOperationEntityStream((HttpEntity) entity);
                    } catch (Exception receiverFailure) {
                        throw new RuntimeException("Failed to set result set volumeOperationEntityStream", receiverFailure);
                    }
                })
                .build();
            processor.process();
            this.checkVolumeOperationError(processor);
            return this.getVolumeOperationInputStream();
        } catch (DatabricksSQLException failure) {
            String message = String.format("Failed to get object - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_FILE_DOWNLOAD_ERROR);
        }
    }

    /**
     * Uploads a local file to the volume.
     *
     * <p>An upload URL is requested through {@link #getCreateUploadUrlResponse} and handed to a
     * {@code VolumeOperationProcessor} configured for a PUT from {@code localPath}.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPath destination path inside the volume
     * @param localPath local source file path
     * @param toOverwrite not read by this method
     * @return {@code true} when the operation finishes without a reported failure
     * @throws DatabricksVolumeOperationException when URL creation or the processor reports a
     *     failure
     */
    @Override
    public boolean putObject(String catalog, String schema, String volume, String objectPath, String localPath, boolean toOverwrite) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering putObject method with parameters: catalog={}, schema={}, volume={}, objectPath={}, localPath={}", catalog, schema, volume, objectPath, localPath);
        try {
            CreateUploadUrlResponse uploadUrl =
                this.getCreateUploadUrlResponse(DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objectPath));
            VolumeOperationProcessor processor = VolumeOperationProcessor.Builder.createBuilder()
                .operationType(VolumeUtil.VolumeOperationType.PUT)
                .isEnableVolumeOperations(this.enableVolumeOperations)
                .operationUrl(uploadUrl.getUrl())
                .localFilePath(localPath)
                .allowedVolumeIngestionPathString(this.allowedVolumeIngestionPaths)
                .databricksHttpClient(this.databricksHttpClient)
                .build();
            processor.process();
            this.checkVolumeOperationError(processor);
            return true;
        } catch (DatabricksSQLException failure) {
            String message = String.format("Failed to put object - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_PUT_OPERATION_EXCEPTION);
        }
    }

    /**
     * Uploads the content of an input stream to the volume.
     *
     * <p>The upload URL is requested first; the stream is then wrapped in an
     * {@code InputStreamEntity} of the given length and handed to a
     * {@code VolumeOperationProcessor} configured for a PUT.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPath destination path inside the volume
     * @param inputStream source of the bytes to upload
     * @param contentLength length passed to the wrapping entity
     * @param toOverwrite only logged by this method
     * @return {@code true} when the operation finishes without a reported failure
     * @throws DatabricksVolumeOperationException when URL creation or the processor reports a
     *     failure
     */
    @Override
    public boolean putObject(String catalog, String schema, String volume, String objectPath, InputStream inputStream, long contentLength, boolean toOverwrite) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering putObject method with parameters: catalog={}, schema={}, volume={}, objectPath={}, inputStream={}, contentLength={}, toOverwrite={}", catalog, schema, volume, objectPath, inputStream, contentLength, toOverwrite);
        try {
            CreateUploadUrlResponse uploadUrl =
                this.getCreateUploadUrlResponse(DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objectPath));
            InputStreamEntity payload = new InputStreamEntity(inputStream, contentLength);
            VolumeOperationProcessor processor = VolumeOperationProcessor.Builder.createBuilder()
                .operationType(VolumeUtil.VolumeOperationType.PUT)
                .operationUrl(uploadUrl.getUrl())
                .isAllowedInputStreamForVolumeOperation(true)
                .isEnableVolumeOperations(this.enableVolumeOperations)
                .inputStream(payload)
                .databricksHttpClient(this.databricksHttpClient)
                .build();
            processor.process();
            this.checkVolumeOperationError(processor);
            return true;
        } catch (DatabricksSQLException failure) {
            String message = String.format("Failed to put object with inputStream- {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_PUT_OPERATION_EXCEPTION);
        }
    }

    /**
     * Removes an object from the volume.
     *
     * <p>A delete URL is requested through {@link #getCreateDeleteUrlResponse} and handed to a
     * {@code VolumeOperationProcessor} configured for a REMOVE.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPath path of the object inside the volume
     * @return {@code true} when the operation finishes without a reported failure
     * @throws DatabricksVolumeOperationException when URL creation or the processor reports a
     *     failure
     */
    @Override
    public boolean deleteObject(String catalog, String schema, String volume, String objectPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering deleteObject method with parameters: catalog={}, schema={}, volume={}, objectPath={}", catalog, schema, volume, objectPath);
        try {
            CreateDeleteUrlResponse deleteUrl =
                this.getCreateDeleteUrlResponse(DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objectPath));
            VolumeOperationProcessor processor = VolumeOperationProcessor.Builder.createBuilder()
                .operationType(VolumeUtil.VolumeOperationType.REMOVE)
                .isEnableVolumeOperations(this.enableVolumeOperations)
                .operationUrl(deleteUrl.getUrl())
                .databricksHttpClient(this.databricksHttpClient)
                .build();
            processor.process();
            this.checkVolumeOperationError(processor);
            return true;
        } catch (DatabricksSQLException failure) {
            String message = String.format("Failed to delete object {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_DELETE_OPERATION_EXCEPTION);
        }
    }

    /**
     * Resolves the workspace client for a connection context.
     *
     * <p>Looks up the configurator registered for the context in
     * {@code DatabricksClientConfiguratorManager} and returns its workspace client. Package-private
     * and invoked from the context-based constructor.
     *
     * @param connectionContext connection context to resolve the client for
     * @return the workspace client supplied by the context's configurator
     */
    WorkspaceClient getWorkspaceClientFromConnectionContext(IDatabricksConnectionContext connectionContext) {
        return DatabricksClientConfiguratorManager.getInstance().getConfigurator(connectionContext).getWorkspaceClient();
    }

    /**
     * Asks the service for an upload URL for the given object path.
     *
     * <p>Issues a POST to {@code /api/2.0/fs/create-upload-url} with the serialized request and
     * the shared JSON headers.
     *
     * @param objectPath full path of the object to upload
     * @return the deserialized upload-URL response
     * @throws DatabricksVolumeOperationException when serialization or the API call fails
     */
    CreateUploadUrlResponse getCreateUploadUrlResponse(String objectPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering getCreateUploadUrlResponse method with parameters: objectPath={}", objectPath);
        final String endpoint = "/api/2.0/fs/create-upload-url";
        CreateUploadUrlRequest payload = new CreateUploadUrlRequest(objectPath);
        try {
            Request httpRequest = new Request("POST", endpoint, this.apiClient.serialize(payload));
            httpRequest.withHeaders(DatabricksJdbcConstants.JSON_HTTP_HEADERS);
            return this.apiClient.execute(httpRequest, CreateUploadUrlResponse.class);
        } catch (DatabricksException | IOException failure) {
            String message = String.format("Failed to get create upload url response - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_URL_GENERATION_ERROR);
        }
    }

    /**
     * Asks the service for a download URL for the given object path.
     *
     * <p>Issues a POST to {@code /api/2.0/fs/create-download-url} with the serialized request and
     * the shared JSON headers.
     *
     * @param objectPath full path of the object to download
     * @return the deserialized download-URL response
     * @throws DatabricksVolumeOperationException when serialization or the API call fails
     */
    CreateDownloadUrlResponse getCreateDownloadUrlResponse(String objectPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering getCreateDownloadUrlResponse method with parameters: objectPath={}", objectPath);
        final String endpoint = "/api/2.0/fs/create-download-url";
        CreateDownloadUrlRequest payload = new CreateDownloadUrlRequest(objectPath);
        try {
            Request httpRequest = new Request("POST", endpoint, this.apiClient.serialize(payload));
            httpRequest.withHeaders(DatabricksJdbcConstants.JSON_HTTP_HEADERS);
            return this.apiClient.execute(httpRequest, CreateDownloadUrlResponse.class);
        } catch (DatabricksException | IOException failure) {
            String message = String.format("Failed to get create download url response - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_URL_GENERATION_ERROR);
        }
    }

    /**
     * Asks the service for a delete URL for the given object path.
     *
     * <p>Issues a POST to {@code /api/2.0/fs/create-delete-url} with the serialized request and
     * the shared JSON headers.
     *
     * @param objectPath full path of the object to delete
     * @return the deserialized delete-URL response
     * @throws DatabricksVolumeOperationException when serialization or the API call fails
     */
    CreateDeleteUrlResponse getCreateDeleteUrlResponse(String objectPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering getCreateDeleteUrlResponse method with parameters: objectPath={}", objectPath);
        final String endpoint = "/api/2.0/fs/create-delete-url";
        CreateDeleteUrlRequest payload = new CreateDeleteUrlRequest(objectPath);
        try {
            Request httpRequest = new Request("POST", endpoint, this.apiClient.serialize(payload));
            httpRequest.withHeaders(DatabricksJdbcConstants.JSON_HTTP_HEADERS);
            return this.apiClient.execute(httpRequest, CreateDeleteUrlResponse.class);
        } catch (DatabricksException | IOException failure) {
            String message = String.format("Failed to get create delete url response - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_URL_GENERATION_ERROR);
        }
    }

    /**
     * Fetches the listing for a volume path.
     *
     * <p>Issues a GET to {@code /api/2.0/fs/list} with the shared JSON headers; the list request
     * is attached as query parameters via {@code ApiClient.setQuery}.
     *
     * @param listPath path to list
     * @return the deserialized list response
     * @throws DatabricksVolumeOperationException when the API call fails; the original exception
     *     is kept as the cause
     */
    ListResponse getListResponse(String listPath) throws DatabricksVolumeOperationException {
        LOGGER.debug("Entering getListResponse method with parameters : listPath={}", listPath);
        final String endpoint = "/api/2.0/fs/list";
        ListRequest query = new ListRequest(listPath);
        try {
            Request httpRequest = new Request("GET", endpoint);
            httpRequest.withHeaders(DatabricksJdbcConstants.JSON_HTTP_HEADERS);
            ApiClient.setQuery(httpRequest, query);
            return this.apiClient.execute(httpRequest, ListResponse.class);
        } catch (DatabricksException | IOException failure) {
            String message = String.format("Failed to get list response - {%s}", failure.getMessage());
            LOGGER.error(failure, message);
            throw new DatabricksVolumeOperationException(
                message, (Throwable) failure, DatabricksDriverErrorCode.VOLUME_OPERATION_INVALID_STATE);
        }
    }

    /**
     * Converts a FAILED or ABORTED processor status into an exception.
     *
     * @param volumeOperationProcessor processor whose status is inspected after processing
     * @throws DatabricksSQLException with code {@code INVALID_STATE} when the status is
     *     {@code FAILED} or {@code ABORTED}; the processor's error message is appended
     */
    private void checkVolumeOperationError(VolumeOperationProcessor volumeOperationProcessor) throws DatabricksSQLException {
        final String outcome;
        if (volumeOperationProcessor.getStatus() == VolumeOperationStatus.FAILED) {
            outcome = "failed";
        } else if (volumeOperationProcessor.getStatus() == VolumeOperationStatus.ABORTED) {
            outcome = "aborted";
        } else {
            // Any other status is not treated as an error here.
            return;
        }
        throw new DatabricksSQLException(
            "Volume operation " + outcome + ": " + volumeOperationProcessor.getErrorMessage(),
            DatabricksDriverErrorCode.INVALID_STATE);
    }

    /**
     * Records the response entity of a streaming download on this client.
     *
     * <p>Wraps the entity in a {@code VolumeInputStream} and stores its content length. Both
     * values are kept in instance fields, so a later call replaces the earlier stream.
     *
     * @param httpEntity response entity to wrap
     * @throws IOException declared because constructing the {@code VolumeInputStream} may fail
     */
    public void setVolumeOperationEntityStream(HttpEntity httpEntity) throws IOException {
        // The stream is assigned first; if construction throws, the content length is left untouched.
        this.volumeInputStream = new VolumeInputStream(httpEntity);
        this.volumeStreamContentLength = httpEntity.getContentLength();
    }

    /**
     * Wraps the most recently recorded volume stream in a new entity.
     *
     * @return a new {@code InputStreamEntity} built from the stored stream and content length
     */
    public InputStreamEntity getVolumeOperationInputStream() {
        InputStream recordedStream = this.volumeInputStream;
        long recordedLength = this.volumeStreamContentLength;
        return new InputStreamEntity(recordedStream, recordedLength);
    }

    /**
     * Closes this client by clearing the connection context held in
     * {@code DatabricksThreadContextHolder}. No other resource is released here.
     *
     * @throws IOException declared by {@code Closeable}; not thrown by the visible body
     */
    @Override
    public void close() throws IOException {
        DatabricksThreadContextHolder.clearConnectionContext();
    }

    /**
     * Uploads several local files to the volume.
     *
     * <p>One upload request is prepared per entry. A local path that does not exist or is not a
     * regular file is logged and represented by an empty {@code Optional}, which
     * {@code executeUploads} turns into a failed result at the same position.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPaths destination paths inside the volume
     * @param localPaths local source files, parallel to {@code objectPaths}
     * @param toOverwrite not read by this method
     * @return one result per input entry, in input order
     * @throws IllegalArgumentException when the two lists differ in size
     */
    @Override
    public List<VolumePutResult> putFiles(String catalog, String schema, String volume, List<String> objectPaths, List<String> localPaths, boolean toOverwrite) {
        LOGGER.debug("Entering putFiles: catalog={}, schema={}, volume={}, files={}", catalog, schema, volume, objectPaths.size());
        if (objectPaths.size() != localPaths.size()) {
            String mismatch = "objectPaths and localPaths \u2013 sizes differ";
            LOGGER.error(mismatch);
            throw new IllegalArgumentException(mismatch);
        }
        List<Optional<UploadRequest>> pending = new ArrayList<>(objectPaths.size());
        for (int position = 0; position < objectPaths.size(); position++) {
            String objPath = objectPaths.get(position);
            String volumePath = DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objPath);
            String localPath = localPaths.get(position);
            Path candidate = Paths.get(localPath);
            boolean uploadable = Files.exists(candidate) && Files.isRegularFile(candidate);
            if (uploadable) {
                UploadRequest upload = new UploadRequest();
                upload.objectPath = objPath;
                upload.ucVolumePath = volumePath;
                upload.file = candidate;
                upload.originalIndex = position;
                upload.errorMessage = null;
                pending.add(Optional.of(upload));
            } else {
                // Keep the slot so results stay aligned with the input order.
                LOGGER.error("File not found or not a file: " + localPath);
                pending.add(Optional.empty());
            }
        }
        return this.executeUploads(pending, localPaths);
    }

    /**
     * Uploads several input streams to the volume.
     *
     * <p>One upload request is prepared per entry from the three parallel lists and passed to
     * {@code executeUploads}.
     *
     * @param catalog catalog name
     * @param schema schema name
     * @param volume volume name
     * @param objectPaths destination paths inside the volume
     * @param inputStreams sources of the bytes to upload, parallel to {@code objectPaths}
     * @param contentLengths stream lengths, parallel to {@code objectPaths}
     * @param toOverwrite not read by this method
     * @return one result per input entry, in input order
     * @throws IllegalArgumentException when the three lists differ in size
     */
    @Override
    public List<VolumePutResult> putFiles(String catalog, String schema, String volume, List<String> objectPaths, List<InputStream> inputStreams, List<Long> contentLengths, boolean toOverwrite) {
        LOGGER.debug("Entering putFiles: catalog={}, schema={}, volume={}, streams={}", catalog, schema, volume, objectPaths.size());
        boolean pathsMatchStreams = objectPaths.size() == inputStreams.size();
        if (!pathsMatchStreams || inputStreams.size() != contentLengths.size()) {
            String mismatch = "objectPaths, inputStreams, contentLengths \u2013 sizes differ";
            LOGGER.error(mismatch);
            throw new IllegalArgumentException(mismatch);
        }
        List<Optional<UploadRequest>> pending = new ArrayList<>(objectPaths.size());
        for (int position = 0; position < objectPaths.size(); position++) {
            String objPath = objectPaths.get(position);
            String volumePath = DatabricksUCVolumeClient.getObjectFullPath(catalog, schema, volume, objPath);
            InputStream source = inputStreams.get(position);
            long length = contentLengths.get(position);
            UploadRequest upload = new UploadRequest();
            upload.objectPath = objPath;
            upload.ucVolumePath = volumePath;
            upload.inputStream = source;
            upload.contentLength = length;
            upload.originalIndex = position;
            upload.errorMessage = null;
            pending.add(Optional.of(upload));
        }
        return this.executeUploads(pending, objectPaths);
    }

    /**
     * Runs the prepared upload requests and waits for all of them to finish.
     *
     * <p>One future is created per request slot, at the same index as the slot. An empty slot is
     * completed immediately with a 400 FAILED result. For a present request, a presigned URL is
     * requested (attempt 1); once it arrives, an asynchronous PUT is dispatched through the HTTP
     * client with a {@code VolumeUploadCallback} that receives the slot's future. The callback is
     * presumably what completes the future on the normal path -- its code is outside this view.
     *
     * @param uploadRequests request slots; an empty {@code Optional} marks an entry rejected
     *     before upload
     * @param originalPaths paths used only to build the error message of rejected entries
     * @return one result per slot, in slot order
     */
    private List<VolumePutResult> executeUploads(List<Optional<UploadRequest>> uploadRequests, List<String> originalPaths) {
        CompletableFuture[] futures = new CompletableFuture[uploadRequests.size()];
        for (int i = 0; i < uploadRequests.size(); ++i) {
            CompletableFuture uploadFuture;
            int index = i;
            Optional<UploadRequest> optionalRequest = uploadRequests.get(index);
            if (optionalRequest.isEmpty()) {
                // Rejected before upload: report the failure without contacting the service.
                String errorMessage = "File not found or not a file: " + originalPaths.get(index);
                futures[index] = CompletableFuture.completedFuture(new VolumePutResult(400, VolumeOperationStatus.FAILED, errorMessage));
                continue;
            }
            UploadRequest request = optionalRequest.get();
            futures[index] = uploadFuture = new CompletableFuture();
            LOGGER.debug("Uploading {} {}/{}: {} ({} bytes)", request.isFile() ? "file" : "stream", index + 1, uploadRequests.size(), request.objectPath, request.isFile() ? request.file.toFile().length() : request.contentLength);
            ((CompletableFuture)this.requestPresignedUrlWithRetry(request.ucVolumePath, request.objectPath, 1).thenAccept(response -> {
                String presignedUrl = response.getUrl();
                LOGGER.debug("Got presigned URL for {} {}: {}", request.isFile() ? "file" : "stream", index + 1, request.objectPath);
                try {
                    AsyncRequestProducer uploadProducer;
                    // File requests send the file itself; stream requests use a fixed-length producer.
                    if (request.isFile()) {
                        uploadProducer = AsyncRequestBuilder.put().setUri(URI.create(presignedUrl)).setEntity(AsyncEntityProducers.create(request.file.toFile(), ContentType.DEFAULT_BINARY)).build();
                    } else {
                        InputStreamFixedLenProducer entity = new InputStreamFixedLenProducer(request.inputStream, request.contentLength);
                        uploadProducer = AsyncRequestBuilder.put().setUri(URI.create(presignedUrl)).setEntity(entity).build();
                    }
                    SimpleResponseConsumer uploadConsumer = SimpleResponseConsumer.create();
                    VolumeUploadCallback uploadCallback = new VolumeUploadCallback(this.databricksHttpClient, uploadFuture, request, this.presignedUrlSemaphore, this::requestPresignedUrlWithRetry, this::calculateRetryDelay, this.connectionContext);
                    this.databricksHttpClient.executeAsync(uploadProducer, uploadConsumer, uploadCallback);
                }
                catch (Exception e) {
                    // Building or dispatching the PUT failed: surface it as a 500 FAILED result.
                    String errorMessage = String.format("Error uploading %s: %s", request.objectPath, e.getMessage());
                    LOGGER.error(e, errorMessage);
                    uploadFuture.complete(new VolumePutResult(500, VolumeOperationStatus.FAILED, errorMessage));
                }
            })).exceptionally(e -> {
                // Reached when the presigned-URL stage, or the lambda above outside its try block, fails.
                String errorMessage = String.format("Failed to get presigned URL for %s: %s", request.objectPath, e.getMessage());
                LOGGER.error((Throwable)e, errorMessage);
                uploadFuture.complete(new VolumePutResult(500, VolumeOperationStatus.FAILED, errorMessage));
                return null;
            });
        }
        // Block the caller until every slot has a result.
        CompletableFuture.allOf(futures).join();
        ArrayList<VolumePutResult> results = new ArrayList<VolumePutResult>(futures.length);
        for (CompletableFuture future : futures) {
            results.add((VolumePutResult)future.join());
        }
        long successCount = results.stream().mapToLong(result -> result.getStatus() == VolumeOperationStatus.SUCCEEDED ? 1L : 0L).sum();
        // The log wording follows the first present request; "files" is used when every slot is empty.
        boolean isFileUpload = uploadRequests.stream().filter(Optional::isPresent).findFirst().map(opt -> ((UploadRequest)opt.get()).isFile()).orElse(true);
        LOGGER.info("Completed uploads: {}/{} {} successful", successCount, results.size(), isFileUpload ? "files" : "streams");
        return results;
    }

    /**
     * Requests a presigned upload URL, using the current time as the start of the retry window.
     *
     * @param ucVolumePath full volume path sent to the service
     * @param objectPath object path used in log and error messages
     * @param attempt attempt number of this request
     * @return future produced by the four-argument overload
     */
    CompletableFuture<CreateUploadUrlResponse> requestPresignedUrlWithRetry(String ucVolumePath, String objectPath, int attempt) {
        long retryWindowStart = System.currentTimeMillis();
        return this.requestPresignedUrlWithRetry(ucVolumePath, objectPath, attempt, retryWindowStart);
    }

    /**
     * Asynchronously requests a presigned upload URL, retrying through {@link #handleRetry}
     * for as long as {@code VolumeRetryUtil} permits.
     *
     * <p>The call first blocks until a permit is available on {@code presignedUrlSemaphore}.
     * It then POSTs the serialized {@code CreateUploadUrlRequest} to
     * {@code <hostUrl>/api/2.0/fs/create-upload-url} with the workspace auth headers and the
     * JSON headers attached.
     *
     * <ul>
     *   <li>2xx response: the body is parsed into a {@code CreateUploadUrlResponse} and the
     *       future completes with it (a parse failure completes the future exceptionally).</li>
     *   <li>Retryable status code, or transport failure, while retrying is still allowed:
     *       this attempt's permit is released and a retry is scheduled.</li>
     *   <li>Any other status code: the future completes exceptionally with a
     *       {@code DatabricksVolumeOperationException}.</li>
     *   <li>Transport failure with no retries left: the future completes exceptionally with
     *       the original exception.</li>
     *   <li>Cancellation of the HTTP exchange: the future is cancelled.</li>
     * </ul>
     *
     * <p>The permit is released exactly once per attempt: either just before a retry is
     * scheduled or when the returned future completes, whichever happens first. Releasing
     * before the retry matters because the retry acquires its own permit; holding the old one
     * across the retry would make a repeatedly retried request accumulate permits until the
     * semaphore is exhausted and the retry blocks forever.
     *
     * @param ucVolumePath   path passed to the create-upload-url request
     * @param objectPath     object path used in log and error messages
     * @param attempt        current attempt number, forwarded to the retry policy
     * @param retryStartTime epoch milliseconds at which the first attempt started
     * @return a future completed with the response, or exceptionally on failure; if the
     *         calling thread is interrupted while waiting for a permit, the interrupt flag is
     *         restored and the future completes exceptionally with a {@code CancellationException}
     */
    private CompletableFuture<CreateUploadUrlResponse> requestPresignedUrlWithRetry(final String ucVolumePath, final String objectPath, final int attempt, final long retryStartTime) {
        final CompletableFuture<CreateUploadUrlResponse> future = new CompletableFuture<CreateUploadUrlResponse>();
        try {
            this.presignedUrlSemaphore.acquire();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.completeExceptionally(new CancellationException("Thread was interrupted while waiting for semaphore permit."));
            return future;
        }
        // Guard so the permit is returned exactly once, whichever path gets there first.
        final java.util.concurrent.atomic.AtomicBoolean permitReleased = new java.util.concurrent.atomic.AtomicBoolean(false);
        final Runnable releasePermit = () -> {
            if (permitReleased.compareAndSet(false, true)) {
                LOGGER.debug("Releasing semaphore permit for {}", objectPath);
                this.presignedUrlSemaphore.release();
            }
        };
        future.whenComplete((response, throwable) -> releasePermit.run());
        try {
            LOGGER.debug("Requesting presigned URL for {} (attempt {})", objectPath, attempt);
            CreateUploadUrlRequest request = new CreateUploadUrlRequest(ucVolumePath);
            String requestBody = this.apiClient.serialize(request);
            AsyncRequestBuilder requestBuilder = AsyncRequestBuilder.post(URI.create(this.connectionContext.getHostUrl() + "/api/2.0/fs/create-upload-url"));
            Map<String, String> authHeaders = this.workspaceClient.config().authenticate();
            authHeaders.forEach(requestBuilder::addHeader);
            DatabricksJdbcConstants.JSON_HTTP_HEADERS.forEach(requestBuilder::addHeader);
            // Encode explicitly as UTF-8 rather than relying on the platform default charset.
            requestBuilder.setEntity(AsyncEntityProducers.create(requestBody.getBytes(java.nio.charset.StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
            this.databricksHttpClient.executeAsync(requestBuilder.build(), SimpleResponseConsumer.create(), new FutureCallback<SimpleHttpResponse>(){

                @Override
                public void completed(SimpleHttpResponse result) {
                    int statusCode = result.getCode();
                    if (statusCode >= 200 && statusCode < 300) {
                        try {
                            CreateUploadUrlResponse response = JsonUtil.getMapper().readValue(result.getBodyText(), CreateUploadUrlResponse.class);
                            future.complete(response);
                        }
                        catch (Exception e) {
                            future.completeExceptionally(e);
                        }
                    } else if (VolumeRetryUtil.isRetryableHttpCode(statusCode, DBFSVolumeClient.this.connectionContext) && VolumeRetryUtil.shouldRetry(attempt, retryStartTime, DBFSVolumeClient.this.connectionContext)) {
                        // Free this attempt's permit first; the retry acquires its own.
                        releasePermit.run();
                        DBFSVolumeClient.this.handleRetry(ucVolumePath, objectPath, attempt, future, retryStartTime);
                    } else {
                        String errorMsg = String.format("Failed to get presigned URL for %s: HTTP %d - %s", objectPath, statusCode, result.getReasonPhrase());
                        LOGGER.error(errorMsg);
                        future.completeExceptionally(new DatabricksVolumeOperationException(errorMsg, null, DatabricksDriverErrorCode.VOLUME_OPERATION_URL_GENERATION_ERROR));
                    }
                }

                @Override
                public void failed(Exception ex) {
                    if (VolumeRetryUtil.shouldRetry(attempt, retryStartTime, DBFSVolumeClient.this.connectionContext)) {
                        // Free this attempt's permit first; the retry acquires its own.
                        releasePermit.run();
                        DBFSVolumeClient.this.handleRetry(ucVolumePath, objectPath, attempt, future, retryStartTime);
                    } else {
                        LOGGER.error(ex, "Failed to get presigned URL for {} (attempt {})", objectPath, attempt);
                        future.completeExceptionally(ex);
                    }
                }

                @Override
                public void cancelled() {
                    future.cancel(true);
                }
            });
        }
        catch (Throwable t) {
            // Anything thrown while building or submitting the request fails the future,
            // which in turn releases the permit through the whenComplete hook above.
            future.completeExceptionally(t);
        }
        return future;
    }

    /**
     * Schedules the next presigned-URL attempt after a backoff delay and relays its outcome
     * to the caller's future.
     *
     * <p>The delay comes from {@link #calculateRetryDelay(int)}. When the delay expires, the
     * request is reissued with {@code attempt + 1} and the same {@code retryStartTime}. The
     * result of that attempt (value or exception) is forwarded to {@code future}.
     *
     * @param ucVolumePath   path passed to the create-upload-url request
     * @param objectPath     object path used in log messages
     * @param attempt        attempt number that just failed
     * @param future         future to complete with the outcome of the retried request
     * @param retryStartTime epoch milliseconds at which the first attempt started
     */
    private void handleRetry(String ucVolumePath, String objectPath, int attempt, CompletableFuture<CreateUploadUrlResponse> future, long retryStartTime) {
        final int nextAttempt = attempt + 1;
        final long backoffMs = this.calculateRetryDelay(attempt);
        final long elapsedSec = (System.currentTimeMillis() - retryStartTime) / 1000L;
        final int timeoutSec = VolumeRetryUtil.getRetryTimeoutSeconds(this.connectionContext);
        LOGGER.info("Request for {} failed or was rate-limited. Retrying in {} ms (elapsed: {}s, timeout: {}s)", objectPath, backoffMs, elapsedSec, timeoutSec);

        final Runnable retryTask = () -> {
            CompletableFuture<CreateUploadUrlResponse> retried = this.requestPresignedUrlWithRetry(ucVolumePath, objectPath, nextAttempt, retryStartTime);
            retried.whenComplete((response, ex) -> {
                if (ex == null) {
                    future.complete(response);
                    return;
                }
                LOGGER.error(ex, "Failed to get presigned URL for {} (attempt {})", objectPath, nextAttempt);
                future.completeExceptionally(ex);
            });
        };
        CompletableFuture.delayedExecutor(backoffMs, TimeUnit.MILLISECONDS).execute(retryTask);
    }

    /**
     * Computes the delay before the next retry: exponential backoff with random jitter.
     *
     * <p>The base delay is 200 ms doubled once per attempt and capped at 10000 ms. The result
     * is then scaled by {@code 0.8 + random.nextDouble(0.4)} (a factor in [0.8, 1.2), assuming
     * {@code nextDouble(bound)} returns a value in [0, bound)).
     *
     * <p>The exponent is clamped before shifting. Without the clamp, {@code 200L * (1L << attempt)}
     * overflows {@code long} once {@code attempt} reaches 56 (and the shift distance itself wraps
     * at 64), yielding a negative or zero delay that defeats the backoff. Clamping at 6 does not
     * change any in-range result because 200 &lt;&lt; 6 = 12800 already exceeds the cap.
     *
     * @param attempt attempt number that just failed; negative values are treated as 0
     * @return delay in milliseconds before the next attempt
     */
    private long calculateRetryDelay(int attempt) {
        int exponent = Math.max(0, Math.min(attempt, 6));
        long delay = Math.min(200L << exponent, 10000L);
        return (long)((double)delay * (0.8 + this.random.nextDouble(0.4)));
    }

    /**
     * Mutable holder describing a single upload.
     *
     * <p>All fields are public and unset by default. The source of the upload is considered
     * a file when {@link #file} is set; see {@link #isFile()}.
     */
    public static class UploadRequest {
        /** Object path of the upload (presumably the destination name; confirm against callers). */
        public String objectPath;
        /** Volume path associated with the upload (presumably the full UC volume path; confirm against callers). */
        public String ucVolumePath;
        /** Local file to upload; {@code null} when the upload is not file-based. */
        public Path file;
        /** Stream to upload (presumably used when {@link #file} is {@code null}; confirm against callers). */
        public InputStream inputStream;
        /** Content length (presumably in bytes; confirm against callers). */
        public long contentLength;
        /** Index of this request (presumably its position in the caller's original list; confirm). */
        public int originalIndex;
        /** Error message attached to this request, if any (semantics defined by callers). */
        public String errorMessage;

        /**
         * Reports whether this request is file-based.
         *
         * @return {@code true} when {@link #file} is non-null, {@code false} otherwise
         */
        public boolean isFile() {
            return null != this.file;
        }
    }
}

