/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.storage;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.core.http.HttpAuthorization;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.options.BlobUploadFromUrlOptions;
import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
import com.azure.storage.blob.options.BlockBlobStageBlockFromUrlOptions;
import com.azure.storage.blob.sas.BlobContainerSasPermission;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.blob.specialized.BlockBlobClient;
import java.text.DecimalFormat;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12;
import org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12;
import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12;
import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType;
import reactor.core.publisher.Mono;

@Tags(value={"azure", "microsoft", "cloud", "storage", "blob"})
@SeeAlso(value={ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class})
@CapabilityDescription(value="Copies a blob in Azure Blob Storage from one account/container to another. The processor uses Azure Blob Storage client library v12.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes(value={@WritesAttribute(attribute="azure.container", description="The name of the Azure Blob Storage container"), @WritesAttribute(attribute="azure.blobname", description="The name of the blob on Azure Blob Storage"), @WritesAttribute(attribute="azure.primaryUri", description="Primary location of the blob"), @WritesAttribute(attribute="azure.etag", description="ETag of the blob"), @WritesAttribute(attribute="azure.blobtype", description="Type of the blob (either BlockBlob, PageBlob or AppendBlob)"), @WritesAttribute(attribute="mime.type", description="MIME Type of the content"), @WritesAttribute(attribute="lang", description="Language code for the content"), @WritesAttribute(attribute="azure.timestamp", description="Timestamp of the blob"), @WritesAttribute(attribute="azure.length", description="Length of the blob"), @WritesAttribute(attribute="azure.error.code", description="Error code reported during blob operation"), @WritesAttribute(attribute="azure.ignored", description="When Conflict Resolution Strategy is 'ignore', this property will be true/false depending on whether the blob was ignored.")})
public class CopyAzureBlobStorage_v12
extends AbstractAzureBlobProcessor_v12 {
    /** Number of hours a SAS token generated for the source container remains valid. */
    private static final int GENERATE_SAS_EXPIRY_HOURS = 24;

    /** Controller service that supplies the credentials used to read the source blob. */
    public static final PropertyDescriptor SOURCE_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
            .name("Source Storage Credentials")
            .displayName("Source Storage Credentials")
            .description("Credentials Service used to obtain Azure Blob Storage Credentials to read Source Blob information")
            .identifiesControllerService(AzureStorageCredentialsService_v12.class)
            .required(true)
            .build();

    /** Container holding the blob to copy; evaluated against FlowFile attributes. */
    public static final PropertyDescriptor SOURCE_CONTAINER_NAME = new PropertyDescriptor.Builder()
            .name("Source Container Name")
            .displayName("Source Container Name")
            .description("Name of the Azure storage container that will be copied")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    /**
     * Name of the blob to copy; evaluated against FlowFile attributes.
     * NOTE(review): description(...) is called twice in this chain; presumably the later call
     * replaces the earlier one - confirm and drop the redundant call.
     */
    public static final PropertyDescriptor SOURCE_BLOB_NAME = new PropertyDescriptor.Builder()
            .name("Source Blob Name")
            .displayName("Source Blob Name")
            .description("Name of the Azure blob that will be copied")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .description("The full name of the source blob")
            .build();

    /** Destination credentials; derived from the shared blob storage credentials descriptor with a new display name. */
    public static final PropertyDescriptor DESTINATION_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE)
            .displayName("Destination Storage Credentials")
            .build();

    /** Optional destination container; derived from the shared container descriptor. */
    public static final PropertyDescriptor DESTINATION_CONTAINER_NAME = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AzureStorageUtils.CONTAINER)
            .displayName("Destination Container Name")
            .description("Name of the Azure storage container destination defaults to the Source Container Name when not specified")
            .required(false)
            .build();

    /** Optional destination blob name; derived from the inherited BLOB_NAME descriptor. */
    public static final PropertyDescriptor DESTINATION_BLOB_NAME = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(BLOB_NAME)
            .displayName("Destination Blob Name")
            .description("The full name of the destination blob defaults to the Source Blob Name when not specified")
            .required(false)
            .build();

    /** Properties exposed by this processor, in the order they are listed here. */
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            SOURCE_STORAGE_CREDENTIALS_SERVICE,
            SOURCE_CONTAINER_NAME,
            SOURCE_BLOB_NAME,
            DESTINATION_STORAGE_CREDENTIALS_SERVICE,
            DESTINATION_CONTAINER_NAME,
            DESTINATION_BLOB_NAME,
            AzureStorageUtils.CONFLICT_RESOLUTION,
            AzureStorageUtils.CREATE_CONTAINER,
            AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
    );

    /**
     * Returns the fixed list of properties this processor supports.
     *
     * @return the immutable {@code PROPERTY_DESCRIPTORS} list
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Copies one blob from the source container to the destination container for the incoming FlowFile.
     *
     * <p>Source and destination names are evaluated against the FlowFile's attributes; the destination
     * container and blob names fall back to the source names when not configured. On success the blob
     * attributes are written to the FlowFile, it is routed to {@code REL_SUCCESS} and a provenance SEND
     * event is reported. When the conflict resolution strategy is "ignore" and the service reports that
     * the destination blob already exists, the FlowFile is still routed to {@code REL_SUCCESS} with
     * {@code azure.ignored=true}. Any other failure penalizes the FlowFile and routes it to
     * {@code REL_FAILURE}.
     *
     * @param context process context providing the configured property values
     * @param session process session used to obtain, update and transfer the FlowFile
     * @throws ProcessException declared by the processor contract; failures raised inside the copy are
     *                          caught here and routed to failure rather than propagated
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final String sourceContainerName = context.getProperty(SOURCE_CONTAINER_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String sourceBlobName = context.getProperty(SOURCE_BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
        // Destination names are optional and default to the corresponding source names
        final String destinationContainerName = Optional.ofNullable(
                context.getProperty(DESTINATION_CONTAINER_NAME).evaluateAttributeExpressions(flowFile).getValue()
        ).orElse(sourceContainerName);
        final String destinationBlobName = Optional.ofNullable(
                context.getProperty(DESTINATION_BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue()
        ).orElse(sourceBlobName);

        final boolean createContainer = context.getProperty(AzureStorageUtils.CREATE_CONTAINER).asBoolean();
        final AzureStorageConflictResolutionStrategy conflictResolution =
                context.getProperty(AzureStorageUtils.CONFLICT_RESOLUTION).asAllowableValue(AzureStorageConflictResolutionStrategy.class);

        final long startNanos = System.nanoTime();
        try {
            final BlobServiceClient destinationServiceClient = getStorageClient(context, DESTINATION_STORAGE_CREDENTIALS_SERVICE, flowFile);
            final BlobContainerClient destinationContainerClient = destinationServiceClient.getBlobContainerClient(destinationContainerName);
            if (createContainer && !destinationContainerClient.exists()) {
                destinationContainerClient.create();
            }

            final BlobClient destinationBlobClient = destinationContainerClient.getBlobClient(destinationBlobName);
            final LinkedHashMap<String, String> attributes = new LinkedHashMap<>();
            applyStandardBlobAttributes(attributes, destinationBlobClient);
            final boolean ignoreStrategyEnabled = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;

            final BlobRequestConditions destinationRequestConditions = new BlobRequestConditions();
            try {
                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
                    // If-None-Match: * asks the service to reject the write when the destination already exists
                    destinationRequestConditions.setIfNoneMatch("*");
                }

                final AzureStorageCredentialsService_v12 sourceCredentialsService = getCopyFromCredentialsService(context);
                final BlobServiceClient sourceServiceClient = getStorageClient(context, SOURCE_STORAGE_CREDENTIALS_SERVICE, flowFile);
                final BlobContainerClient sourceContainerClient = sourceServiceClient.getBlobContainerClient(sourceContainerName);
                final BlobClient sourceBlobClient = sourceContainerClient.getBlobClient(sourceBlobName);
                final AzureStorageCredentialsDetails_v12 sourceCredentialsDetails = sourceCredentialsService.getCredentialsDetails(flowFile.getAttributes());

                // Start with the plain blob URL; a SAS token is appended below when one is available
                String sourceUrl = sourceBlobClient.getBlobUrl();
                final BlobProperties sourceBlobProperties = sourceBlobClient.getProperties();
                final long blobSize = sourceBlobProperties.getBlobSize();

                // Pin the copy to the ETag just read so a concurrently modified source is not copied
                final BlobRequestConditions sourceRequestConditions = new BlobRequestConditions();
                sourceRequestConditions.setIfMatch(sourceBlobProperties.getETag());

                final String sasToken;
                if (sourceCredentialsDetails.getCredentialsType() == AzureStorageCredentialsType.ACCOUNT_KEY) {
                    sasToken = generateSas(sourceContainerClient);
                } else {
                    sasToken = sourceCredentialsDetails.getSasToken();
                }

                // Either the SAS token on the URL or an HTTP Authorization header authorizes the source read
                final HttpAuthorization httpAuthorization;
                if (sasToken == null) {
                    httpAuthorization = getHttpAuthorization(sourceCredentialsDetails);
                } else {
                    sourceUrl = sourceUrl + "?" + sasToken;
                    httpAuthorization = null;
                }

                copy(destinationBlobClient, httpAuthorization, sourceUrl, blobSize, sourceRequestConditions, destinationRequestConditions);
                applyBlobMetadata(attributes, destinationBlobClient);
                if (ignoreStrategyEnabled) {
                    attributes.put("azure.ignored", Boolean.FALSE.toString());
                }
            } catch (BlobStorageException e) {
                final BlobErrorCode errorCode = e.getErrorCode();
                // The error code may be absent; dereferencing it blindly would mask the original exception with an NPE
                if (errorCode != null) {
                    flowFile = session.putAttribute(flowFile, "azure.error.code", errorCode.toString());
                }

                if (ignoreStrategyEnabled && BlobErrorCode.BLOB_ALREADY_EXISTS.equals(errorCode)) {
                    getLogger().info("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
                    attributes.put("azure.ignored", Boolean.TRUE.toString());
                } else {
                    // Only conflicts under the "ignore" strategy are tolerated; everything else is a failure
                    throw e;
                }
            }

            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);

            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            final String transitUri = attributes.get("azure.primaryUri");
            session.getProvenanceReporter().send(flowFile, transitUri, transferMillis);
        } catch (Exception e) {
            getLogger().error("Failed to create blob on Azure Blob Storage", e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Copies the blob at {@code sourceUrl} into the destination blob.
     *
     * <p>Blobs smaller than {@code BlockBlobClient.MAX_UPLOAD_BLOB_BYTES_LONG} are copied with a single
     * upload-from-URL request. Larger blobs are staged from the source URL in ranges of at most
     * {@code BlockBlobClient.MAX_STAGE_BLOCK_BYTES_LONG} bytes and then committed as a block list.
     *
     * @param destinationBlobClient        client for the destination blob
     * @param httpAuthorization            authorization sent for the source read, or {@code null} when
     *                                     the source URL is already authorized (e.g. carries a SAS token)
     * @param sourceUrl                    URL of the source blob
     * @param blobSize                     size of the source blob in bytes
     * @param sourceRequestConditions      conditions applied to every read of the source
     * @param destinationRequestConditions conditions applied to the write that makes the destination visible
     * @throws ProcessException if staging a block or committing the block list reports a non-success status
     */
    private void copy(BlobClient destinationBlobClient, HttpAuthorization httpAuthorization, String sourceUrl, long blobSize,
                      BlobRequestConditions sourceRequestConditions, BlobRequestConditions destinationRequestConditions) {
        final BlockBlobClient blockBlobClient = destinationBlobClient.getBlockBlobClient();

        if (blobSize < BlockBlobClient.MAX_UPLOAD_BLOB_BYTES_LONG) {
            final BlobUploadFromUrlOptions uploadOptions = new BlobUploadFromUrlOptions(sourceUrl);
            if (httpAuthorization != null) {
                uploadOptions.setSourceAuthorization(httpAuthorization);
            }
            uploadOptions.setSourceRequestConditions(sourceRequestConditions);
            uploadOptions.setDestinationRequestConditions(destinationRequestConditions);
            blockBlobClient.uploadFromUrlWithResponse(uploadOptions, null, Context.NONE);
            return;
        }

        // Block identifiers are zero-padded to a fixed width so that every Base64 block ID has equal length
        final DecimalFormat blockIdFormat = new DecimalFormat("0000000");
        final List<String> blockIds = new ArrayList<>();
        long offset = 0L;
        int blockNumber = 1;
        while (offset < blobSize) {
            final long count = Math.min(blobSize - offset, BlockBlobClient.MAX_STAGE_BLOCK_BYTES_LONG);
            final String zeroPadded = blockIdFormat.format(blockNumber);
            // Explicit charset: the platform default must not influence the block ID bytes
            final String base64BlockId = Base64.getEncoder()
                    .encodeToString(zeroPadded.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            final BlockBlobStageBlockFromUrlOptions stageOptions = new BlockBlobStageBlockFromUrlOptions(base64BlockId, sourceUrl);
            stageOptions.setSourceRange(new BlobRange(offset, count));
            if (httpAuthorization != null) {
                stageOptions.setSourceAuthorization(httpAuthorization);
            }
            stageOptions.setSourceRequestConditions(sourceRequestConditions);

            final int stageStatus = blockBlobClient.stageBlockFromUrlWithResponse(stageOptions, null, Context.NONE).getStatusCode();
            if (!isSuccessStatus(stageStatus)) {
                throw new ProcessException(String.format("Failed staging one or more blocks: HTTP %d", stageStatus));
            }

            blockIds.add(base64BlockId);
            offset += count;
            blockNumber++;
        }

        // Destination conditions are enforced on the commit, which is the step that publishes the blob
        final BlockBlobCommitBlockListOptions commitOptions = new BlockBlobCommitBlockListOptions(blockIds);
        commitOptions.setRequestConditions(destinationRequestConditions);
        final Response<?> commitResponse = blockBlobClient.commitBlockListWithResponse(commitOptions, null, Context.NONE);
        final int commitStatus = commitResponse.getStatusCode();
        if (!isSuccessStatus(commitStatus)) {
            throw new ProcessException(String.format("Failed committing block list: HTTP %d", commitStatus));
        }
    }

    /**
     * Tells whether an HTTP status code is in the 2xx success range. The block staging and block list
     * commit operations report 201 Created on success, so requiring exactly 202 rejects successful calls.
     */
    private static boolean isSuccessStatus(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    /**
     * Generates a short-lived SAS token for the source container so the destination service can read
     * the blob being copied.
     *
     * <p>Only read permission is granted: the token is appended to the source URL of a copy request and
     * is never used to write, so create/write/add rights on the source container are not needed.
     *
     * @param sourceContainerClient client of the container holding the source blob
     * @return the SAS query string, valid for {@code GENERATE_SAS_EXPIRY_HOURS} hours from now (UTC)
     */
    private static String generateSas(BlobContainerClient sourceContainerClient) {
        final BlobContainerSasPermission readOnly = new BlobContainerSasPermission().setReadPermission(true);
        final OffsetDateTime expiryTime = OffsetDateTime.now(ZoneOffset.UTC).plusHours(GENERATE_SAS_EXPIRY_HOURS);
        final BlobServiceSasSignatureValues signatureValues = new BlobServiceSasSignatureValues(expiryTime, readOnly);
        return sourceContainerClient.generateSas(signatureValues);
    }

    /**
     * Looks up the controller service configured in the "Source Storage Credentials" property.
     *
     * @param context process context holding the property values
     * @return the configured source credentials service
     */
    private static AzureStorageCredentialsService_v12 getCopyFromCredentialsService(ProcessContext context) {
        final Class<AzureStorageCredentialsService_v12> serviceType = AzureStorageCredentialsService_v12.class;
        return context.getProperty(SOURCE_STORAGE_CREDENTIALS_SERVICE).asControllerService(serviceType);
    }

    /**
     * Builds the HTTP Authorization value for reading the source blob from token-based credentials.
     *
     * @param credentialsDetails credentials resolved for the source storage account
     * @return a bearer authorization for ACCESS_TOKEN, MANAGED_IDENTITY and SERVICE_PRINCIPAL credentials,
     *         or {@code null} for any other credentials type (the caller then sends no source authorization)
     */
    private static HttpAuthorization getHttpAuthorization(AzureStorageCredentialsDetails_v12 credentialsDetails) {
        final TokenCredential credential;
        switch (credentialsDetails.getCredentialsType()) {
            case ACCESS_TOKEN:
                // Wrap the already-issued token; no cast, so the lambda yields Mono<AccessToken> as TokenCredential requires
                credential = tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken());
                break;
            case MANAGED_IDENTITY:
                credential = new ManagedIdentityCredentialBuilder()
                        .clientId(credentialsDetails.getManagedIdentityClientId())
                        .build();
                break;
            case SERVICE_PRINCIPAL:
                credential = new ClientSecretCredentialBuilder()
                        .clientId(credentialsDetails.getServicePrincipalClientId())
                        .clientSecret(credentialsDetails.getServicePrincipalClientSecret())
                        .tenantId(credentialsDetails.getServicePrincipalTenantId())
                        .build();
                break;
            default:
                // No token-based authorization can be derived from the remaining credentials types
                return null;
        }
        return getHttpAuthorizationFromTokenCredential(credential);
    }

    /**
     * Requests a token for the Azure Storage default scope and wraps it as a Bearer authorization.
     *
     * @param credential credential used to obtain the token; the call blocks until the token arrives
     * @return a {@code Bearer} authorization carrying the retrieved token
     * @throws IllegalStateException if the credential completes without producing a token
     */
    private static HttpAuthorization getHttpAuthorizationFromTokenCredential(TokenCredential credential) {
        final TokenRequestContext storageScopeRequest = new TokenRequestContext();
        storageScopeRequest.setScopes(Collections.singletonList("https://storage.azure.com/.default"));

        final AccessToken retrieved = credential.getToken(storageScopeRequest).block();
        if (retrieved != null) {
            return new HttpAuthorization("Bearer", retrieved.getToken());
        }
        throw new IllegalStateException("Storage Access Token not retrieved");
    }
}

