/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.snowflake;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryResponse;
import net.snowflake.ingest.connection.IngestResponseException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage;
import org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
import org.apache.nifi.processors.snowflake.StartSnowflakeIngest;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@DefaultSettings(penaltyDuration="5 sec")
@ReadsAttributes(value={@ReadsAttribute(attribute="snowflake.staged.file.path", description="Staged file path")})
@Tags(value={"snowflake", "snowpipe", "ingest", "history"})
@CapabilityDescription(value="Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand. This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.")
@SeeAlso(value={StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
public class GetSnowflakeIngestStatus
extends AbstractProcessor {
    public static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder().name("ingest-manager-provider").displayName("Ingest Manager Provider").description("Specifies the Controller Service to use for ingesting Snowflake staged files.").identifiesControllerService(SnowflakeIngestManagerProviderService.class).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("For FlowFiles of successful ingestion").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("For FlowFiles of failed ingestion").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("For FlowFiles whose file is still not ingested. These FlowFiles should be routed back to this processor to try again later").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(INGEST_MANAGER_PROVIDER);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_RETRY, REL_FAILURE);

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        HistoryResponse historyResponse;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String stagedFilePath = flowFile.getAttribute("snowflake.staged.file.path");
        if (stagedFilePath == null) {
            this.getLogger().error("Missing required attribute [\"{}\"] for FlowFile", new Object[]{"snowflake.staged.file.path"});
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }
        SnowflakeIngestManagerProviderService ingestManagerProviderService = (SnowflakeIngestManagerProviderService)context.getProperty(INGEST_MANAGER_PROVIDER).asControllerService(SnowflakeIngestManagerProviderService.class);
        try {
            SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager();
            historyResponse = snowflakeIngestManager.getHistory(null, null, null);
        }
        catch (IOException | URISyntaxException e) {
            throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", (Throwable)e);
        }
        catch (IngestResponseException e) {
            this.getLogger().error("Failed to get Snowflake ingest history for staged file [{}]", new Object[]{stagedFilePath, e});
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }
        Optional fileEntry = Optional.ofNullable(historyResponse.files).flatMap(files -> files.stream().filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete() != false).findFirst());
        if (fileEntry.isEmpty()) {
            session.transfer(session.penalize(flowFile), REL_RETRY);
            return;
        }
        if (((HistoryResponse.FileEntry)fileEntry.get()).getErrorsSeen() > 0L) {
            this.getLogger().error("Failed to ingest file [{}] in Snowflake stage via pipe [{}]. Error: {}", new Object[]{stagedFilePath, ingestManagerProviderService.getPipeName(), ((HistoryResponse.FileEntry)fileEntry.get()).getFirstError()});
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }
        session.transfer(flowFile, REL_SUCCESS);
    }
}

