/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.file.transfer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.util.file.transfer.FileInfo;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;
import org.apache.nifi.util.StopWatch;

/**
 * Base processor that lists files through a {@link FileTransfer}, pulls each listed file into a
 * FlowFile and routes it to {@link #REL_SUCCESS}.
 *
 * <p>Subclasses supply the concrete {@link FileTransfer} via {@link #getFileTransfer(ProcessContext)}.
 * Listed files are held in a shared queue; files currently being fetched are tracked in a separate
 * set so that a concurrent listing does not enqueue them a second time.</p>
 */
public abstract class GetFileTransfer
extends AbstractProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
    protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(FILE_MODIFY_DATE_ATTR_FORMAT);

    /** A new listing is only attempted while fewer than this many files are still queued. */
    private static final int LISTING_QUEUE_THRESHOLD = 100;
    /** Capacity handed to the queue implementation when the queue is first created. */
    private static final int QUEUE_CAPACITY = 25000;

    /** Wall-clock time (epoch millis) of the last successful listing; -1 until the first one. */
    private final AtomicLong lastPollTime = new AtomicLong(-1L);
    /** Ensures only one thread performs a listing at a time. */
    private final Lock listingLock = new ReentrantLock();
    /** Queue of files that were listed but not yet fetched; null until the first listing. */
    private final AtomicReference<BlockingQueue<FileInfo>> fileQueueRef = new AtomicReference<>();
    /** Files that have been dequeued and are currently being fetched. */
    private final Set<FileInfo> processing = Collections.synchronizedSet(new HashSet<>());
    private final ReadWriteLock transferLock = new ReentrantReadWriteLock();
    /** Read lock: held while moving a file from the queue into {@link #processing}. */
    private final Lock sharableTransferLock = this.transferLock.readLock();
    /** Write lock: held while a listing is merged into the queue. */
    private final Lock mutuallyExclusiveTransferLock = this.transferLock.writeLock();

    /**
     * @return the relationships of this processor, which is only {@link #REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Creates the transfer client used for listing, fetching and deleting remote files.
     *
     * @param var1 the context of the current invocation
     * @return the transfer client; it is closed by this class once it is no longer needed
     */
    protected abstract FileTransfer getFileTransfer(ProcessContext var1);

    /**
     * Discards any previously queued listing so that the next trigger starts with a fresh queue.
     *
     * @param context the process context (not read by this method)
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.listingLock.lock();
        try {
            BlockingQueue<FileInfo> fileQueue = this.fileQueueRef.get();
            if (fileQueue != null) {
                fileQueue.clear();
            }
            this.fileQueueRef.set(null);
        }
        finally {
            this.listingLock.unlock();
        }
    }

    /**
     * Refreshes the remote listing when the polling interval has elapsed and the queue is running
     * low, then fetches up to {@code FileTransfer.MAX_SELECTS} queued files into FlowFiles.
     *
     * <p>Each fetched file gets the attributes from {@link #getAttributesFromFile(FileInfo)} plus
     * path/filename attributes, a provenance receive event, and is transferred to
     * {@link #REL_SUCCESS}. After the session commit completes, the remote originals are deleted
     * when {@code FileTransfer.DELETE_ORIGINAL} is set, and the connection is closed.</p>
     *
     * <p>An {@link IOException} or {@link FlowFileAccessException} while fetching a file yields the
     * context, closes the connection, rolls the session back and returns.</p>
     *
     * @param context the process context supplying the property values
     * @param session the session used to create and transfer FlowFiles
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        long pollingIntervalMillis = context.getProperty(FileTransfer.POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
        long nextPollTime = this.lastPollTime.get() + pollingIntervalMillis;
        BlockingQueue<FileInfo> fileQueue = this.fileQueueRef.get();
        ComponentLog logger = this.getLogger();
        FileTransfer transfer = null;

        boolean pollDue = System.currentTimeMillis() >= nextPollTime;
        boolean queueRunningLow = fileQueue == null || fileQueue.size() < LISTING_QUEUE_THRESHOLD;
        // tryLock: if another thread is already listing, skip the listing and go fetch instead.
        if (pollDue && queueRunningLow && this.listingLock.tryLock()) {
            try {
                transfer = this.getFileTransfer(context);
                try {
                    this.fetchListing(context, session, transfer);
                    this.lastPollTime.set(System.currentTimeMillis());
                }
                catch (IOException e) {
                    context.yield();
                    this.closeQuietly(transfer, "Unable to close connection");
                    logger.error("Unable to fetch listing from remote server", e);
                    // The lock is released exactly once, by the finally block below.
                    return;
                }
            }
            finally {
                this.listingLock.unlock();
            }
        }

        // Re-read the reference: fetchListing may have just created the queue.
        fileQueue = this.fileQueueRef.get();
        if (fileQueue == null || fileQueue.isEmpty()) {
            context.yield();
            if (transfer != null) {
                this.closeQuietly(transfer, "Unable to close connection");
            }
            return;
        }

        String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions().getValue();
        boolean deleteOriginal = context.getProperty(FileTransfer.DELETE_ORIGINAL).asBoolean();
        int maxSelects = context.getProperty(FileTransfer.MAX_SELECTS).asInteger();
        // Reuse the connection opened for the listing when there is one.
        FileTransfer activeTransfer = transfer != null ? transfer : this.getFileTransfer(context);
        Map<FlowFile, String> flowFilesReceived = new HashMap<>();
        try {
            for (int i = 0; i < maxSelects && this.isScheduled(); ++i) {
                FileInfo file;
                // Dequeue and mark as in-progress under the read lock so that a listing (which
                // holds the write lock) never observes the file in neither collection.
                this.sharableTransferLock.lock();
                try {
                    file = fileQueue.poll();
                    if (file == null) {
                        break;
                    }
                    this.processing.add(file);
                }
                finally {
                    this.sharableTransferLock.unlock();
                }

                try {
                    // Path derivation lives inside this try so that a failure here still removes
                    // the file from the in-progress set in the finally block.
                    File relativeFile = new File(file.getFullPathFileName());
                    String parentPath = relativeFile.getParent();
                    String parentRelativePathString = (parentPath == null ? "" : parentPath) + "/";
                    Path absPath = relativeFile.toPath().toAbsolutePath();
                    String absPathString = absPath.getParent().toString() + "/";

                    FlowFile flowFile = session.create();
                    StopWatch stopWatch = new StopWatch(false);
                    stopWatch.start();
                    flowFile = activeTransfer.getRemoteFile(file.getFullPathFileName(), flowFile, session);
                    stopWatch.stop();
                    long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                    String dataRate = stopWatch.calculateDataRate(flowFile.getSize());

                    Map<String, String> attributes = this.getAttributesFromFile(file);
                    attributes.put(this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
                    attributes.put(CoreAttributes.PATH.key(), parentRelativePathString);
                    attributes.put(CoreAttributes.FILENAME.key(), relativeFile.getName());
                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                    flowFile = session.putAllAttributes(flowFile, attributes);

                    session.getProvenanceReporter().receive(flowFile, activeTransfer.getProtocolName() + "://" + hostname + "/" + file.getFullPathFileName(), millis);
                    session.transfer(flowFile, REL_SUCCESS);
                    logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success", flowFile, hostname, millis, dataRate);
                    flowFilesReceived.put(flowFile, file.getFullPathFileName());
                }
                catch (IOException e) {
                    context.yield();
                    logger.error("Unable to retrieve file {}", file.getFullPathFileName(), e);
                    this.closeQuietly(activeTransfer, "Unable to close connection to remote host");
                    session.rollback();
                    return;
                }
                catch (FlowFileAccessException e) {
                    context.yield();
                    logger.error("Unable to retrieve file {} due to {}", file.getFullPathFileName(), e.getCause(), e);
                    this.closeQuietly(activeTransfer, "Unable to close connection to remote host due to {}");
                    session.rollback();
                    return;
                }
                finally {
                    this.processing.remove(file);
                }
            }

            // Remote originals are only deleted after the commit has completed; the connection is
            // closed on both the success and the failure callback.
            session.commitAsync(() -> {
                if (deleteOriginal) {
                    this.deleteRemote(activeTransfer, flowFilesReceived);
                }
                this.closeTransfer(activeTransfer, hostname);
            }, failure -> this.closeTransfer(activeTransfer, hostname));
        }
        catch (Throwable t) {
            // Not rethrown (unchanged behaviour), but no longer dropped without a trace.
            logger.error("Failed to retrieve files from {}; closing connection", hostname, t);
            this.closeTransfer(activeTransfer, hostname);
        }
    }

    /**
     * Deletes the remote original of every received FlowFile. A failed delete is logged and does
     * not stop the remaining deletes.
     *
     * @param fileTransfer the open transfer client
     * @param flowFileToRemoteFileMapping received FlowFiles mapped to their remote full path
     */
    private void deleteRemote(FileTransfer fileTransfer, Map<FlowFile, String> flowFileToRemoteFileMapping) {
        for (Map.Entry<FlowFile, String> entry : flowFileToRemoteFileMapping.entrySet()) {
            FlowFile receivedFlowFile = entry.getKey();
            String remoteFilename = entry.getValue();
            try {
                fileTransfer.deleteFile(receivedFlowFile, null, remoteFilename);
            }
            catch (IOException e) {
                this.getLogger().error("Failed to remove remote file {} due to {}. This file may be duplicated in a subsequent run", remoteFilename, e, e);
            }
        }
    }

    /**
     * Closes the transfer client, logging a warning that names the host when closing fails.
     *
     * @param transfer the transfer client to close
     * @param hostname the remote host, used only in the log message
     */
    private void closeTransfer(FileTransfer transfer, String hostname) {
        try {
            transfer.close();
        }
        catch (IOException e) {
            this.getLogger().warn("Failed to close connection to {}", hostname, e);
        }
    }

    /**
     * Closes the transfer client, logging the given message as a warning when closing fails.
     *
     * @param transfer the transfer client to close
     * @param failureMessage the warning text logged together with the exception
     */
    private void closeQuietly(FileTransfer transfer, String failureMessage) {
        try {
            transfer.close();
        }
        catch (IOException e) {
            this.getLogger().warn(failureMessage, e);
        }
    }

    /**
     * Builds the FlowFile attributes derived from a remote file's metadata: last-modified time
     * (formatted with {@link #DATE_TIME_FORMATTER} in the system default zone), permissions,
     * owner and group.
     *
     * @param info the remote file metadata; may be null
     * @return a new mutable map of attributes, empty when {@code info} is null
     */
    protected Map<String, String> getAttributesFromFile(FileInfo info) {
        Map<String, String> attributes = new HashMap<>();
        if (info != null) {
            attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(info.getLastModifiedTime()).atZone(ZoneId.systemDefault())));
            attributes.put(FILE_PERMISSIONS_ATTRIBUTE, info.getPermissions());
            attributes.put(FILE_OWNER_ATTRIBUTE, info.getOwner());
            attributes.put(FILE_GROUP_ATTRIBUTE, info.getGroup());
        }
        return attributes;
    }

    /**
     * Obtains a listing from the remote server and enqueues every file that is neither already
     * queued nor currently being fetched. The queue is created on first use; its implementation
     * depends on {@code FileTransfer.USE_NATURAL_ORDERING}.
     *
     * @param context the process context supplying the ordering property
     * @param session the current session (not used by this method)
     * @param transfer the open transfer client used to obtain the listing
     * @throws IOException if the listing cannot be obtained
     */
    private void fetchListing(ProcessContext context, ProcessSession session, FileTransfer transfer) throws IOException {
        BlockingQueue<FileInfo> queue = this.fileQueueRef.get();
        if (queue == null) {
            boolean useNaturalOrdering = context.getProperty(FileTransfer.USE_NATURAL_ORDERING).asBoolean();
            queue = useNaturalOrdering ? new PriorityBlockingQueue<>(QUEUE_CAPACITY) : new LinkedBlockingQueue<>(QUEUE_CAPACITY);
            this.fileQueueRef.set(queue);
        }

        StopWatch stopWatch = new StopWatch(true);
        List<FileInfo> listing = transfer.getListing(true);
        long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);

        int newItems = 0;
        // Write lock: no file may move from the queue to the in-progress set while the
        // duplicate check and the offer are performed.
        this.mutuallyExclusiveTransferLock.lock();
        try {
            for (FileInfo file : listing) {
                if (queue.contains(file) || this.processing.contains(file)) {
                    continue;
                }
                if (!queue.offer(file)) {
                    // Queue refused the element (full); the rest is picked up by a later listing.
                    break;
                }
                ++newItems;
            }
        }
        finally {
            this.mutuallyExclusiveTransferLock.unlock();
        }
        this.getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", millis, listing.size(), newItems);
    }
}

