/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.repository;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.ProvenanceEventEnricher;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.TransientClaimRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.ContentClaimInputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.TaskTerminationInputStream;
import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.InternalProvenanceReporter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.NonFlushableOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardProcessSession
implements ProcessSession,
ProvenanceEventEnricher {
    // Keys of the UUID, FILENAME and PATH core attributes.
    private static final Set<String> REQUIRED_ATTRIBUTES = Set.of(CoreAttributes.UUID.key(), CoreAttributes.FILENAME.key(), CoreAttributes.PATH.key());
    private static final long VERSION_INCREMENT = 1L;
    private static final String INITIAL_VERSION = String.valueOf(1L);
    // Source of per-session identifiers; the constructor takes the next value for sessionId.
    private static final AtomicLong idGenerator = new AtomicLong(0L);
    private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
    // State map built from an empty map and an empty Optional.
    private static final StateMap EMPTY_STATE_MAP = new StandardStateMap(Collections.emptyMap(), Optional.empty());
    public static final int VERBOSE_LOG_THRESHOLD = 10;
    public static final String DEFAULT_FLOWFILE_PATH = "./";
    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
    // Separate logger named "<SimpleClassName>.claims".
    private static final Logger claimLog = LoggerFactory.getLogger((String)(StandardProcessSession.class.getSimpleName() + ".claims"));
    private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
    // Repository records of this session, keyed by FlowFile id (see checkpoint(boolean), which adds clones under clone.getId()).
    private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<Long, StandardRepositoryRecord>();
    private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<String, StandardFlowFileEvent>();
    private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<FlowFileQueue, Set<FlowFileRecord>>();
    private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new ConcurrentHashMap<ContentClaim, ByteCountingOutputStream>();
    private final RepositoryContext context;
    // Consulted by verifyTaskActive() to detect a terminated task.
    private final TaskTermination taskTermination;
    // Both recursion sets must be empty for validateCommitState() to allow a commit.
    private final Map<FlowFile, Integer> readRecursionSet = new HashMap<FlowFile, Integer>();
    private final Set<FlowFile> writeRecursionSet = new HashSet<FlowFile>();
    private final Map<FlowFile, Path> deleteOnCommit = new HashMap<FlowFile, Path>();
    private final long sessionId;
    private final String connectableDescription;
    private final PerformanceTracker performanceTracker;
    private Map<String, Long> countersOnCommit;
    private Map<String, Long> immediateCounters;
    // The following three sets hold FlowFile UUID strings.
    private final Set<String> removedFlowFiles = new HashSet<String>();
    private final Set<String> createdFlowFiles = new HashSet<String>();
    private final Set<String> createdFlowFilesWithoutLineage = new HashSet<String>();
    private final InternalProvenanceReporter provenanceReporter;
    // Running statistics for the session; retry(...) subtracts from the in/out figures when a FlowFile is retried.
    private int removedCount = 0;
    private long removedBytes = 0L;
    private long bytesRead = 0L;
    private long bytesWritten = 0L;
    private int flowFilesIn = 0;
    private int flowFilesOut = 0;
    private long contentSizeIn = 0L;
    private long contentSizeOut = 0L;
    private ResourceClaim currentReadClaim = null;
    private ByteCountingInputStream currentReadClaimStream = null;
    // Set to System.nanoTime() when the session is constructed.
    private long processingStartTime;
    // Streams still open per FlowFile; checkpoint(boolean) force-closes whatever remains in these maps.
    private final Map<FlowFile, InputStream> openInputStreams = new ConcurrentHashMap<FlowFile, InputStream>();
    private final Map<FlowFile, OutputStream> openOutputStreams = new ConcurrentHashMap<FlowFile, OutputStream>();
    private final Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<FlowFile, List<ProvenanceEventRecord>>();
    private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<FlowFile, ProvenanceEventBuilder>();
    // Lazily created by checkpoint(boolean); cleared again by commit(boolean) after a successful commit.
    private Checkpoint checkpoint = null;
    private final ContentClaimWriteCache claimCache;
    private StateMap localState;
    private StateMap clusterState;
    // Attribute name "retryCount.<connectable identifier>" under which the retry counter is stored on a FlowFile.
    private final String retryAttribute;
    private final FlowFileLinkage flowFileLinkage = new FlowFileLinkage();

    public StandardProcessSession(RepositoryContext context, TaskTermination taskTermination, PerformanceTracker performanceTracker) {
        this.context = context;
        this.taskTermination = taskTermination;
        this.performanceTracker = performanceTracker;
        this.provenanceReporter = context.createProvenanceReporter(this::isFlowFileKnown, this);
        this.sessionId = idGenerator.getAndIncrement();
        this.connectableDescription = context.getConnectableDescription();
        this.claimCache = context.createContentClaimWriteCache(performanceTracker);
        LOG.trace("Session {} created for {}", (Object)this, (Object)this.connectableDescription);
        this.processingStartTime = System.nanoTime();
        this.retryAttribute = "retryCount." + context.getConnectable().getIdentifier();
    }

    /**
     * Ensures the owning task is still active. If it has been terminated, the session
     * is rolled back and a {@link TerminatedTaskException} is thrown.
     */
    private void verifyTaskActive() {
        if (!taskTermination.isTerminated()) {
            return;
        }

        rollback(false, true);
        throw new TerminatedTaskException();
    }

    /**
     * @return the repository context this session was constructed with
     */
    protected RepositoryContext getRepositoryContext() {
        return context;
    }

    /**
     * @return the identifier assigned to this session at construction time
     */
    protected long getSessionId() {
        return sessionId;
    }

    /**
     * Closes every stream in the given map, logging a warning for each one because it was
     * still open when the session was finished. Failures to close are logged and otherwise ignored.
     *
     * @param streamMap  open streams keyed by the FlowFile they belong to
     * @param action     how the session ended, used in the warning text
     * @param streamType kind of stream, used in the warning text
     */
    private void closeStreams(Map<FlowFile, ? extends Closeable> streamMap, String action, String streamType) {
        if (streamMap.isEmpty()) {
            return;
        }

        // Work on a snapshot so the loop is independent of changes made to streamMap while streams are closed.
        final Map<FlowFile, Closeable> snapshot = new HashMap<>(streamMap);
        for (final Map.Entry<FlowFile, Closeable> openEntry : snapshot.entrySet()) {
            final FlowFile owner = openEntry.getKey();
            final Closeable stream = openEntry.getValue();
            LOG.warn("{} closing {} for {} because the session was {} without the {} stream being closed.", this, stream, owner, action, streamType);
            try {
                stream.close();
            } catch (Exception closeFailure) {
                LOG.warn("{} Attempted to close {} for {} due to session commit but close failed", this, stream, connectableDescription);
                LOG.warn("", closeFailure);
            }
        }
    }

    /**
     * Checkpoints the session, copying its collections into the checkpoint, and then
     * resets the session's state.
     */
    public void checkpoint() {
        final boolean copyCollections = true;
        checkpoint(copyCollections);
        resetState();
    }

    /**
     * Verifies that the session is in a state that may be committed.
     *
     * @throws IllegalStateException     if a FlowFile is currently being read from or written to
     * @throws FlowFileHandlingException if a record that is not marked for delete has no transfer
     *                                   relationship, or its relationship has no connections, is not
     *                                   auto-terminated and is not {@code Relationship.SELF}
     */
    private void validateCommitState() {
        verifyTaskActive();

        if (!readRecursionSet.isEmpty()) {
            throw new IllegalStateException("Cannot commit session while reading from FlowFile");
        }
        if (!writeRecursionSet.isEmpty()) {
            throw new IllegalStateException("Cannot commit session while writing to FlowFile");
        }

        for (final StandardRepositoryRecord repoRecord : records.values()) {
            if (repoRecord.isMarkedForDelete()) {
                continue;
            }

            final Relationship target = repoRecord.getTransferRelationship();
            if (target == null) {
                // A record without an original queue did not come from a queue, i.e. it was created in this session.
                final String origin = repoRecord.getOriginalQueue() == null ? "was created" : "was not created";
                throw new FlowFileHandlingException(String.valueOf(repoRecord.getCurrent()) + " transfer relationship not specified. This FlowFile " + origin + " in this session and was not transferred to any Relationship via ProcessSession.transfer()");
            }

            final Collection<Connection> destinations = context.getConnections(target);
            final boolean routable = !destinations.isEmpty()
                    || context.getConnectable().isAutoTerminated(target)
                    || target == Relationship.SELF;
            if (!routable) {
                throw new FlowFileHandlingException(String.valueOf(target) + " does not have any destinations for " + String.valueOf(context.getConnectable()));
            }
        }
    }

    /**
     * Validates the session, resolves the destination of every record and stores the
     * result in {@code this.checkpoint} (created on first use).
     *
     * For each record that is not marked for delete: records selected for retry are reset via
     * {@code retry(...)}; records routed to {@code Relationship.SELF} without connections go back
     * to their original queue; records whose relationship has no connections are marked for delete
     * and get a drop event; all others are assigned to one connection and cloned for each
     * additional connection of the relationship.
     *
     * @param copyCollections passed through unchanged to {@code Checkpoint.checkpoint(...)}
     */
    private void checkpoint(boolean copyCollections) {
        // Any validation failure rolls the session back before the exception is propagated.
        try {
            this.validateCommitState();
        }
        catch (Exception e) {
            this.rollback();
            throw e;
        }
        this.resetWriteClaims(false);
        // Streams that are still open at this point are closed here, with a warning per stream.
        this.closeStreams(this.openInputStreams, "committed", "input");
        this.closeStreams(this.openOutputStreams, "committed", "output");
        if (this.checkpoint == null) {
            this.checkpoint = new Checkpoint();
        }
        // Nothing to route and no commit-time counters: checkpoint with an empty event list and stop.
        if (this.records.isEmpty() && (this.countersOnCommit == null || this.countersOnCommit.isEmpty())) {
            LOG.trace("{} checkpointed, but no events were performed by this ProcessSession", (Object)this);
            this.checkpoint.checkpoint(this, Collections.emptyList(), copyCollections);
            return;
        }
        // Stays null unless at least one record is auto-terminated below.
        ArrayList<ProvenanceEventRecord> autoTerminatedEvents = null;
        // Clones created in the loop below; merged into this.records only after the loop has finished.
        HashMap<Long, StandardRepositoryRecord> toAdd = new HashMap<Long, StandardRepositoryRecord>();
        Connectable connectable = this.context.getConnectable();
        long maxBackoffMillis = Math.round(FormatUtils.getPreciseTimeDuration((String)connectable.getMaxBackoffPeriod(), (TimeUnit)TimeUnit.MILLISECONDS));
        // First pass: collect the ids of all records to retry, plus the ids linked to them in flowFileLinkage.
        HashSet<Long> retryIds = new HashSet<Long>();
        for (StandardRepositoryRecord record : this.records.values()) {
            if (!this.isRetry(record)) continue;
            long flowFileId = record.getCurrent().getId();
            retryIds.add(flowFileId);
            Collection<Long> linkedIds = this.flowFileLinkage.getLinkedIds(flowFileId);
            retryIds.addAll(linkedIds);
        }
        // Second pass: apply retries and assign destinations.
        for (StandardRepositoryRecord record : this.records.values()) {
            if (retryIds.contains(record.getCurrent().getId())) {
                // retry(...) either marks the record for delete or re-targets it to Relationship.SELF.
                this.retry(record, maxBackoffMillis);
            }
            if (record.isMarkedForDelete()) continue;
            Relationship relationship = record.getTransferRelationship();
            ArrayList<Connection> destinations = new ArrayList<Connection>(this.context.getConnections(relationship));
            // SELF without connections: the FlowFile goes back to the queue it was taken from.
            if (destinations.isEmpty() && relationship == Relationship.SELF) {
                record.setDestination(record.getOriginalQueue());
                continue;
            }
            // No connections otherwise: validateCommitState() only lets this through for auto-terminated relationships.
            if (destinations.isEmpty()) {
                record.markForDelete();
                if (autoTerminatedEvents == null) {
                    autoTerminatedEvents = new ArrayList<ProvenanceEventRecord>();
                }
                try {
                    ProvenanceEventRecord dropEvent = this.provenanceReporter.generateDropEvent((FlowFile)record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship");
                    autoTerminatedEvents.add(dropEvent);
                }
                catch (Exception e) {
                    // A failed drop event is only logged; the record stays marked for delete.
                    LOG.warn("Unable to generate Provenance Event for {} on behalf of {}", new Object[]{record.getCurrent(), this.connectableDescription, e});
                }
                continue;
            }
            // The FlowFile is leaving this component: remove this component's retry counter attribute if present.
            FlowFileRecord currRec = record.getCurrent();
            if (currRec.getAttribute(this.retryAttribute) != null) {
                currRec = new StandardFlowFileRecord.Builder().fromFlowFile(currRec).removeAttributes(new String[]{this.retryAttribute}).build();
                record.setWorking(currRec, this.retryAttribute, null, false);
            }
            // The last connection receives the record itself; every remaining connection receives a clone.
            Connection finalDestination = (Connection)destinations.remove(destinations.size() - 1);
            record.setDestination(finalDestination.getFlowFileQueue());
            this.incrementConnectionInputCounts(finalDestination, (RepositoryRecord)record);
            for (Connection destination : destinations) {
                this.incrementConnectionInputCounts(destination, (RepositoryRecord)record);
                // The clone copies currRec but gets a new sequence id and a new random UUID.
                StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
                builder.id(this.context.getNextFlowFileSequence());
                String newUuid = UUID.randomUUID().toString();
                builder.addAttribute(CoreAttributes.UUID.key(), newUuid);
                FlowFileRecord clone = builder.build();
                StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue());
                this.provenanceReporter.clone((FlowFile)currRec, (FlowFile)clone, false);
                // The clone refers to the same content claim, so the claim's claimant count is incremented.
                ContentClaim claim = clone.getContentClaim();
                if (claim != null) {
                    this.context.getContentRepository().incrementClaimaintCount(claim);
                }
                newRecord.setWorking(clone, Collections.emptyMap(), false);
                newRecord.setDestination(destination.getFlowFileQueue());
                newRecord.setTransferRelationship(record.getTransferRelationship());
                toAdd.put(clone.getId(), newRecord);
                this.createdFlowFiles.add(newUuid);
            }
        }
        this.records.putAll(toAdd);
        toAdd.clear();
        // NOTE(review): autoTerminatedEvents may be null here, whereas the early-return path passes an empty list — confirm Checkpoint.checkpoint accepts null.
        this.checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
    }

    /**
     * Decides whether the given record should be retried instead of being transferred.
     *
     * @param record the record to inspect
     * @return {@code true} only if the record has a transfer relationship that the connectable
     *         retries, its UUID is not in {@code createdFlowFilesWithoutLineage}, and its retry
     *         counter is below the connectable's retry count
     */
    private boolean isRetry(StandardRepositoryRecord record) {
        final Relationship target = record.getTransferRelationship();
        if (target == null) {
            return false;
        }

        final Connectable owner = context.getConnectable();
        if (!owner.isRelationshipRetried(target)) {
            return false;
        }

        final String flowFileUuid = record.getCurrent().getAttribute(CoreAttributes.UUID.key());
        if (createdFlowFilesWithoutLineage.contains(flowFileUuid)) {
            return false;
        }

        return getRetries((FlowFile)record.getCurrent()) < owner.getRetryCount();
    }

    /**
     * Resets the given record so that it is processed again by this component.
     *
     * The session statistics already accumulated for the record are reverted, its working
     * FlowFile is replaced by one built from the record's original FlowFile with the retry
     * counter attribute incremented, and the session's bookkeeping for its UUID is removed.
     * A record without an original FlowFile is marked for delete; any other record is
     * re-targeted to {@code Relationship.SELF}. Finally the connectable's back-off mechanism
     * is applied: either the FlowFile is penalized or the connectable is yielded.
     *
     * @param record           the record to retry
     * @param maxBackoffMillis upper bound, in milliseconds, for the computed back-off time
     */
    private void retry(StandardRepositoryRecord record, long maxBackoffMillis) {
        LOG.debug("Updating state to retry {}", record.getCurrent());
        final Connectable owner = context.getConnectable();
        final int priorRetries = getRetries((FlowFile)record.getCurrent());

        // Revert the FlowFiles/bytes "out" figures, counted once per destination (at least once).
        final Relationship target = record.getTransferRelationship();
        if (target != null) {
            final int destinationCount = context.getConnections(target).size();
            final int multiplier = Math.max(1, destinationCount);
            final boolean autoTerminated = owner.isAutoTerminated(target);
            if (!autoTerminated) {
                flowFilesOut -= multiplier;
                contentSizeOut -= record.getCurrent().getSize() * (long)multiplier;
            }
        }

        // Revert the FlowFiles/bytes "in" figures for records that have an original FlowFile.
        final FlowFileRecord original = record.getOriginal();
        if (original != null) {
            --flowFilesIn;
            contentSizeIn -= original.getSize();
        }

        removeTemporaryClaim(record);

        // Capture the UUID before the working FlowFile is replaced below.
        final String uuid = record.getCurrent().getAttribute(CoreAttributes.UUID.key());
        final FlowFileRecord retried = new StandardFlowFileRecord.Builder()
                .fromFlowFile(record.getOriginal())
                .addAttribute(retryAttribute, String.valueOf(priorRetries + 1))
                .build();

        if (original != null) {
            record.setTransferRelationship(Relationship.SELF);
        } else {
            record.markForDelete();
        }
        record.setWorking(retried, false);

        // Drop everything this session had recorded for the FlowFile.
        provenanceReporter.removeEventsForFlowFile(uuid);
        forkEventBuilders.remove(record.getCurrent());
        createdFlowFiles.remove(uuid);
        createdFlowFilesWithoutLineage.remove(uuid);
        removedFlowFiles.remove(uuid);

        final BackoffMechanism mechanism = owner.getBackoffMechanism();
        if (mechanism != BackoffMechanism.PENALIZE_FLOWFILE) {
            final long yieldMillis = calculateBackoffTime(priorRetries, maxBackoffMillis, owner.getYieldPeriod(TimeUnit.MILLISECONDS));
            owner.yield(yieldMillis, TimeUnit.MILLISECONDS);
            return;
        }

        // A record that is going to be deleted is not penalized.
        if (record.isMarkedForDelete()) {
            return;
        }
        final long penaltyMillis = calculateBackoffTime(priorRetries, maxBackoffMillis, owner.getPenalizationPeriod(TimeUnit.MILLISECONDS));
        penalize((FlowFile)record.getCurrent(), penaltyMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Reads the retry counter stored on the given FlowFile under this session's retry attribute.
     *
     * @param flowFile the FlowFile to inspect; may be {@code null}
     * @return the parsed counter, or {@code 0} if the FlowFile is {@code null}, the attribute is
     *         absent, or its value is not a valid integer
     */
    private int getRetries(FlowFile flowFile) {
        if (flowFile == null) {
            return 0;
        }
        String attributeValue = flowFile.getAttribute(this.retryAttribute);
        if (attributeValue == null) {
            return 0;
        }
        try {
            return Integer.parseInt(attributeValue);
        }
        catch (NumberFormatException ignored) {
            // A malformed counter is deliberately treated as "never retried". Only this exception
            // is caught, because it is the only one parseInt throws for a non-null argument.
            return 0;
        }
    }

    /**
     * Computes an exponential back-off: {@code baseBackoffTime * 2^retryCount}, capped at
     * {@code maxBackoffPeriod}. The calculation is carried out in {@code double} and the result
     * is converted back to {@code long}.
     *
     * @param retryCount       exponent applied to the base of two
     * @param maxBackoffPeriod upper bound for the result
     * @param baseBackoffTime  back-off used when {@code retryCount} is zero
     * @return the capped back-off time, in the same unit as the arguments
     */
    private long calculateBackoffTime(int retryCount, long maxBackoffPeriod, long baseBackoffTime) {
        final double uncapped = Math.pow(2.0, retryCount) * (double)baseBackoffTime;
        final double capped = Math.min((double)maxBackoffPeriod, uncapped);
        return (long)capped;
    }

    /**
     * Commits the session, passing {@code false} as the asynchronous flag.
     */
    public synchronized void commit() {
        final boolean asynchronous = false;
        commit(asynchronous);
    }

    /**
     * Commits the session with the asynchronous flag set. If the commit fails, the session is
     * rolled back and the commit failure is rethrown; a failure of the rollback itself is
     * attached to it as a suppressed exception.
     */
    public void commitAsync() {
        try {
            commit(true);
        } catch (Throwable commitFailure) {
            try {
                rollback();
            } catch (Throwable rollbackFailure) {
                commitFailure.addSuppressed(rollbackFailure);
            }
            throw commitFailure;
        }
    }

    /**
     * Commits the session with the asynchronous flag set and notifies the given callbacks.
     *
     * On failure the error is logged, the session is rolled back (a rollback failure is only
     * logged), {@code onFailure} is invoked with the commit failure, and the commit failure is
     * rethrown. An exception thrown by {@code onFailure} is logged and attached to the commit
     * failure as a suppressed exception, so that it cannot mask the root cause. On success
     * {@code onSuccess} is invoked; an exception thrown by it is only logged.
     *
     * @param onSuccess callback to run after a successful commit; may be {@code null}
     * @param onFailure callback receiving the commit failure; may be {@code null}
     */
    public void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure) {
        try {
            this.commit(true);
        }
        catch (Throwable t) {
            LOG.error("Failed to asynchronously commit session {} for {}", new Object[]{this, this.connectableDescription, t});
            try {
                this.rollback();
            }
            catch (Throwable t2) {
                LOG.error("Failed to roll back session {} for {}", new Object[]{this, this.connectableDescription, t2});
            }
            if (onFailure != null) {
                // Guard the callback the same way the success callback is guarded below: the
                // caller must always receive the original commit failure, not a callback error.
                try {
                    onFailure.accept(t);
                }
                catch (Exception callbackFailure) {
                    LOG.error("Failed to commit session {} for {} and failed to trigger failure callback", new Object[]{this, this.connectableDescription, callbackFailure});
                    t.addSuppressed(callbackFailure);
                }
            }
            throw t;
        }
        if (onSuccess != null) {
            try {
                onSuccess.run();
            }
            catch (Exception e) {
                LOG.error("Successfully committed session {} for {} but failed to trigger success callback", new Object[]{this, this.connectableDescription, e});
            }
        }
        LOG.debug("Successfully committed session {} for {}", (Object)this, (Object)this.connectableDescription);
    }

    /**
     * Checkpoints the session, commits the resulting checkpoint and then discards it.
     * Collections are copied into the checkpoint only if a checkpoint already existed
     * before this call.
     *
     * @param asynchronous forwarded unchanged to {@code commit(Checkpoint, boolean)}
     */
    private synchronized void commit(boolean asynchronous) {
        final boolean checkpointExists = this.checkpoint != null;
        checkpoint(checkpointExists);

        // checkpoint(boolean) has created this.checkpoint if it did not exist yet.
        final Checkpoint toCommit = this.checkpoint;
        commit(toCommit, asynchronous);
        this.checkpoint = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Applies the given checkpoint. In order, this method flushes the content claim write
     * cache, updates the provenance repository, the FlowFile repository and the FlowFile event
     * repository, enqueues the surviving FlowFiles on their destination queues, deletes the
     * files registered in {@code deleteOnCommit}, adjusts the commit-time counters, updates
     * local and cluster state where present, acknowledges the records and resets the session.
     *
     * Any {@link Exception} causes a rollback; a {@link RuntimeException} is rethrown as-is and
     * any other exception is wrapped in a {@link ProcessException}. The performance tracker is
     * notified of the end of the commit in all cases.
     *
     * @param checkpoint   the checkpoint whose records, counters and state are applied
     * @param asynchronous not referenced anywhere in this method body
     */
    protected void commit(Checkpoint checkpoint, boolean asynchronous) {
        try {
            Optional stateVersion;
            Object sessionSummary;
            this.performanceTracker.beginSessionCommit();
            long commitStartNanos = System.nanoTime();
            this.resetReadClaim();
            // The claim cache is reset even if flushing it fails.
            try {
                this.claimCache.flush();
            }
            finally {
                this.claimCache.reset();
            }
            long updateProvenanceStart = System.nanoTime();
            this.updateProvenanceRepo(checkpoint);
            long flowFileRepoUpdateStart = System.nanoTime();
            long updateProvenanceNanos = flowFileRepoUpdateStart - updateProvenanceStart;
            try {
                Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
                this.context.getFlowFileRepository().updateRepository(repoRecords);
            }
            catch (IOException ioe) {
                // The ProcessException thrown here is caught again by the outer catch, which rolls back a second time.
                this.rollback(false, true);
                throw new ProcessException("FlowFile Repository failed to update", (Throwable)ioe);
            }
            long flowFileRepoUpdateFinishNanos = System.nanoTime();
            long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart;
            // Debug only: log the lifetime of every record that is marked for abort.
            if (LOG.isDebugEnabled()) {
                for (RepositoryRecord repositoryRecord : checkpoint.records.values()) {
                    if (!repositoryRecord.isMarkedForAbort()) continue;
                    FlowFileRecord flowFile = repositoryRecord.getCurrent();
                    long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                    Connectable connectable = this.context.getConnectable();
                    Connectable connectable2 = connectable instanceof ProcessorNode ? ((ProcessorNode)connectable).getProcessor() : connectable;
                    LOG.debug("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, connectable2, flowFileLife});
                }
            }
            this.updateEventRepository(checkpoint);
            long updateEventRepositoryFinishNanos = System.nanoTime();
            long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
            // Group the FlowFiles that are neither aborted nor deleted by destination queue, then enqueue each group in one call.
            HashMap<FlowFileQueue, ArrayList<FlowFileRecord>> recordMap = new HashMap<FlowFileQueue, ArrayList<FlowFileRecord>>();
            for (StandardRepositoryRecord standardRepositoryRecord : checkpoint.records.values()) {
                if (standardRepositoryRecord.isMarkedForAbort() || standardRepositoryRecord.isMarkedForDelete() || standardRepositoryRecord.getCurrent() == null) continue;
                ArrayList<FlowFileRecord> collection = (ArrayList<FlowFileRecord>)recordMap.get(standardRepositoryRecord.getDestination());
                if (collection == null) {
                    collection = new ArrayList<FlowFileRecord>();
                    recordMap.put(standardRepositoryRecord.getDestination(), collection);
                }
                collection.add(standardRepositoryRecord.getCurrent());
            }
            for (Map.Entry entry : recordMap.entrySet()) {
                ((FlowFileQueue)entry.getKey()).putAll((Collection)entry.getValue());
            }
            long enqueueFlowFileFinishNanos = System.nanoTime();
            long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
            // The first file that cannot be deleted aborts the commit; the remaining paths are not attempted.
            for (Path path : checkpoint.deleteOnCommit.values()) {
                try {
                    Files.deleteIfExists(path);
                }
                catch (IOException e) {
                    throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), (Throwable)e);
                }
            }
            checkpoint.deleteOnCommit.clear();
            if (LOG.isDebugEnabled() && !((String)(sessionSummary = this.summarizeEvents(checkpoint))).isEmpty()) {
                LOG.debug("{} for {}, committed the following events: {}", new Object[]{this, this.connectableDescription, sessionSummary});
            }
            for (Map.Entry entry : checkpoint.countersOnCommit.entrySet()) {
                this.context.adjustCounter((String)entry.getKey(), (Long)entry.getValue());
            }
            // Debug only: break the total commit time down by phase.
            if (LOG.isDebugEnabled()) {
                StringBuilder timingInfo = new StringBuilder();
                timingInfo.append("Session commit for ").append(this).append(" [").append(this.connectableDescription).append("]").append(" took ");
                long l = System.nanoTime() - commitStartNanos;
                this.formatNanos(l, timingInfo);
                timingInfo.append("; FlowFile Repository Update took ");
                this.formatNanos(flowFileRepoUpdateNanos, timingInfo);
                timingInfo.append("; FlowFile Event Update took ");
                this.formatNanos(updateEventRepositoryNanos, timingInfo);
                timingInfo.append("; Enqueuing FlowFiles took ");
                this.formatNanos(enqueueFlowFileNanos, timingInfo);
                timingInfo.append("; Updating Provenance Event Repository took ");
                this.formatNanos(updateProvenanceNanos, timingInfo);
                LOG.debug("{}", (Object)timingInfo);
            }
            StateManager stateManager = this.context.getStateManager();
            // State is written only when the state manager's current version differs from the checkpoint's version.
            // Failures to update state are logged as warnings and do not fail the commit.
            // NOTE(review): the "else" message says the manager's version is newer, but that branch is reached when the versions are equal — confirm intended wording.
            if (checkpoint.localState != null) {
                try {
                    StateMap stateMap = stateManager.getState(Scope.LOCAL);
                    stateVersion = stateMap.getStateVersion();
                    if (!stateVersion.equals(checkpoint.localState.getStateVersion())) {
                        LOG.debug("Updating State Manager's Local State");
                        stateManager.setState(checkpoint.localState.toMap(), Scope.LOCAL);
                    } else {
                        LOG.debug("Will not update State Manager's Local State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.", (Object)stateVersion, (Object)checkpoint.localState.getStateVersion());
                    }
                }
                catch (Exception exception) {
                    LOG.warn("Failed to update Local State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", (Object)this.connectableDescription, (Object)exception);
                }
            }
            // Same handling as above, for cluster-scoped state.
            if (checkpoint.clusterState != null) {
                try {
                    StateMap stateMap = stateManager.getState(Scope.CLUSTER);
                    stateVersion = stateMap.getStateVersion();
                    if (!stateVersion.equals(checkpoint.clusterState.getStateVersion())) {
                        LOG.debug("Updating State Manager's Cluster State");
                        stateManager.setState(checkpoint.clusterState.toMap(), Scope.CLUSTER);
                    } else {
                        LOG.debug("Will not update State Manager's Cluster State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.", (Object)stateVersion, (Object)checkpoint.clusterState.getStateVersion());
                    }
                }
                catch (Exception exception) {
                    LOG.warn("Failed to update Cluster State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", (Object)this.connectableDescription, (Object)exception);
                }
            }
            this.acknowledgeRecords();
            this.resetState();
        }
        catch (Exception e) {
            LOG.error("Failed to commit session {}. Will roll back.", (Object)this, (Object)e);
            // A failure of the rollback itself is attached to the original exception rather than replacing it.
            try {
                this.rollback(false, true);
            }
            catch (Exception e1) {
                e.addSuppressed(e1);
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new ProcessException((Throwable)e);
        }
        finally {
            this.performanceTracker.endSessionCommit();
        }
    }

    /**
     * Publishes the statistics held by the given checkpoint to the FlowFile event repository:
     * one event for the connectable itself, followed by one event per entry of the checkpoint's
     * connection counts. An {@link IOException} from the repository is logged and not propagated.
     *
     * @param checkpoint the checkpoint whose statistics, records and counters are reported
     */
    private void updateEventRepository(Checkpoint checkpoint) {
        try {
            final Connectable owner = context.getConnectable();

            final StandardFlowFileEvent sessionEvent = new StandardFlowFileEvent();
            sessionEvent.setBytesRead(checkpoint.bytesRead);
            sessionEvent.setBytesWritten(checkpoint.bytesWritten);
            sessionEvent.setContentSizeIn(checkpoint.contentSizeIn);
            sessionEvent.setContentSizeOut(checkpoint.contentSizeOut);
            sessionEvent.setContentSizeRemoved(checkpoint.removedBytes);
            sessionEvent.setFlowFilesIn(checkpoint.flowFilesIn);
            sessionEvent.setFlowFilesOut(checkpoint.flowFilesOut);
            sessionEvent.setFlowFilesRemoved(checkpoint.removedCount);
            sessionEvent.setFlowFilesReceived(checkpoint.flowFilesReceived);
            sessionEvent.setBytesReceived(checkpoint.bytesReceived);
            sessionEvent.setFlowFilesSent(checkpoint.flowFilesSent);
            sessionEvent.setBytesSent(checkpoint.bytesSent);

            // Sum, over all records, the time elapsed since each FlowFile's lineage start date.
            final long now = System.currentTimeMillis();
            long aggregateLineageMillis = 0L;
            for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
                aggregateLineageMillis += now - repoRecord.getCurrent().getLineageStartDate();
            }
            sessionEvent.setAggregateLineageMillis(aggregateLineageMillis);

            sessionEvent.setCounters(combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters));
            context.getFlowFileEventRepository().updateRepository((FlowFileEvent)sessionEvent, owner.getIdentifier());

            // Each connection-level event is reported under the key it is stored with.
            for (final Map.Entry<String, StandardFlowFileEvent> connectionEntry : checkpoint.connectionCounts.entrySet()) {
                context.getFlowFileEventRepository().updateRepository((FlowFileEvent)connectionEntry.getValue(), connectionEntry.getKey());
            }
        } catch (IOException ioe) {
            LOG.error("FlowFile Event Repository failed to update", ioe);
        }
    }

    /**
     * Merges two counter maps into one.
     *
     * @param first the first counter map; may be {@code null} or empty
     * @param second the second counter map; may be {@code null} or empty
     * @return {@code null} when both maps are null/empty; the other map itself (not a copy) when exactly one
     *         is null/empty; otherwise a new map holding every key of both, with values of shared keys summed
     */
    private Map<String, Long> combineCounters(Map<String, Long> first, Map<String, Long> second) {
        final boolean hasFirst = first != null && !first.isEmpty();
        final boolean hasSecond = second != null && !second.isEmpty();

        if (!hasFirst) {
            return hasSecond ? second : null;
        }
        if (!hasSecond) {
            return first;
        }

        final HashMap<String, Long> merged = new HashMap<String, Long>(first);
        for (final Map.Entry<String, Long> counter : second.entrySet()) {
            merged.merge(counter.getKey(), counter.getValue(), Long::sum);
        }
        return merged;
    }

    /**
     * Records that an event of the given type has been seen for the given id.
     * The type is stored by setting the bit at the enum constant's ordinal in the id's {@link BitSet},
     * which is created on first use.
     *
     * @param map the per-id registry of event types seen so far
     * @param id the key under which the event type is registered
     * @param eventType the event type to register
     */
    private void addEventType(Map<String, BitSet> map, String id, ProvenanceEventType eventType) {
        map.computeIfAbsent(id, ignored -> new BitSet()).set(eventType.ordinal());
    }

    /**
     * Looks up the repository record that this session holds for the given FlowFile.
     *
     * @param flowFile the FlowFile whose id is used as the lookup key
     * @return the record stored under the FlowFile's id, or {@code null} if there is none
     */
    private StandardRepositoryRecord getRecord(FlowFile flowFile) {
        final long flowFileId = flowFile.getId();
        return this.records.get(flowFileId);
    }

    /**
     * Gathers every provenance event attributable to the given checkpoint and registers them, enriched,
     * with the Provenance Repository.
     * <p>
     * Events are collected into an insertion-ordered set (duplicates are dropped) in this order:
     * <ol>
     *   <li>events built from {@code checkpoint.forkEventBuilders} that have at least one child and are not
     *       spurious FORK events, unless the processor already reported an equal event;</li>
     *   <li>JOIN events found in {@code checkpoint.generatedProvenanceEvents};</li>
     *   <li>processor-reported events ({@code checkpoint.reportedEvents}) that are neither spurious FORK nor
     *       spurious ROUTE events;</li>
     *   <li>the remaining non-JOIN generated events that are not spurious FORK events;</li>
     *   <li>CONTENT_MODIFIED, CREATE and ATTRIBUTES_MODIFIED events synthesized per repository record.</li>
     * </ol>
     * Events from {@code checkpoint.autoTerminatedEvents}, when present, are submitted after all of the above.
     * Each event is enriched lazily, as the repository pulls it from the supplied {@link Iterable}.
     *
     * @param checkpoint the checkpoint whose provenance events are to be registered
     */
    protected void updateProvenanceRepo(final Checkpoint checkpoint) {
        final ProvenanceEventRepository provenanceRepo = this.context.getProvenanceRepository();
        // LinkedHashSet: de-duplicates while keeping the order in which events were added.
        final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<ProvenanceEventRecord>();
        // Event types already registered, keyed by FlowFile UUID.
        final Map<String, BitSet> eventTypesPerFlowFileId = new HashMap<String, BitSet>();
        final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;

        // 1. Fork events accumulated by the session.
        for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : checkpoint.forkEventBuilders.entrySet()) {
            final ProvenanceEventBuilder builder = entry.getValue();
            final FlowFile flowFile = entry.getKey();
            this.updateEventContentClaims(builder, flowFile, checkpoint.getRecord(flowFile));
            final ProvenanceEventRecord event = builder.build();
            if (event.getChildUuids().isEmpty() || this.isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                continue;
            }
            // Do not submit it here if the processor reported an equal event; step 3 submits that one.
            if (!processorGenerated.contains(event)) {
                recordsToSubmit.add(event);
            }
            for (final String childUuid : event.getChildUuids()) {
                this.addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
            }
            for (final String parentUuid : event.getParentUuids()) {
                this.addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
            }
        }

        // 2. JOIN events are submitted ahead of the processor-reported events.
        for (final List<ProvenanceEventRecord> generatedEvents : checkpoint.generatedProvenanceEvents.values()) {
            for (final ProvenanceEventRecord event : generatedEvents) {
                if (event.getEventType() != ProvenanceEventType.JOIN) {
                    continue;
                }
                recordsToSubmit.add(event);
                this.addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
            }
        }

        // 3. Events explicitly reported by the processor.
        for (final ProvenanceEventRecord event : processorGenerated) {
            if (this.isSpuriousForkEvent(event, checkpoint.removedFlowFiles) || this.isSpuriousRouteEvent(event, checkpoint.records)) {
                continue;
            }
            recordsToSubmit.add(event);
            this.addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
            final List<String> childUuids = event.getChildUuids();
            if (childUuids == null) {
                continue;
            }
            for (final String childUuid : childUuids) {
                this.addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
            }
        }

        // 4. Remaining generated events (everything except JOIN, which step 2 already handled).
        for (final List<ProvenanceEventRecord> generatedEvents : checkpoint.generatedProvenanceEvents.values()) {
            for (final ProvenanceEventRecord event : generatedEvents) {
                if (event.getEventType() == ProvenanceEventType.JOIN || this.isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                    continue;
                }
                recordsToSubmit.add(event);
                this.addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
            }
        }

        // 5. Synthesize events for records whose changes are not covered by an event registered above.
        for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
            final ContentClaim original = repoRecord.getOriginalClaim();
            final ContentClaim current = repoRecord.getCurrentClaim();
            final boolean contentChanged = !Objects.equals(original, current);
            final FlowFileRecord curFlowFile = repoRecord.getCurrent();
            final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
            boolean eventAdded = false;
            if (checkpoint.removedFlowFiles.contains(flowFileId)) {
                continue;
            }

            final boolean newFlowFile = repoRecord.getOriginal() == null;
            if (contentChanged && !newFlowFile) {
                recordsToSubmit.add(this.provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
                this.addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
                eventAdded = true;
            }

            if (checkpoint.createdFlowFiles.contains(flowFileId)) {
                // A CREATE event is only synthesized when no event that explains the FlowFile's creation exists.
                final BitSet registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
                final boolean creationEventRegistered = registeredTypes != null
                    && (registeredTypes.get(ProvenanceEventType.CREATE.ordinal())
                        || registeredTypes.get(ProvenanceEventType.FORK.ordinal())
                        || registeredTypes.get(ProvenanceEventType.CLONE.ordinal())
                        || registeredTypes.get(ProvenanceEventType.JOIN.ordinal())
                        || registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal())
                        || registeredTypes.get(ProvenanceEventType.FETCH.ordinal()));
                if (!creationEventRegistered) {
                    recordsToSubmit.add(this.provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
                    eventAdded = true;
                }
            }

            // ATTRIBUTES_MODIFIED is emitted only when nothing else was registered for this FlowFile,
            // attributes were actually updated, and the retry attribute is not set on the FlowFile.
            if (eventAdded
                || repoRecord.getUpdatedAttributes().isEmpty()
                || curFlowFile.getAttribute(this.retryAttribute) != null
                || eventTypesPerFlowFileId.containsKey(flowFileId)) {
                continue;
            }
            recordsToSubmit.add(this.provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
            this.addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
        }

        // Index the current FlowFiles by UUID so that events can be matched to their repository record.
        final Map<String, FlowFileRecord> flowFileRecordMap = new HashMap<String, FlowFileRecord>();
        for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
            final FlowFileRecord flowFile = repoRecord.getCurrent();
            flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
        }

        final long commitNanos = System.nanoTime();
        final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
        final Iterable<ProvenanceEventRecord> enrichedEvents = new Iterable<ProvenanceEventRecord>() {
            // NOTE: both iterators are created once, when this Iterable is constructed, and are shared by
            // every Iterator handed out below; the events can therefore be traversed only a single time.
            final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
            final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator();

            @Override
            public Iterator<ProvenanceEventRecord> iterator() {
                return new Iterator<ProvenanceEventRecord>() {

                    @Override
                    public boolean hasNext() {
                        return recordsToSubmitIterator.hasNext() || autoTermIterator != null && autoTermIterator.hasNext();
                    }

                    @Override
                    public ProvenanceEventRecord next() {
                        if (recordsToSubmitIterator.hasNext()) {
                            final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next();
                            // SEND, UPLOAD and CLONE events keep the attributes and content claims they were reported with.
                            final ProvenanceEventType eventType = rawEvent.getEventType();
                            final boolean isUpdateAttributesAndContent = eventType != ProvenanceEventType.SEND
                                && eventType != ProvenanceEventType.UPLOAD
                                && eventType != ProvenanceEventType.CLONE;
                            return StandardProcessSession.this.enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributesAndContent, commitNanos);
                        }
                        if (autoTermIterator != null && autoTermIterator.hasNext()) {
                            return StandardProcessSession.this.enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
                        }
                        throw new NoSuchElementException();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
        provenanceRepo.registerEvents(enrichedEvents);
    }

    /**
     * Sets the content claim details on the given event builder from the record's original claim.
     * <p>
     * When the record has no original claim, the builder's current claim is set to null values with a
     * size of 0 and the previous claim is left untouched. Otherwise both the current and the previous
     * claim are set to the original claim's location and the original FlowFile's size.
     *
     * @param builder the event builder to update
     * @param flowFile not used by this method
     * @param repoRecord the repository record supplying the original claim and original FlowFile
     */
    private void updateEventContentClaims(ProvenanceEventBuilder builder, FlowFile flowFile, StandardRepositoryRecord repoRecord) {
        final ContentClaim originalClaim = repoRecord.getOriginalClaim();
        if (originalClaim == null) {
            builder.setCurrentContentClaim(null, null, null, null, 0L);
            return;
        }

        final ResourceClaim resourceClaim = originalClaim.getResourceClaim();

        // Offset of the FlowFile's content = FlowFile's offset into the claim + the claim's own offset
        final long currentOffset = repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset();
        final long currentSize = repoRecord.getOriginal().getSize();
        builder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(currentOffset), currentSize);

        final long previousOffset = repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset();
        final long previousSize = repoRecord.getOriginal().getSize();
        builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(previousOffset), previousSize);
    }

    /**
     * Builds a copy of the given event, enriched with details this session holds for the given FlowFile:
     * current and previous content claim (when available), the source queue identifier (when the record
     * has an original queue), original/updated attributes and, if the raw event's duration is negative,
     * an event duration computed from the record's start time to {@code commitNanos}.
     *
     * @param rawEvent the event to enrich
     * @param flowFile the FlowFile the event belongs to; must be known to this session
     * @param commitNanos the commit timestamp in nanoseconds, used to compute a missing event duration
     * @return the enriched event
     * @throws FlowFileHandlingException if this session holds no record for the FlowFile
     */
    @Override
    public ProvenanceEventRecord enrich(ProvenanceEventRecord rawEvent, FlowFile flowFile, long commitNanos) {
        this.verifyTaskActive();

        final StandardRepositoryRecord repoRecord = this.getRecord(flowFile);
        if (repoRecord == null) {
            throw new FlowFileHandlingException(flowFile + " is not known in this session (" + this + ")");
        }

        final ProvenanceEventBuilder enrichedBuilder = this.context.createProvenanceEventBuilder().fromEvent(rawEvent);

        // Current content claim
        if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
            final ContentClaim currentClaim = repoRecord.getCurrentClaim();
            final long offsetWithinClaim = repoRecord.getCurrentClaimOffset();
            final long contentSize = flowFile.getSize();
            final ResourceClaim currentResource = currentClaim.getResourceClaim();
            enrichedBuilder.setCurrentContentClaim(currentResource.getContainer(), currentResource.getSection(), currentResource.getId(),
                Long.valueOf(offsetWithinClaim + currentClaim.getOffset()), contentSize);
        }

        // Previous content claim
        if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
            final ContentClaim previousClaim = repoRecord.getOriginalClaim();
            final long offsetWithinClaim = repoRecord.getOriginal().getContentClaimOffset();
            final long contentSize = repoRecord.getOriginal().getSize();
            final ResourceClaim previousResource = previousClaim.getResourceClaim();
            enrichedBuilder.setPreviousContentClaim(previousResource.getContainer(), previousResource.getSection(), previousResource.getId(),
                Long.valueOf(offsetWithinClaim + previousClaim.getOffset()), contentSize);
        }

        final FlowFileQueue sourceQueue = repoRecord.getOriginalQueue();
        if (sourceQueue != null) {
            enrichedBuilder.setSourceQueueIdentifier(sourceQueue.getIdentifier());
        }

        enrichedBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());

        // A negative duration marks an event whose duration was not set by its reporter
        if (rawEvent.getEventDuration() < 0L) {
            final long elapsedNanos = commitNanos - repoRecord.getStartNanos();
            enrichedBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        }
        return enrichedBuilder.build();
    }

    /**
     * Builds a copy of the given event, enriched from the repository record of the FlowFile the event
     * refers to. If {@code flowFileRecordMap} has no FlowFile for the event's FlowFile UUID, the event is
     * rebuilt without any enrichment.
     *
     * @param rawEvent the event to enrich
     * @param flowFileRecordMap FlowFiles keyed by UUID
     * @param records repository records keyed by FlowFile id
     * @param updateAttributesAndContent whether content claims and attributes are to be set on the event;
     *        the source queue identifier and a missing duration are set regardless
     * @param commitNanos the commit timestamp in nanoseconds, used to compute a missing event duration
     * @return the enriched event
     */
    private ProvenanceEventRecord enrich(ProvenanceEventRecord rawEvent, Map<String, FlowFileRecord> flowFileRecordMap, Map<Long, StandardRepositoryRecord> records, boolean updateAttributesAndContent, long commitNanos) {
        final ProvenanceEventBuilder enrichedBuilder = this.context.createProvenanceEventBuilder().fromEvent(rawEvent);

        final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
        if (eventFlowFile == null) {
            return enrichedBuilder.build();
        }

        final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId());

        // Current content claim
        if (updateAttributesAndContent && repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
            final ContentClaim currentClaim = repoRecord.getCurrentClaim();
            final long offsetWithinClaim = repoRecord.getCurrentClaimOffset();
            final long contentSize = eventFlowFile.getSize();
            final ResourceClaim currentResource = currentClaim.getResourceClaim();
            enrichedBuilder.setCurrentContentClaim(currentResource.getContainer(), currentResource.getSection(), currentResource.getId(),
                Long.valueOf(offsetWithinClaim + currentClaim.getOffset()), contentSize);
        }

        // Previous content claim
        if (updateAttributesAndContent && repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
            final ContentClaim previousClaim = repoRecord.getOriginalClaim();
            final long offsetWithinClaim = repoRecord.getOriginal().getContentClaimOffset();
            final long contentSize = repoRecord.getOriginal().getSize();
            final ResourceClaim previousResource = previousClaim.getResourceClaim();
            enrichedBuilder.setPreviousContentClaim(previousResource.getContainer(), previousResource.getSection(), previousResource.getId(),
                Long.valueOf(offsetWithinClaim + previousClaim.getOffset()), contentSize);
        }

        final FlowFileQueue sourceQueue = repoRecord.getOriginalQueue();
        if (sourceQueue != null) {
            enrichedBuilder.setSourceQueueIdentifier(sourceQueue.getIdentifier());
        }

        if (updateAttributesAndContent) {
            enrichedBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
        }

        // A negative duration marks an event whose duration was not set by its reporter
        if (rawEvent.getEventDuration() < 0L) {
            final long elapsedNanos = commitNanos - repoRecord.getStartNanos();
            enrichedBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        }
        return enrichedBuilder.build();
    }

    /**
     * Tells whether the given event is a FORK event whose only child FlowFile has been removed.
     *
     * @param event the event to inspect
     * @param removedFlowFiles UUIDs of the FlowFiles that were removed
     * @return {@code true} only for a FORK event with exactly one child UUID that is contained in
     *         {@code removedFlowFiles}
     */
    private boolean isSpuriousForkEvent(ProvenanceEventRecord event, Set<String> removedFlowFiles) {
        if (event.getEventType() != ProvenanceEventType.FORK) {
            return false;
        }
        final List<String> childUuids = event.getChildUuids();
        if (childUuids == null || childUuids.size() != 1) {
            return false;
        }
        return removedFlowFiles.contains(childUuids.get(0));
    }

    /**
     * Tells whether the given event is a ROUTE event that sends a FlowFile back to the queue it came from.
     * <p>
     * This is only the case when the event's relationship has exactly one connection and the queue of that
     * connection has the same identifier as the original queue of the record whose current FlowFile carries
     * the event's FlowFile UUID. Only the first matching record is considered.
     *
     * @param event the event to inspect
     * @param records the repository records to search for the event's FlowFile
     * @return {@code true} if the event routes the FlowFile back to its original queue, {@code false} otherwise
     *         (including when no record matches or the matching record has no original queue)
     */
    private boolean isSpuriousRouteEvent(ProvenanceEventRecord event, Map<Long, StandardRepositoryRecord> records) {
        if (event.getEventType() != ProvenanceEventType.ROUTE) {
            return false;
        }

        final String relationshipName = event.getRelationship();
        final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
        final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship);
        if (connectionsForRelationship.size() != 1) {
            return false;
        }

        for (final StandardRepositoryRecord repoRecord : records.values()) {
            final FlowFileRecord flowFileRecord = repoRecord.getCurrent();
            final boolean sameFlowFile = event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()));
            if (!sameFlowFile) {
                continue;
            }
            if (repoRecord.getOriginalQueue() == null) {
                return false;
            }
            final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier();
            final Connection destinationConnection = connectionsForRelationship.iterator().next();
            final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier();
            return originalQueueId.equals(destinationQueueId);
        }
        return false;
    }

    /**
     * Rolls back this session without penalizing the FlowFiles.
     * Equivalent to {@code rollback(false)}.
     */
    public void rollback() {
        final boolean penalize = false;
        this.rollback(penalize);
    }

    /**
     * Rolls back this session, leaving any existing checkpoint untouched.
     * Equivalent to {@code rollback(penalize, false)}.
     *
     * @param penalize whether FlowFiles returned to their original queues are to be penalized
     */
    public void rollback(boolean penalize) {
        final boolean rollbackCheckpoint = false;
        this.rollback(penalize, rollbackCheckpoint);
    }

    /**
     * Rolls back all work performed by this session and, optionally, by its current checkpoint.
     * <p>
     * In order, this method: clears the delete-on-commit set, closes any open input/output streams, resets
     * the claim cache and the read/write claims, removes temporary claims of all affected records,
     * decrements the claimant count of the working claim of records marked for abort, returns all other
     * records to their original queues (see {@link #rollbackRecord}), persists aborted records and
     * transient claims to the FlowFile Repository, reports bytes read/written and immediate counters to the
     * FlowFile Event Repository, acknowledges outstanding FlowFiles and finally resets the session state.
     * Failures to update either repository are logged and do not abort the rollback.
     *
     * @param penalize whether FlowFiles returned to their original queues are to be penalized
     * @param rollbackCheckpoint whether the records of the current checkpoint are rolled back as well; if
     *        so, the checkpoint is discarded
     */
    protected synchronized void rollback(boolean penalize, boolean rollbackCheckpoint) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} session rollback called, FlowFile records are {}", new Object[]{this, this.loggableFlowfileInfo(), new Throwable("Stack Trace on rollback")});
        }

        this.deleteOnCommit.clear();
        this.closeStreams(this.openInputStreams, "rolled back", "input");
        this.closeStreams(this.openOutputStreams, "rolled back", "output");
        try {
            this.claimCache.reset();
        } catch (final IOException e1) {
            LOG.warn("{} Attempted to close Output Stream for {} due to session rollback but close failed", new Object[]{this, this.connectableDescription, e1});
        }

        if (this.localState != null || this.clusterState != null) {
            LOG.debug("Rolling back session that has state stored. This state will not be updated.");
        }
        if (rollbackCheckpoint && this.checkpoint != null && (this.checkpoint.localState != null || this.checkpoint.clusterState != null)) {
            LOG.debug("Rolling back checkpoint that has state stored. This state will not be updated.");
        }

        // Map.values() is a Collection view, not a HashSet. A copy is only needed when the checkpoint's
        // records must be added, because the view itself does not support addAll.
        final Collection<StandardRepositoryRecord> recordValues = this.records.values();
        final Collection<StandardRepositoryRecord> recordsToHandle = rollbackCheckpoint
            ? new HashSet<StandardRepositoryRecord>(recordValues)
            : recordValues;
        if (rollbackCheckpoint) {
            final Checkpoint existingCheckpoint = this.checkpoint;
            this.checkpoint = null;
            if (existingCheckpoint != null && existingCheckpoint.records != null) {
                recordsToHandle.addAll(existingCheckpoint.records.values());
            }
        }

        this.resetWriteClaims();
        this.resetReadClaim();

        if (recordsToHandle.isEmpty()) {
            LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", (Object)this);
            this.acknowledgeRecords();
            this.resetState();
            return;
        }

        for (final StandardRepositoryRecord record : recordsToHandle) {
            this.removeTemporaryClaim(record);
        }

        // Split the records into those marked for abort and those to be handed back to their queues.
        final HashSet<StandardRepositoryRecord> abortedRecords = new HashSet<StandardRepositoryRecord>();
        final HashSet<StandardRepositoryRecord> transferRecords = new HashSet<StandardRepositoryRecord>();
        for (final StandardRepositoryRecord record : recordsToHandle) {
            if (record.isMarkedForAbort()) {
                this.decrementClaimCount(record.getWorkingClaim());
                abortedRecords.add(record);
                continue;
            }
            transferRecords.add(record);
        }

        for (final StandardRepositoryRecord record : transferRecords) {
            this.rollbackRecord(record, penalize);
        }

        if (!abortedRecords.isEmpty()) {
            try {
                this.context.getFlowFileRepository().updateRepository(abortedRecords);
            } catch (final IOException ioe) {
                LOG.error("Unable to update FlowFile repository for aborted records", (Throwable)ioe);
            }
        }

        // Report the transient claims of all handled records to the FlowFile Repository.
        final List<ContentClaim> transientClaims = recordsToHandle.stream()
            .flatMap(record -> record.getTransientClaims().stream())
            .collect(Collectors.toList());
        if (!transientClaims.isEmpty()) {
            final TransientClaimRepositoryRecord repoRecord = new TransientClaimRepositoryRecord(transientClaims);
            try {
                this.context.getFlowFileRepository().updateRepository(Collections.singletonList(repoRecord));
            } catch (final IOException ioe) {
                LOG.error("Unable to update FlowFile repository to cleanup transient claims", (Throwable)ioe);
            }
        }

        // Bytes read/written and immediate counters are reported even though the session is rolled back.
        final Connectable connectable = this.context.getConnectable();
        final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
        flowFileEvent.setBytesRead(this.bytesRead);
        flowFileEvent.setBytesWritten(this.bytesWritten);
        flowFileEvent.setCounters(this.immediateCounters);
        try {
            this.context.getFlowFileEventRepository().updateRepository((FlowFileEvent)flowFileEvent, connectable.getIdentifier());
        } catch (final Exception e) {
            LOG.error("Failed to update FlowFileEvent Repository", (Throwable)e);
        }

        this.acknowledgeRecords();
        this.resetState();
    }

    /**
     * Puts the record's original FlowFile back onto the queue it was taken from.
     * Records without an original FlowFile or without an original queue are left alone.
     *
     * @param record the record to roll back
     * @param penalize if {@code true}, a copy of the original FlowFile is queued whose penalty expiration is
     *        the current time plus the Connectable's penalization period; otherwise the original FlowFile
     *        itself is queued
     */
    protected void rollbackRecord(StandardRepositoryRecord record, boolean penalize) {
        if (record.getOriginal() == null) {
            return;
        }
        final FlowFileQueue sourceQueue = record.getOriginalQueue();
        if (sourceQueue == null) {
            return;
        }

        if (!penalize) {
            sourceQueue.put(record.getOriginal());
            return;
        }

        final long nowMillis = System.currentTimeMillis();
        final long penaltyExpirationMillis = nowMillis + this.context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
        final FlowFileRecord penalized = new StandardFlowFileRecord.Builder()
            .fromFlowFile(record.getOriginal())
            .penaltyExpirationTime(penaltyExpirationMillis)
            .build();
        sourceQueue.put(penalized);
    }

    /**
     * Produces a bracketed, human-readable summary of the FlowFiles currently held by this session.
     * At most five FlowFiles are described (queue identifier when available, filename and uuid); when the
     * session holds more, the number of omitted FlowFiles is appended. An empty session yields
     * {@code [none]}.
     *
     * @return the summary text
     */
    private String loggableFlowfileInfo() {
        final int maxListed = 5;
        final String separator = ", ";

        final StringBuilder summary = new StringBuilder(1024).append("[");
        final int lengthWhenEmpty = summary.length();

        int listedCount = 0;
        for (final StandardRepositoryRecord repoRecord : this.records.values()) {
            if (listedCount >= maxListed) {
                break;
            }
            listedCount++;

            if (summary.length() > lengthWhenEmpty) {
                summary.append(separator);
            }
            if (repoRecord.getOriginalQueue() != null && repoRecord.getOriginalQueue().getIdentifier() != null) {
                summary.append("queue=").append(repoRecord.getOriginalQueue().getIdentifier()).append(separator);
            }
            summary.append("filename=").append(repoRecord.getCurrent().getAttribute(CoreAttributes.FILENAME.key()));
            summary.append(", uuid=").append(repoRecord.getCurrent().getAttribute(CoreAttributes.UUID.key()));
        }

        final int totalCount = this.records.size();
        if (totalCount > maxListed) {
            if (summary.length() > lengthWhenEmpty) {
                summary.append(separator);
            }
            summary.append(totalCount - maxListed).append(" additional FlowFiles not listed");
        } else if (listedCount == 0) {
            summary.append("none");
        }

        return summary.append("]").toString();
    }

    /**
     * Decrements the claimant count of the given claim in the Content Repository.
     *
     * @param claim the claim to release; {@code null} is ignored
     */
    private void decrementClaimCount(ContentClaim claim) {
        if (claim != null) {
            this.context.getContentRepository().decrementClaimantCount(claim);
        }
    }

    /**
     * Releases this session's hold on the given claim and removes the content if nothing else holds it.
     * <p>
     * The claimant count is decremented; if it drops to zero or below, the write claims are reset and the
     * Content Repository is asked to remove the claim. Whenever the claim was not removed, it is added to
     * the record as a transient claim.
     *
     * @param claim the claim to destroy; {@code null} is ignored
     * @param repoRecord the record that receives the claim as a transient claim when it was not removed
     */
    private void destroyContent(ContentClaim claim, StandardRepositoryRecord repoRecord) {
        if (claim != null) {
            final int remainingClaimants = this.context.getContentRepository().decrementClaimantCount(claim);
            final boolean unreferenced = remainingClaimants <= 0;
            if (unreferenced) {
                this.resetWriteClaims();
            }
            final boolean removed = unreferenced && this.context.getContentRepository().remove(claim);
            if (!removed) {
                repoRecord.addTransientClaim(claim);
            }
        }
    }

    /**
     * Returns this session to a pristine state: all record, recursion and linkage tracking is cleared,
     * every statistic is zeroed, pending provenance data is discarded, the local and cluster state are
     * dropped and the processing start time is set to the current {@link System#nanoTime()}.
     */
    private void resetState() {
        // Record tracking
        this.records.clear();
        this.readRecursionSet.clear();
        this.writeRecursionSet.clear();
        this.createdFlowFiles.clear();
        this.createdFlowFilesWithoutLineage.clear();
        this.removedFlowFiles.clear();
        this.flowFileLinkage.clear();

        // Statistics
        this.contentSizeIn = 0L;
        this.contentSizeOut = 0L;
        this.flowFilesIn = 0;
        this.flowFilesOut = 0;
        this.removedCount = 0;
        this.removedBytes = 0L;
        this.bytesRead = 0L;
        this.bytesWritten = 0L;
        this.connectionCounts.clear();

        // Counters (either map may not have been created)
        if (this.countersOnCommit != null) {
            this.countersOnCommit.clear();
        }
        if (this.immediateCounters != null) {
            this.immediateCounters.clear();
        }

        // Provenance
        this.generatedProvenanceEvents.clear();
        this.forkEventBuilders.clear();
        this.provenanceReporter.clear();

        // Component state
        this.localState = null;
        this.clusterState = null;

        this.processingStartTime = System.nanoTime();
    }

    /**
     * Acknowledges all outstanding FlowFiles to the queues they were pulled from.
     * Each queue's entry is removed from {@code unacknowledgedFlowFiles} before the queue is notified.
     */
    private void acknowledgeRecords() {
        for (final Iterator<Map.Entry<FlowFileQueue, Set<FlowFileRecord>>> pending = this.unacknowledgedFlowFiles.entrySet().iterator(); pending.hasNext(); ) {
            final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> queueEntry = pending.next();
            final FlowFileQueue queue = queueEntry.getKey();
            final Set<FlowFileRecord> flowFiles = queueEntry.getValue();
            pending.remove();
            queue.acknowledge(flowFiles);
        }
    }

    /**
     * Migrates every FlowFile currently held by this session to the given session.
     * The current FlowFile of each repository record is collected and handed to
     * {@link #migrate(ProcessSession, Collection)}.
     *
     * @param newOwner the session that is to take ownership of the FlowFiles
     */
    public void migrate(ProcessSession newOwner) {
        final ArrayList<FlowFile> currentFlowFiles = this.records.values().stream()
            .map(repositoryRecord -> (FlowFile) repositoryRecord.getCurrent())
            .collect(Collectors.toCollection(ArrayList::new));
        this.migrate(newOwner, currentFlowFiles);
    }

    /**
     * Migrates the given FlowFiles from this session to the given session after validating the arguments.
     *
     * @param newOwner the session that is to take ownership; must be a different
     *        {@code StandardProcessSession}
     * @param flowFiles the FlowFiles to migrate; must contain at least one element
     * @throws NullPointerException if {@code newOwner} is {@code null}
     * @throws IllegalArgumentException if {@code newOwner} is this session or is not a
     *         {@code StandardProcessSession}, or if {@code flowFiles} is {@code null} or empty
     */
    public void migrate(ProcessSession newOwner, Collection<FlowFile> flowFiles) {
        Objects.requireNonNull(newOwner);
        if (newOwner == this) {
            throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
        }

        final boolean nothingToMigrate = flowFiles == null || flowFiles.isEmpty();
        if (nothingToMigrate) {
            throw new IllegalArgumentException("Must supply at least one FlowFile to migrate");
        }

        if (newOwner instanceof StandardProcessSession) {
            this.migrate((StandardProcessSession) newOwner, flowFiles);
            return;
        }
        throw new IllegalArgumentException("Cannot migrate from a StandardProcessSession to a " + newOwner.getClass());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void migrate(StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
        StandardProcessSession standardProcessSession = newOwner;
        synchronized (standardProcessSession) {
            this.verifyTaskActive();
            newOwner.verifyTaskActive();
            flowFiles = flowFiles.stream().map(this::getMostRecent).collect(Collectors.toList());
            for (Object flowFile : flowFiles) {
                if (this.openInputStreams.containsKey(flowFile)) {
                    throw new IllegalStateException(String.valueOf(flowFile) + " cannot be migrated to a new Process Session because this session currently has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
                }
                if (this.openOutputStreams.containsKey(flowFile)) {
                    throw new IllegalStateException(String.valueOf(flowFile) + " cannot be migrated to a new Process Session because this session currently has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
                }
                if (this.readRecursionSet.containsKey(flowFile)) {
                    throw new IllegalStateException(String.valueOf(flowFile) + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
                }
                if (this.writeRecursionSet.contains(flowFile)) {
                    throw new IllegalStateException(String.valueOf(flowFile) + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
                }
                StandardRepositoryRecord standardRepositoryRecord = this.getRecord((FlowFile)flowFile);
                if (standardRepositoryRecord != null) continue;
                throw new FlowFileHandlingException(String.valueOf(flowFile) + " is not known in this session (" + this.toString() + ")");
            }
            Set flowFileIds = flowFiles.stream().map(ff -> ff.getAttribute(CoreAttributes.UUID.key())).collect(Collectors.toSet());
            for (Map.Entry entry : this.forkEventBuilders.entrySet()) {
                ProvenanceEventBuilder eventBuilder;
                FlowFile eventFlowFile = (FlowFile)entry.getKey();
                if (flowFiles.contains(eventFlowFile)) {
                    eventBuilder = (ProvenanceEventBuilder)entry.getValue();
                    for (String childId : eventBuilder.getChildFlowFileIds()) {
                        if (flowFileIds.contains(childId)) continue;
                        throw new FlowFileHandlingException("Cannot migrate " + String.valueOf(eventFlowFile) + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size() + " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
                    }
                    continue;
                }
                eventBuilder = (ProvenanceEventBuilder)entry.getValue();
                for (String childId : eventBuilder.getChildFlowFileIds()) {
                    if (!flowFileIds.contains(childId)) continue;
                    throw new FlowFileHandlingException("Cannot migrate " + String.valueOf(eventFlowFile) + " to a new session because it was forked from a Parent FlowFile, but the parent is not being migrated. If any FlowFile is forked, the parent and all children must be migrated at the same time.");
                }
            }
            HashSet<FlowFile> forkedFlowFilesMigrated = new HashSet<FlowFile>();
            for (Map.Entry<FlowFile, ProvenanceEventBuilder> entry : this.forkEventBuilders.entrySet()) {
                FlowFile eventFlowFile = entry.getKey();
                ProvenanceEventBuilder eventBuilder = entry.getValue();
                if (!flowFiles.contains(eventFlowFile)) continue;
                HashSet childrenIds = new HashSet(eventBuilder.getChildFlowFileIds());
                ProvenanceEventBuilder copy = null;
                for (FlowFile flowFile : flowFiles) {
                    String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
                    if (!childrenIds.contains(flowFileId)) continue;
                    eventBuilder.removeChildFlowFile(flowFile);
                    if (copy == null) {
                        copy = eventBuilder.copy();
                        copy.getChildFlowFileIds().clear();
                    }
                    copy.addChildFlowFile(flowFileId);
                }
                if (copy == null) continue;
                newOwner.forkEventBuilders.put(eventFlowFile, copy);
                forkedFlowFilesMigrated.add(eventFlowFile);
            }
            forkedFlowFilesMigrated.forEach(this.forkEventBuilders::remove);
            newOwner.processingStartTime = Math.min(newOwner.processingStartTime, this.processingStartTime);
            for (FlowFile flowFile : flowFiles) {
                Path toDelete;
                ByteCountingOutputStream appendableStream;
                ContentClaim currentClaim;
                List<ProvenanceEventRecord> events;
                String flowFileUuid;
                FlowFileRecord flowFileRecord = (FlowFileRecord)flowFile;
                long flowFileId = flowFile.getId();
                StandardRepositoryRecord repoRecord = this.records.remove(flowFileId);
                newOwner.records.put(flowFileId, repoRecord);
                Collection<Long> linkedIds = this.flowFileLinkage.getLinkedIds(flowFileId);
                linkedIds.forEach(linkedId -> newOwner.flowFileLinkage.addLink(flowFileId, (long)linkedId));
                FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
                if (inputQueue != null) {
                    String connectionId = inputQueue.getIdentifier();
                    this.incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
                    newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
                    this.unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
                    newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet()).add(flowFileRecord);
                    --this.flowFilesIn;
                    this.contentSizeIn -= flowFile.getSize();
                    ++newOwner.flowFilesIn;
                    newOwner.contentSizeIn += flowFile.getSize();
                }
                if (this.removedFlowFiles.remove(flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key()))) {
                    newOwner.removedFlowFiles.add(flowFileUuid);
                    ++newOwner.removedCount;
                    newOwner.removedBytes += flowFile.getSize();
                    --this.removedCount;
                    this.removedBytes -= flowFile.getSize();
                }
                if (this.createdFlowFiles.remove(flowFileUuid)) {
                    newOwner.createdFlowFiles.add(flowFileUuid);
                }
                if (repoRecord.getTransferRelationship() != null) {
                    boolean autoTerminated;
                    Relationship transferRelationship = repoRecord.getTransferRelationship();
                    Collection<Connection> destinations = this.context.getConnections(transferRelationship);
                    int numDestinations = destinations.size();
                    boolean bl = autoTerminated = numDestinations == 0 && this.context.getConnectable().isAutoTerminated(transferRelationship);
                    if (autoTerminated) {
                        --this.removedCount;
                        this.removedBytes -= flowFile.getSize();
                        ++newOwner.removedCount;
                        newOwner.removedBytes += flowFile.getSize();
                    } else {
                        --this.flowFilesOut;
                        this.contentSizeOut -= flowFile.getSize();
                        ++newOwner.flowFilesOut;
                        newOwner.contentSizeOut += flowFile.getSize();
                    }
                }
                if ((events = this.generatedProvenanceEvents.remove(flowFile)) != null) {
                    newOwner.generatedProvenanceEvents.put(flowFile, events);
                }
                if ((currentClaim = repoRecord.getCurrentClaim()) != null && (appendableStream = this.appendableStreams.remove(currentClaim)) != null) {
                    newOwner.appendableStreams.put(currentClaim, appendableStream);
                }
                if ((toDelete = this.deleteOnCommit.remove(flowFile)) == null) continue;
                newOwner.deleteOnCommit.put(flowFile, toDelete);
            }
            this.provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
        }
    }

    /*
     * WARNING - void declaration
     */
    private String summarizeEvents(Checkpoint checkpoint) {
        HashMap transferMap = new HashMap();
        HashSet<String> modifiedFlowFileIds = new HashSet<String>();
        int largestTransferSetSize = 0;
        for (Map.Entry<Long, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
            void var10_14;
            StandardRepositoryRecord record = entry.getValue();
            FlowFileRecord flowFile = record.getCurrent();
            Relationship relationship = record.getTransferRelationship();
            if (Relationship.SELF.equals((Object)relationship)) continue;
            Set set = (Set)transferMap.get(relationship);
            if (set == null) {
                HashSet hashSet = new HashSet();
                transferMap.put(relationship, hashSet);
            }
            var10_14.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
            largestTransferSetSize = Math.max(largestTransferSetSize, var10_14.size());
            ContentClaim workingClaim = record.getWorkingClaim();
            if (workingClaim == null || workingClaim == record.getOriginalClaim() || record.getTransferRelationship() == null) continue;
            modifiedFlowFileIds.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
        }
        int numRemoved = checkpoint.removedFlowFiles.size();
        int numModified = modifiedFlowFileIds.size();
        int numCreated = checkpoint.createdFlowFiles.size();
        StringBuilder sb = new StringBuilder(512);
        if (!(LOG.isDebugEnabled() || largestTransferSetSize <= 10 && numModified <= 10 && numCreated <= 10 && numRemoved <= 10)) {
            if (numCreated > 0) {
                sb.append("created ").append(numCreated).append(" FlowFiles, ");
            }
            if (numModified > 0) {
                sb.append("modified ").append(modifiedFlowFileIds.size()).append(" FlowFiles, ");
            }
            if (numRemoved > 0) {
                sb.append("removed ").append(numRemoved).append(" FlowFiles, ");
            }
            for (Map.Entry entry : transferMap.entrySet()) {
                if (entry.getKey() == null) continue;
                sb.append("Transferred ").append(((Set)entry.getValue()).size()).append(" FlowFiles");
                relationship = (Relationship)entry.getKey();
                if (relationship == Relationship.ANONYMOUS) continue;
                sb.append(" to '").append(relationship.getName()).append("', ");
            }
        } else {
            if (numCreated > 0) {
                sb.append("created FlowFiles ").append(checkpoint.createdFlowFiles).append(", ");
            }
            if (numModified > 0) {
                sb.append("modified FlowFiles ").append(modifiedFlowFileIds).append(", ");
            }
            if (numRemoved > 0) {
                sb.append("removed FlowFiles ").append(checkpoint.removedFlowFiles).append(", ");
            }
            for (Map.Entry entry : transferMap.entrySet()) {
                if (entry.getKey() == null) continue;
                sb.append("Transferred FlowFiles ").append(entry.getValue());
                relationship = (Relationship)entry.getKey();
                if (relationship == Relationship.ANONYMOUS) continue;
                sb.append(" to '").append(relationship.getName()).append("', ");
            }
        }
        if (sb.length() > 2 && sb.subSequence(sb.length() - 2, sb.length()).equals(", ")) {
            sb.delete(sb.length() - 2, sb.length());
        }
        if (sb.length() > 0) {
            long processingNanos = checkpoint.processingTime;
            sb.append(", Processing Time = ");
            this.formatNanos(processingNanos, sb);
        }
        return sb.toString();
    }

    /**
     * Appends a human-readable rendering of a nanosecond duration to {@code sb}, followed by the
     * raw value in parentheses, e.g. {@code "2 seconds, 500 millis (2500000000 nanos)"}.
     *
     * <p>Durations of at least one second are rendered as seconds plus the remaining milliseconds,
     * durations of at least one millisecond as milliseconds, and anything smaller as nanoseconds.
     *
     * @param nanos the duration in nanoseconds
     * @param sb the builder that receives the formatted text
     */
    private void formatNanos(long nanos, StringBuilder sb) {
        final long nanosPerMilli = 1000000L;
        final long nanosPerSecond = 1000000000L;
        // ">=" rather than ">" so that a duration of exactly one unit is reported in that unit
        // (exactly 1,000,000 ns used to render as "0 nanos").
        long seconds = nanos >= nanosPerSecond ? nanos / nanosPerSecond : 0L;
        long totalMillis = nanos >= nanosPerMilli ? nanos / nanosPerMilli : 0L;
        if (seconds > 0L) {
            sb.append(seconds).append(" seconds");
        }
        if (totalMillis > 0L) {
            long millis = totalMillis;
            if (seconds > 0L) {
                sb.append(", ");
                // Only the milliseconds beyond the whole seconds already printed.
                millis -= seconds * 1000L;
            }
            sb.append(millis).append(" millis");
        } else {
            // Sub-millisecond (or non-positive) duration: report the nanosecond remainder.
            sb.append(nanos % nanosPerMilli).append(" nanos");
        }
        sb.append(" (").append(nanos).append(" nanos)");
    }

    /**
     * Counts one FlowFile, sized as the record's current FlowFile, as input for the given connection.
     *
     * @param connection the connection whose input counters are updated
     * @param record the repository record whose current FlowFile size is added
     */
    private void incrementConnectionInputCounts(Connection connection, RepositoryRecord record) {
        String connectionId = connection.getIdentifier();
        long contentSize = record.getCurrent().getSize();
        this.incrementConnectionInputCounts(connectionId, 1, contentSize);
    }

    /**
     * Adds the given FlowFile count and byte count to the input counters tracked for a connection,
     * creating the per-connection event on first use.
     *
     * @param connectionId identifier of the connection
     * @param flowFileCount number of FlowFiles to add (may be negative)
     * @param bytes number of content bytes to add (may be negative)
     */
    private void incrementConnectionInputCounts(String connectionId, int flowFileCount, long bytes) {
        StandardFlowFileEvent counts = this.connectionCounts.computeIfAbsent(connectionId, key -> new StandardFlowFileEvent());
        counts.setFlowFilesIn(counts.getFlowFilesIn() + flowFileCount);
        counts.setContentSizeIn(counts.getContentSizeIn() + bytes);
    }

    /**
     * Counts one FlowFile of the record's size as output for the given connection.
     *
     * @param connection the connection whose output counters are updated
     * @param record the FlowFile whose size is added
     */
    private void incrementConnectionOutputCounts(Connection connection, FlowFileRecord record) {
        String connectionId = connection.getIdentifier();
        long contentSize = record.getSize();
        this.incrementConnectionOutputCounts(connectionId, 1, contentSize);
    }

    /**
     * Adds the given FlowFile count and byte count to the output counters tracked for a connection,
     * creating the per-connection event on first use.
     *
     * @param connectionId identifier of the connection
     * @param flowFileCount number of FlowFiles to add (may be negative)
     * @param bytes number of content bytes to add (may be negative)
     */
    private void incrementConnectionOutputCounts(String connectionId, int flowFileCount, long bytes) {
        StandardFlowFileEvent counts = this.connectionCounts.computeIfAbsent(connectionId, key -> new StandardFlowFileEvent());
        counts.setFlowFilesOut(counts.getFlowFilesOut() + flowFileCount);
        counts.setContentSizeOut(counts.getContentSizeOut() + bytes);
    }

    /**
     * Registers a FlowFile that was just pulled from a connection with this session: creates its
     * repository record, rejects ID collisions against the checkpoint and the live records, updates
     * the inbound counters, and tracks the FlowFile as unacknowledged for its queue.
     *
     * @param flowFile the FlowFile that was dequeued
     * @param connection the connection it was dequeued from
     */
    private void registerDequeuedRecord(FlowFileRecord flowFile, Connection connection) {
        StandardRepositoryRecord dequeued = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
        if (this.checkpoint != null) {
            // A FlowFile with the same ID may already be held by the checkpointed state.
            this.handleConflictingId(flowFile, connection, this.checkpoint.getRecord((FlowFile) flowFile));
        }
        // putIfAbsent returns the previously registered record, if any, which signals a duplicate ID.
        this.handleConflictingId(flowFile, connection, this.records.putIfAbsent(flowFile.getId(), dequeued));
        this.flowFilesIn++;
        this.contentSizeIn += flowFile.getSize();
        this.unacknowledgedFlowFiles.computeIfAbsent(connection.getFlowFileQueue(), queue -> new HashSet<>()).add(flowFile);
        this.incrementConnectionOutputCounts(connection, flowFile);
    }

    /**
     * Aborts the dequeue when the session already holds a record with the same FlowFile ID: logs the
     * conflict, puts the FlowFile back on its queue, rolls the session back and throws.
     *
     * @param flowFile the FlowFile that was just pulled
     * @param connection the connection it was pulled from
     * @param conflict the already-registered record with the same ID, or {@code null} if there is none
     * @throws FlowFileAccessException if {@code conflict} is not {@code null}
     */
    private void handleConflictingId(FlowFileRecord flowFile, Connection connection, StandardRepositoryRecord conflict) {
        if (conflict != null) {
            Object[] details = {flowFile, connection, flowFile.getId(), conflict.getCurrent(), conflict.getOriginalQueue()};
            LOG.error("Attempted to pull {} from {} but the Session already has a FlowFile with the same ID ({}): {}, which was pulled from {}. This means that the system has two FlowFiles with the same ID, which should not happen.", details);
            // Return the FlowFile to its queue before rolling back so that it is not lost.
            connection.getFlowFileQueue().put(flowFile);
            this.rollback(true, false);
            throw new FlowFileAccessException("Attempted to pull a FlowFile with ID " + flowFile.getId() + " from Connection " + connection + " but a FlowFile with that ID already exists in the session");
        }
    }

    /**
     * Adjusts a named counter by {@code delta}.
     *
     * <p>Immediate adjustments are recorded in the immediate-counter map and forwarded to the
     * context right away. Non-immediate adjustments first verify that the task is still active and
     * are recorded in the map of counters kept for commit.
     *
     * @param name the counter name
     * @param delta the amount to add (may be negative)
     * @param immediate whether the adjustment is applied to the context immediately
     */
    public void adjustCounter(String name, long delta, boolean immediate) {
        if (immediate) {
            if (this.immediateCounters == null) {
                this.immediateCounters = new HashMap<String, Long>();
            }
            this.adjustCounter(name, delta, this.immediateCounters);
            this.context.adjustCounter(name, delta);
            return;
        }
        this.verifyTaskActive();
        if (this.countersOnCommit == null) {
            this.countersOnCommit = new HashMap<String, Long>();
        }
        this.adjustCounter(name, delta, this.countersOnCommit);
    }

    /**
     * Adds {@code delta} to the value stored under {@code name}, treating a missing (or null) value
     * as zero.
     *
     * @param name the counter name
     * @param delta the amount to add (may be negative)
     * @param map the counter map to update
     */
    private void adjustCounter(String name, long delta, Map<String, Long> map) {
        map.merge(name, delta, Long::sum);
    }

    /**
     * Pulls a single FlowFile from the pollable incoming connections.
     *
     * <p>Makes at most one attempt per connection, asking the context for the next connection index
     * on each attempt. FlowFiles reported as expired by a poll are handed to {@code removeExpired}.
     *
     * @return the first FlowFile obtained, or {@code null} if no attempt produced one
     */
    public FlowFile get() {
        this.verifyTaskActive();
        List<Connection> connections = this.context.getPollableConnections();
        int connectionCount = connections.size();
        for (int attempt = 0; attempt < connectionCount; attempt++) {
            int index = this.context.getNextIncomingConnectionIndex() % connectionCount;
            Connection candidate = connections.get(index);
            Set<FlowFileRecord> expired = new HashSet<>();
            FlowFileRecord polled = candidate.poll(expired);
            this.removeExpired(expired, candidate);
            if (polled != null) {
                this.registerDequeuedRecord(polled, candidate);
                return polled;
            }
        }
        return null;
    }

    /**
     * Pulls up to {@code maxResults} FlowFiles from the pollable connections.
     *
     * @param maxResults the maximum number of FlowFiles to return; must not be negative
     * @return the FlowFiles obtained; empty when {@code maxResults} is zero or there are no
     *         pollable connections
     * @throws IllegalArgumentException if {@code maxResults} is negative
     */
    public List<FlowFile> get(final int maxResults) {
        this.verifyTaskActive();
        if (maxResults < 0) {
            throw new IllegalArgumentException();
        }
        if (maxResults == 0 || this.context.getPollableConnections().isEmpty()) {
            return Collections.emptyList();
        }
        ConnectionPoller boundedPoller = (connection, expiredRecords) -> {
            // A fresh filter per poll, so the accepted-count starts at zero for every connection.
            FlowFileFilter countingFilter = new FlowFileFilter(){
                int polled = 0;

                public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
                    this.polled++;
                    return this.polled >= maxResults
                            ? FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE
                            : FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                }
            };
            return connection.poll(countingFilter, expiredRecords);
        };
        return this.get(boundedPoller, false);
    }

    /**
     * Pulls the FlowFiles selected by {@code filter} from the pollable connections, requesting that
     * all queues be locked while polling.
     *
     * @param filter decides which FlowFiles are accepted
     * @return the FlowFiles obtained, possibly empty
     */
    public List<FlowFile> get(FlowFileFilter filter) {
        this.verifyTaskActive();
        ConnectionPoller filteringPoller = (connection, expiredRecords) -> connection.poll(filter, expiredRecords);
        return this.get(filteringPoller, true);
    }

    /**
     * Polls the pollable connections in round-robin order, starting at the index supplied by the
     * context, and returns the FlowFiles from the first connection that yields either selected or
     * expired FlowFiles.
     *
     * <p>When {@code lockAllQueues} is set and there is more than one connection, a copy of the
     * connection list is sorted by identifier and every connection is locked in that order before
     * polling; the locks are released in reverse identifier order when the method exits.
     *
     * <p>NOTE(review): this source was recovered by a decompiler, which reported
     * "Removed try catching itself - possible behaviour change" for this method; confirm the
     * try/finally structure against the upstream source.
     *
     * @param poller strategy used to pull FlowFiles (and collect expired ones) from a connection
     * @param lockAllQueues whether all connections should be locked for the duration of the poll
     * @return a new mutable list of the FlowFiles pulled; empty if no connection yielded anything
     */
    private List<FlowFile> get(ConnectionPoller poller, boolean lockAllQueues) {
        List<Connection> connections = this.context.getPollableConnections();
        boolean sortConnections = lockAllQueues && connections.size() > 1;
        if (sortConnections) {
            // Sort a private copy so every session takes the locks in the same order.
            connections = new ArrayList<>(connections);
            connections.sort(Comparator.comparing(Connection::getIdentifier));
            for (Connection connection : connections) {
                connection.lock();
            }
        }
        int startIndex = this.context.getNextIncomingConnectionIndex();
        try {
            for (int i = 0; i < connections.size(); ++i) {
                Connection conn = connections.get((startIndex + i) % connections.size());
                Set<FlowFileRecord> expired = new HashSet<>();
                List<FlowFileRecord> newlySelected = poller.poll(conn, expired);
                this.removeExpired(expired, conn);
                if (newlySelected.isEmpty() && expired.isEmpty()) {
                    continue;
                }
                for (FlowFileRecord flowFile : newlySelected) {
                    this.registerDequeuedRecord(flowFile, conn);
                }
                // Copy into a List<FlowFile>; a List<FlowFileRecord> is not assignable to the return type.
                return new ArrayList<>(newlySelected);
            }
            return new ArrayList<>();
        } finally {
            if (sortConnections) {
                connections.sort(Comparator.comparing(Connection::getIdentifier).reversed());
                for (Connection connection : connections) {
                    connection.unlock();
                }
            }
        }
    }

    /**
     * Sums the object and byte counts of the queues behind all pollable connections.
     *
     * @return the combined queue size
     */
    public QueueSize getQueueSize() {
        this.verifyTaskActive();
        long totalBytes = 0L;
        int totalFlowFiles = 0;
        for (Connection pollable : this.context.getPollableConnections()) {
            QueueSize size = pollable.getFlowFileQueue().size();
            totalFlowFiles += size.getObjectCount();
            totalBytes += size.getByteCount();
        }
        return new QueueSize(totalFlowFiles, totalBytes);
    }

    /**
     * Creates a new FlowFile with no parent. Its filename and UUID attributes are both set to a
     * freshly generated UUID and its path to the default FlowFile path. The FlowFile is registered
     * with this session as created and as created without lineage.
     *
     * @return the new FlowFile
     */
    public FlowFile create() {
        this.verifyTaskActive();
        String uuid = UUID.randomUUID().toString();
        Map<String, String> initialAttributes = new HashMap<>();
        initialAttributes.put(CoreAttributes.FILENAME.key(), uuid);
        initialAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
        initialAttributes.put(CoreAttributes.UUID.key(), uuid);
        FlowFileRecord created = new StandardFlowFileRecord.Builder()
                .id(this.context.getNextFlowFileSequence())
                .addAttributes(initialAttributes)
                .build();
        // No original queue: the FlowFile did not come from a connection.
        StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue)null);
        repoRecord.setWorking(created, initialAttributes, false);
        this.records.put(created.getId(), repoRecord);
        this.createdFlowFiles.add(uuid);
        this.createdFlowFilesWithoutLineage.add(uuid);
        return created;
    }

    /**
     * Creates a child FlowFile of {@code parent}. The child gets a new UUID and inherits the
     * parent's lineage start and all of its attributes except the UUID, alternate identifier and
     * discard reason; inherited attributes override the child's default filename and path. A fork
     * event and a parent/child link are registered.
     *
     * @param parent the FlowFile to derive from; its most recent version in this session is used
     * @return the new child FlowFile
     */
    public FlowFile create(FlowFile parent) {
        this.verifyTaskActive();
        parent = this.getMostRecent(parent);
        String uuid = UUID.randomUUID().toString();
        Map<String, String> childAttributes = new HashMap<>(3);
        childAttributes.put(CoreAttributes.FILENAME.key(), uuid);
        childAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
        childAttributes.put(CoreAttributes.UUID.key(), uuid);
        StandardFlowFileRecord.Builder childBuilder = new StandardFlowFileRecord.Builder().id(this.context.getNextFlowFileSequence());
        for (Map.Entry<String, String> attribute : parent.getAttributes().entrySet()) {
            String key = attribute.getKey();
            boolean inherited = !CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
                    && !CoreAttributes.DISCARD_REASON.key().equals(key)
                    && !CoreAttributes.UUID.key().equals(key);
            if (inherited) {
                childAttributes.put(key, attribute.getValue());
            }
        }
        childBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
        childBuilder.addAttributes(childAttributes);
        FlowFileRecord child = childBuilder.build();
        StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue)null);
        repoRecord.setWorking(child, childAttributes, false);
        this.records.put(child.getId(), repoRecord);
        this.createdFlowFiles.add(child.getAttribute(CoreAttributes.UUID.key()));
        this.registerForkEvent(parent, (FlowFile)child);
        this.flowFileLinkage.addLink(parent.getId(), child.getId());
        return child;
    }

    /**
     * Creates a FlowFile that joins several parents. The child's attributes are the intersection of
     * the parents' attributes (minus UUID, alternate identifier and discard reason) plus a new UUID,
     * a filename equal to that UUID and the default path. Its lineage starts at the earliest
     * non-zero lineage start date among the parents, with the smallest lineage start index among
     * the parents that share that date. A join event and links to every parent are registered.
     *
     * @param parents the FlowFiles to join; the most recent version of each in this session is used
     * @return the new child FlowFile
     */
    public FlowFile create(Collection<FlowFile> parents) {
        this.verifyTaskActive();
        parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
        Map<String, String> newAttributes = StandardProcessSession.intersectAttributes(parents);
        newAttributes.remove(CoreAttributes.UUID.key());
        newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
        newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
        // 0 doubles as "not yet set" for the lineage start date.
        long lineageStartDate = 0L;
        for (FlowFile parent : parents) {
            long parentLineageStartDate = parent.getLineageStartDate();
            if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
                lineageStartDate = parentLineageStartDate;
            }
        }
        // Smallest start index among the parents that carry the chosen start date. The first match
        // must seed the minimum: starting from 0 and only accepting smaller values would leave the
        // index stuck at 0 for any non-negative parent index.
        long lineageStartIndex = 0L;
        boolean indexFound = false;
        for (FlowFile parent : parents) {
            if (parent.getLineageStartDate() != lineageStartDate) {
                continue;
            }
            long parentLineageStartIndex = parent.getLineageStartIndex();
            if (!indexFound || parentLineageStartIndex < lineageStartIndex) {
                lineageStartIndex = parentLineageStartIndex;
                indexFound = true;
            }
        }
        String uuid = UUID.randomUUID().toString();
        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
        newAttributes.put(CoreAttributes.UUID.key(), uuid);
        FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(this.context.getNextFlowFileSequence()).addAttributes(newAttributes).lineageStart(lineageStartDate, lineageStartIndex).build();
        StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue)null);
        record.setWorking(fFile, newAttributes, false);
        this.records.put(fFile.getId(), record);
        this.createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
        this.registerJoinEvent((FlowFile)fFile, parents);
        long flowFileId = fFile.getId();
        for (FlowFile parent : parents) {
            this.flowFileLinkage.addLink(flowFileId, parent.getId());
        }
        return fFile;
    }

    /**
     * Clones a FlowFile together with its entire content.
     *
     * @param example the FlowFile to clone
     * @return the clone
     */
    public FlowFile clone(FlowFile example) {
        this.verifyTaskActive();
        FlowFile validated = this.validateRecordState(example);
        return this.clone(validated, 0L, validated.getSize());
    }

    /**
     * Clones a FlowFile, exposing only the {@code size} bytes of its content that start at
     * {@code offset}. The clone gets a new ID and UUID and points into the same content claim, whose
     * claimant count is incremented when a claim exists. A full-content clone is reported as a
     * provenance clone; a partial one is registered as a fork.
     *
     * @param example the FlowFile to clone
     * @param offset offset into the example's content at which the clone's content starts
     * @param size number of content bytes the clone covers
     * @return the clone
     * @throws FlowFileHandlingException if {@code offset + size} exceeds the example's size
     */
    public FlowFile clone(FlowFile example, long offset, long size) {
        this.verifyTaskActive();
        example = this.validateRecordState(example);
        StandardRepositoryRecord sourceRecord = this.getRecord(example);
        FlowFileRecord source = sourceRecord.getCurrent();
        ContentClaim sharedClaim = sourceRecord.getCurrentClaim();
        long exampleSize = example.getSize();
        if (offset + size > exampleSize) {
            throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + example);
        }
        StandardFlowFileRecord.Builder cloneBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(source);
        cloneBuilder.id(this.context.getNextFlowFileSequence());
        cloneBuilder.contentClaimOffset(source.getContentClaimOffset() + offset);
        cloneBuilder.size(size);
        cloneBuilder.addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
        FlowFileRecord clone = cloneBuilder.build();
        if (sharedClaim != null) {
            // The clone references the same claim, so it needs its own claimant count.
            this.context.getContentRepository().incrementClaimaintCount(sharedClaim);
        }
        StandardRepositoryRecord cloneRecord = new StandardRepositoryRecord((FlowFileQueue)null);
        cloneRecord.setWorking(clone, clone.getAttributes(), false);
        this.records.put(clone.getId(), cloneRecord);
        boolean partialContent = offset != 0L || size != example.getSize();
        if (partialContent) {
            this.registerForkEvent(example, (FlowFile)clone);
        } else {
            this.provenanceReporter.clone(example, (FlowFile)clone);
        }
        this.flowFileLinkage.addLink(example.getId(), clone.getId());
        return clone;
    }

    /**
     * Records that {@code child} was forked from {@code parent}. The first fork of a parent creates
     * and stores a FORK event builder populated from the parent and this session's connectable;
     * later forks only add the child to the existing builder.
     *
     * @param parent the FlowFile that was forked
     * @param child the FlowFile created from it
     */
    private void registerForkEvent(FlowFile parent, FlowFile child) {
        ProvenanceEventBuilder forkBuilder = this.forkEventBuilders.get(parent);
        if (forkBuilder != null) {
            forkBuilder.addChildFlowFile(child);
            return;
        }
        forkBuilder = this.context.getProvenanceRepository().eventBuilder();
        forkBuilder.setEventType(ProvenanceEventType.FORK);
        forkBuilder.setFlowFileEntryDate(parent.getEntryDate());
        forkBuilder.setLineageStartDate(parent.getLineageStartDate());
        forkBuilder.setFlowFileUUID(parent.getAttribute(CoreAttributes.UUID.key()));
        forkBuilder.setComponentId(this.context.getConnectable().getIdentifier());
        Connectable component = this.context.getConnectable();
        forkBuilder.setComponentType(component.getComponentType());
        forkBuilder.addParentFlowFile(parent);
        this.updateEventContentClaims(forkBuilder, parent, this.getRecord(parent));
        this.forkEventBuilders.put(parent, forkBuilder);
        forkBuilder.addChildFlowFile(child);
    }

    /**
     * Generates a join provenance event for {@code child} and appends it to the events already
     * generated for that FlowFile.
     *
     * @param child the FlowFile produced by the join
     * @param parents the FlowFiles that were joined
     */
    private void registerJoinEvent(FlowFile child, Collection<FlowFile> parents) {
        ProvenanceEventRecord joinEvent = this.provenanceReporter.generateJoinEvent(parents, child);
        this.generatedProvenanceEvents.computeIfAbsent(child, key -> new ArrayList<>()).add(joinEvent);
    }

    /**
     * Penalizes a FlowFile for the penalization period configured on this session's connectable.
     *
     * @param flowFile the FlowFile to penalize
     * @return the penalized version of the FlowFile
     */
    public FlowFile penalize(FlowFile flowFile) {
        this.verifyTaskActive();
        FlowFile validated = this.validateRecordState(flowFile, false);
        long penaltyMillis = this.context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
        return this.penalize(validated, penaltyMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Penalizes a FlowFile until the given period from now has elapsed, by setting its penalty
     * expiration time to the current time plus the period.
     *
     * @param flowFile the FlowFile to penalize
     * @param period length of the penalty
     * @param timeUnit unit of {@code period}
     * @return the penalized version of the FlowFile
     */
    public FlowFile penalize(FlowFile flowFile, long period, TimeUnit timeUnit) {
        // Resolve the caller's reference to the current version before looking up its record.
        FlowFile current = this.getRecord(flowFile).getCurrent();
        StandardRepositoryRecord repoRecord = this.getRecord(current);
        long penaltyMillis = timeUnit.toMillis(period);
        long penaltyExpiration = System.currentTimeMillis() + penaltyMillis;
        FlowFileRecord penalized = new StandardFlowFileRecord.Builder()
                .fromFlowFile(repoRecord.getCurrent())
                .penaltyExpirationTime(penaltyExpiration)
                .build();
        repoRecord.setWorking(penalized, false);
        return penalized;
    }

    /**
     * Sets a single attribute on a FlowFile. Attempts to set the UUID attribute are ignored.
     *
     * @param flowFile the FlowFile to update
     * @param key the attribute name
     * @param value the attribute value
     * @return the updated FlowFile, or the validated FlowFile unchanged when {@code key} is the UUID key
     */
    public FlowFile putAttribute(FlowFile flowFile, String key, String value) {
        this.verifyTaskActive();
        FlowFile validated = this.validateRecordState(flowFile);
        boolean uuidKey = CoreAttributes.UUID.key().equals(key);
        if (uuidKey) {
            return validated;
        }
        StandardRepositoryRecord repoRecord = this.getRecord(validated);
        FlowFileRecord updated = new StandardFlowFileRecord.Builder()
                .fromFlowFile(repoRecord.getCurrent())
                .addAttribute(key, value)
                .build();
        repoRecord.setWorking(updated, key, value, false);
        return updated;
    }

    /**
     * Sets several attributes on a FlowFile at once. A UUID entry in {@code attributes} is ignored;
     * the caller's map is never modified.
     *
     * @param flowFile the FlowFile to update
     * @param attributes the attributes to set
     * @return the updated FlowFile, or {@code flowFile} itself when {@code attributes} is empty
     */
    public FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes) {
        this.verifyTaskActive();
        if (attributes.isEmpty()) {
            return flowFile;
        }
        FlowFile validated = this.validateRecordState(flowFile);
        StandardRepositoryRecord repoRecord = this.getRecord(validated);
        Map<String, String> applied = attributes;
        if (attributes.containsKey(CoreAttributes.UUID.key())) {
            // Work on a copy so the UUID entry can be dropped without touching the caller's map.
            applied = new HashMap<String, String>(attributes);
            applied.remove(CoreAttributes.UUID.key());
        }
        FlowFileRecord updated = new StandardFlowFileRecord.Builder()
                .fromFlowFile(repoRecord.getCurrent())
                .addAttributes(applied)
                .build();
        repoRecord.setWorking(updated, applied, false);
        return updated;
    }

    /**
     * Removes a single attribute from a FlowFile. Required attributes are never removed.
     *
     * @param flowFile the FlowFile to update
     * @param key the attribute name to remove
     * @return the updated FlowFile, or the validated FlowFile unchanged when {@code key} is required
     */
    public FlowFile removeAttribute(FlowFile flowFile, String key) {
        this.verifyTaskActive();
        FlowFile validated = this.validateRecordState(flowFile);
        boolean required = REQUIRED_ATTRIBUTES.contains(key);
        if (required) {
            return validated;
        }
        StandardRepositoryRecord repoRecord = this.getRecord(validated);
        String[] keysToRemove = new String[]{key};
        FlowFileRecord updated = new StandardFlowFileRecord.Builder()
                .fromFlowFile(repoRecord.getCurrent())
                .removeAttributes(keysToRemove)
                .build();
        // A null value marks the attribute as removed in the record's updated attributes.
        repoRecord.setWorking(updated, key, null, false);
        return updated;
    }

    /**
     * Removes the named attributes from a FlowFile. Required attributes and the UUID attribute are
     * not recorded as removed.
     *
     * @param flowFile the FlowFile to update
     * @param keys the attribute names to remove; {@code null} leaves the FlowFile unchanged
     * @return the updated FlowFile, or the validated FlowFile unchanged when {@code keys} is null
     */
    public FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys) {
        this.verifyTaskActive();
        flowFile = this.validateRecordState(flowFile);
        if (keys == null) {
            return flowFile;
        }
        StandardRepositoryRecord record = this.getRecord(flowFile);
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build();
        // String-valued like every other attribute map handed to setWorking; a null value marks
        // the attribute as removed.
        Map<String, String> updatedAttrs = new HashMap<>();
        for (String key : keys) {
            if (REQUIRED_ATTRIBUTES.contains(key) || CoreAttributes.UUID.key().equals(key)) {
                continue;
            }
            updatedAttrs.put(key, null);
        }
        record.setWorking(newFile, updatedAttrs, false);
        return newFile;
    }

    /**
     * Removes every attribute whose name fully matches {@code keyPattern}. Required attributes are
     * not recorded as removed.
     *
     * @param flowFile the FlowFile to update
     * @param keyPattern pattern matched against attribute names; when {@code null} no attribute is
     *        recorded as removed
     * @return the updated FlowFile
     */
    public FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern) {
        this.verifyTaskActive();
        flowFile = this.validateRecordState(flowFile);
        StandardRepositoryRecord record = this.getRecord(flowFile);
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
        if (keyPattern == null) {
            record.setWorking(newFile, false);
            return newFile;
        }
        // The record still holds the pre-removal version here, so these are the original attributes.
        Map<String, String> curAttrs = record.getCurrent().getAttributes();
        // A null value marks the attribute as removed.
        Map<String, String> removed = new HashMap<>();
        for (String key : curAttrs.keySet()) {
            if (REQUIRED_ATTRIBUTES.contains(key) || !keyPattern.matcher(key).matches()) {
                continue;
            }
            removed.put(key, null);
        }
        record.setWorking(newFile, removed, false);
        return newFile;
    }

    /**
     * Replaces the record's working FlowFile with a copy whose last-queued date is
     * {@code lastQueueDate} and whose queue index is the next value of the shared enqueued index.
     *
     * @param record the repository record to update
     * @param lastQueueDate the last-queued timestamp to store
     */
    private void updateLastQueuedDate(StandardRepositoryRecord record, Long lastQueueDate) {
        StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent());
        long queuedAt = lastQueueDate.longValue();
        FlowFileRecord requeued = builder.lastQueued(queuedAt, enqueuedIndex.getAndIncrement()).build();
        record.setWorking(requeued, false);
    }

    /**
     * Stamps the record's working FlowFile with the current time as its last-queued date.
     *
     * @param record the repository record to update
     */
    private void updateLastQueuedDate(StandardRepositoryRecord record) {
        Long now = Long.valueOf(System.currentTimeMillis());
        this.updateLastQueuedDate(record, now);
    }

    /**
     * Marks a FlowFile for transfer to {@code relationship} and updates the session's output or
     * removal counters. A relationship without connections is accepted only when it is
     * auto-terminated (counted as removed) or is {@code Relationship.SELF} (not counted).
     *
     * @param flowFile the FlowFile to transfer
     * @param relationship the relationship to transfer it to
     * @throws IllegalArgumentException if the relationship has no connections and is neither
     *         auto-terminated nor {@code Relationship.SELF}
     */
    public void transfer(FlowFile flowFile, Relationship relationship) {
        this.verifyTaskActive();
        FlowFile validated = this.validateRecordState(flowFile);
        int destinationCount = this.context.getConnections(relationship).size();
        boolean autoTerminated = false;
        boolean selfRelationship = false;
        if (destinationCount == 0) {
            if (this.context.getConnectable().isAutoTerminated(relationship)) {
                autoTerminated = true;
            } else if (relationship == Relationship.SELF) {
                selfRelationship = true;
            } else {
                throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
            }
        }
        // One copy is counted per destination connection, and never fewer than one.
        int copies = Math.max(1, destinationCount);
        StandardRepositoryRecord repoRecord = this.getRecord(validated);
        repoRecord.setTransferRelationship(relationship);
        this.updateLastQueuedDate(repoRecord);
        if (autoTerminated) {
            this.removedCount += copies;
            this.removedBytes += validated.getSize();
            return;
        }
        if (!selfRelationship) {
            this.flowFilesOut += copies;
            this.contentSizeOut += validated.getSize() * (long)copies;
        }
    }

    /**
     * Transfers a FlowFile back to the queue it was pulled from by marking it for
     * {@code Relationship.SELF}.
     *
     * @param flowFile the FlowFile to return to its original queue
     * @throws IllegalArgumentException if the FlowFile has no original queue, i.e. it was created
     *         in this session
     */
    public void transfer(FlowFile flowFile) {
        this.verifyTaskActive();
        FlowFile validated = this.validateRecordState(flowFile);
        StandardRepositoryRecord repoRecord = this.getRecord(validated);
        boolean createdInSession = repoRecord.getOriginalQueue() == null;
        if (createdInSession) {
            throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
        }
        repoRecord.setTransferRelationship(Relationship.SELF);
        this.updateLastQueuedDate(repoRecord);
    }

    /**
     * Transfers each FlowFile in the collection back to self, one at a time, in iteration order.
     *
     * @param flowFiles the FlowFiles to transfer back
     */
    public void transfer(Collection<FlowFile> flowFiles) {
        Iterator<FlowFile> pending = flowFiles.iterator();
        while (pending.hasNext()) {
            this.transfer(pending.next());
        }
    }

    /**
     * Routes every FlowFile in the collection to the given relationship, stamping all of them with
     * the same last-queued timestamp, and updates the session's counters once for the whole batch.
     *
     * @param flowFiles the FlowFiles to transfer
     * @param relationship the relationship to transfer them to
     * @throws IllegalArgumentException if the relationship has no connections, is not
     *         auto-terminated and is not {@code Relationship.SELF}
     */
    public void transfer(Collection<FlowFile> flowFiles, Relationship relationship) {
        this.verifyTaskActive();
        flowFiles = this.validateRecordState(flowFiles);

        int destinationCount = this.context.getConnections(relationship).size();

        // Classify the zero-destination case; with one or more destinations both flags stay false.
        boolean terminates = false;
        boolean loopsBack = false;
        if (destinationCount == 0) {
            if (this.context.getConnectable().isAutoTerminated(relationship)) {
                terminates = true;
            } else if (relationship == Relationship.SELF) {
                loopsBack = true;
            } else {
                throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
            }
        }

        int fanOut = Math.max(1, destinationCount);
        long enqueuedAt = System.currentTimeMillis();
        long totalBytes = 0L;
        for (FlowFile candidate : flowFiles) {
            FlowFileRecord asRecord = (FlowFileRecord)candidate;
            StandardRepositoryRecord repoRecord = this.getRecord((FlowFile)asRecord);
            repoRecord.setTransferRelationship(relationship);
            this.updateLastQueuedDate(repoRecord, enqueuedAt);
            totalBytes += candidate.getSize();
        }

        if (terminates) {
            this.removedCount += fanOut * flowFiles.size();
            this.removedBytes += (long)fanOut * totalBytes;
        } else if (!loopsBack) {
            this.flowFilesOut += fanOut * flowFiles.size();
            this.contentSizeOut += (long)fanOut * totalBytes;
        }
    }

    /**
     * Marks a FlowFile for deletion and records its UUID in {@code removedFlowFiles}.
     *
     * <p>If the record has an original queue, the removal is counted and a provenance drop event is
     * reported. Otherwise the FlowFile's generated provenance events are discarded and it is
     * removed from any pending FORK events.
     *
     * @param flowFile the FlowFile to remove
     */
    public void remove(FlowFile flowFile) {
        this.verifyTaskActive();
        flowFile = this.validateRecordState(flowFile);

        StandardRepositoryRecord doomed = this.getRecord(flowFile);
        doomed.markForDelete();
        String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
        this.removedFlowFiles.add(uuid);

        if (doomed.getOriginalQueue() != null) {
            this.removedCount++;
            this.removedBytes += flowFile.getSize();
            this.provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
            return;
        }

        // No original queue: discard the provenance generated for this FlowFile instead of counting it.
        this.generatedProvenanceEvents.remove(flowFile);
        this.removeForkEvents(flowFile);
    }

    /**
     * Marks every FlowFile in the collection for deletion, applying the same per-FlowFile
     * bookkeeping as the single-FlowFile {@code remove}.
     *
     * @param flowFiles the FlowFiles to remove
     */
    public void remove(Collection<FlowFile> flowFiles) {
        this.verifyTaskActive();
        flowFiles = this.validateRecordState(flowFiles);

        for (FlowFile candidate : flowFiles) {
            StandardRepositoryRecord doomed = this.getRecord(candidate);
            doomed.markForDelete();
            String uuid = candidate.getAttribute(CoreAttributes.UUID.key());
            this.removedFlowFiles.add(uuid);

            if (doomed.getOriginalQueue() != null) {
                this.removedCount++;
                this.removedBytes += candidate.getSize();
                this.provenanceReporter.drop(candidate, candidate.getAttribute(CoreAttributes.DISCARD_REASON.key()));
            } else {
                // No original queue: discard the provenance generated for this FlowFile instead of counting it.
                this.generatedProvenanceEvents.remove(candidate);
                this.removeForkEvents(candidate);
            }
        }
    }

    /**
     * Removes the given FlowFile as a child from every pending event builder whose built event is
     * of type {@code FORK}.
     *
     * @param flowFile the child FlowFile to detach
     */
    private void removeForkEvents(FlowFile flowFile) {
        for (ProvenanceEventBuilder pendingBuilder : this.forkEventBuilders.values()) {
            ProvenanceEventRecord built = pendingBuilder.build();
            boolean isFork = built.getEventType() == ProvenanceEventType.FORK;
            if (isFork) {
                pendingBuilder.removeChildFlowFile(flowFile);
            }
        }
    }

    /**
     * Drains expired FlowFiles from every incoming connection of the connectable.
     *
     * <p>Each queue is polled with a filter that rejects every FlowFile, so only the expired set is
     * populated. Polling repeats for a connection until a pass yields no expired FlowFiles.
     */
    public void expireFlowFiles() {
        HashSet<FlowFileRecord> expiredBatch = new HashSet<FlowFileRecord>();
        FlowFileFilter rejectEverything = candidate -> FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;

        for (Connection incoming : this.context.getConnectable().getIncomingConnections()) {
            boolean foundExpired;
            do {
                expiredBatch.clear();
                incoming.getFlowFileQueue().poll(rejectEverything, expiredBatch, PollStrategy.ALL_FLOWFILES);
                this.removeExpired(expiredBatch, incoming);
                foundExpired = !expiredBatch.isEmpty();
            } while (foundExpired);
        }
    }

    /**
     * Deletes the given expired FlowFiles and records provenance for them.
     *
     * <p>For each FlowFile a delete-marked repository record is created against the connection's
     * queue and an expire event is reported to a dedicated provenance reporter. The reported events
     * are then enriched with content-claim and attribute details, registered with the provenance
     * repository, and the delete records are written to the FlowFile repository.
     *
     * <p>An {@link IOException} raised while registering events or updating the repository is
     * logged and not rethrown.
     *
     * @param flowFiles the expired FlowFiles; an empty set is a no-op
     * @param connection the connection whose queue the FlowFiles came from
     */
    private void removeExpired(Set<FlowFileRecord> flowFiles, Connection connection) {
        if (flowFiles.isEmpty()) {
            return;
        }
        LOG.info("{} {} FlowFiles have expired and will be removed", (Object)this, (Object)flowFiles.size());
        ArrayList<StandardRepositoryRecord> expiredRecords = new ArrayList<StandardRepositoryRecord>(flowFiles.size());
        Connectable connectable = this.context.getConnectable();
        InternalProvenanceReporter expiredReporter = this.context.createProvenanceReporter(this::isFlowFileKnown, this);
        // UUID -> FlowFile lookup used later to enrich each expire event.
        final HashMap<String, FlowFileRecord> recordIdMap = new HashMap<String, FlowFileRecord>();
        for (FlowFileRecord flowFile : flowFiles) {
            recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
            StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
            record.markForDelete();
            expiredRecords.add(record);
            expiredReporter.expire((FlowFile)flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
            long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
            // For a ProcessorNode the wrapped processor is logged rather than the node itself.
            Connectable terminator = connectable instanceof ProcessorNode ? ((ProcessorNode)connectable).getProcessor() : connectable;
            LOG.debug("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
        }
        try {
            // Lazily enriches events as the provenance repository iterates over them.
            Iterable iterable = () -> {
                final Iterator expiredEventIterator = expiredReporter.getEvents().iterator();
                Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>(){

                    @Override
                    public boolean hasNext() {
                        return expiredEventIterator.hasNext();
                    }

                    @Override
                    public ProvenanceEventRecord next() {
                        ProvenanceEventRecord event = (ProvenanceEventRecord)expiredEventIterator.next();
                        ProvenanceEventBuilder enriched = StandardProcessSession.this.context.createProvenanceEventBuilder().fromEvent(event);
                        FlowFileRecord record = (FlowFileRecord)recordIdMap.get(event.getFlowFileUuid());
                        // An event whose UUID is not in this batch yields null rather than an enriched event.
                        if (record == null) {
                            return null;
                        }
                        ContentClaim claim = record.getContentClaim();
                        if (claim != null) {
                            ResourceClaim resourceClaim = claim.getResourceClaim();
                            // Current and previous claims are set to the same location and size.
                            enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(record.getContentClaimOffset() + claim.getOffset()), record.getSize());
                            enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(record.getContentClaimOffset() + claim.getOffset()), record.getSize());
                        }
                        enriched.setAttributes(record.getAttributes(), Collections.emptyMap());
                        return enriched.build();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
                return enrichingIterator;
            };
            this.context.getProvenanceRepository().registerEvents(iterable);
            this.context.getFlowFileRepository().updateRepository(expiredRecords);
        }
        catch (IOException e) {
            LOG.error("Failed to update FlowFile Repository to record expired records", (Throwable)e);
        }
    }

    /**
     * Opens a stream over a FlowFile's content, reusing the session's cached resource-claim stream
     * when possible.
     *
     * <p>The cached stream is considered only when caching is allowed, no reads are in progress,
     * the FlowFile is not currently being written, and the content repository reports that
     * resource-claim streams are supported. Otherwise a standalone {@code ContentClaimInputStream}
     * is returned.
     *
     * @param flowFile the FlowFile whose content is read; its size bounds the returned stream on the cached path
     * @param claim the content claim holding the content
     * @param contentClaimOffset the offset of the FlowFile's content within the claim
     * @param allowCachingOfStream whether the shared resource-claim stream may be used
     * @return a stream over the FlowFile's content; an empty stream when the FlowFile's size is zero
     * @throws ContentNotFoundException if the repository reports missing content or an
     *         {@link EOFException} occurs while positioning the stream
     * @throws FlowFileAccessException on any other {@link IOException}
     */
    private InputStream getInputStream(FlowFile flowFile, ContentClaim claim, long contentClaimOffset, boolean allowCachingOfStream) throws ContentNotFoundException {
        // Zero-length content never touches the content repository.
        if (flowFile.getSize() == 0L) {
            return new ByteArrayInputStream(new byte[0]);
        }
        try {
            if (allowCachingOfStream && this.readRecursionSet.isEmpty() && !this.writeRecursionSet.contains(flowFile) && this.context.getContentRepository().isResourceClaimStreamSupported()) {
                ByteCountingInputStream byteCountingInputStream;
                // Identity comparison: reuse only when the cached stream is for this exact resource claim object.
                if (this.currentReadClaim == claim.getResourceClaim()) {
                    long resourceClaimOffset = claim.getOffset() + contentClaimOffset;
                    // The cached stream can only move forward, so it is reusable only if it has not
                    // already consumed past the position where this FlowFile's content begins.
                    if (this.currentReadClaimStream != null && this.currentReadClaimStream.getBytesConsumed() <= resourceClaimOffset) {
                        long bytesToSkip = resourceClaimOffset - this.currentReadClaimStream.getBytesConsumed();
                        if (bytesToSkip > 0L) {
                            StreamUtils.skip((InputStream)this.currentReadClaimStream, (long)bytesToSkip);
                        }
                        // The shared stream is wrapped in DisableOnCloseInputStream and limited to this FlowFile's size.
                        LimitingInputStream limitingInputStream = new LimitingInputStream((InputStream)new DisableOnCloseInputStream((InputStream)this.currentReadClaimStream), flowFile.getSize());
                        ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(this.context.getContentRepository(), claim, contentClaimOffset, (InputStream)limitingInputStream, this.performanceTracker);
                        return contentClaimInputStream;
                    }
                }
                // Cached stream is unusable: flush the claim, then replace the cached stream with a fresh one.
                this.claimCache.flush(claim);
                if (this.currentReadClaimStream != null) {
                    this.currentReadClaimStream.close();
                }
                this.currentReadClaim = claim.getResourceClaim();
                this.performanceTracker.beginContentRead();
                InputStream contentRepoStream = this.context.getContentRepository().read(claim.getResourceClaim());
                this.performanceTracker.endContentRead();
                PerformanceTrackingInputStream performanceTrackInputStream = new PerformanceTrackingInputStream(contentRepoStream, this.performanceTracker);
                StreamUtils.skip((InputStream)performanceTrackInputStream, (long)(claim.getOffset() + contentClaimOffset));
                // NOTE(review): the buffered stream wraps contentRepoStream, not performanceTrackInputStream,
                // so only the skip above goes through the tracking wrapper — confirm this is intended.
                BufferedInputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
                // The counting stream is seeded with the offset already skipped.
                this.currentReadClaimStream = byteCountingInputStream = new ByteCountingInputStream((InputStream)bufferedContentStream, claim.getOffset() + contentClaimOffset);
                LimitingInputStream limitingInputStream = new LimitingInputStream((InputStream)new DisableOnCloseInputStream((InputStream)this.currentReadClaimStream), flowFile.getSize());
                ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(this.context.getContentRepository(), claim, contentClaimOffset, (InputStream)limitingInputStream, this.performanceTracker);
                return contentClaimInputStream;
            }
            // Non-cached path: flush the claim and hand back a standalone stream.
            this.claimCache.flush(claim);
            ContentClaimInputStream rawInStream = new ContentClaimInputStream(this.context.getContentRepository(), claim, contentClaimOffset, this.performanceTracker);
            return rawInStream;
        }
        catch (ContentNotFoundException cnfe) {
            throw cnfe;
        }
        catch (EOFException eof) {
            // Running out of data while positioning the stream is reported as missing content.
            throw new ContentNotFoundException(claim, (Throwable)eof);
        }
        catch (IOException ioe) {
            throw new FlowFileAccessException("Failed to read content of " + String.valueOf(flowFile), (Throwable)ioe);
        }
    }

    /**
     * Reads a FlowFile's content by passing a stream over it to the given callback.
     *
     * <p>Any pending append stream for the current claim is closed and the claim cache flushed
     * before reading. Bytes read are added to the session's {@code bytesRead} counter whether or
     * not the callback succeeds.
     *
     * @param source the FlowFile to read
     * @param reader the callback that consumes the content stream
     * @throws FlowFileAccessException if the content claim cannot be prepared for reading
     * @throws ProcessException if an {@link IOException} escapes the read
     */
    public void read(FlowFile source, InputStreamCallback reader) {
        this.verifyTaskActive();
        source = this.validateRecordState(source, true);
        StandardRepositoryRecord record = this.getRecord(source);
        try {
            this.ensureNotAppending(record.getCurrentClaim());
            this.claimCache.flush(record.getCurrentClaim());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), (Throwable)e);
        }
        try (InputStream rawIn = this.getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
             LimitedInputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
             DisableOnCloseInputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
             ByteCountingInputStream countingStream = new ByteCountingInputStream((InputStream)disableOnCloseIn, this.bytesRead);){
            FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingStream, source, record.getCurrentClaim());
            // Tracks whether a ContentNotFoundException is already propagating, so the finally block
            // does not throw a second one over it.
            boolean cnfeThrown = false;
            try {
                this.incrementReadCount(source);
                reader.process(this.createTaskTerminationStream(ffais));
            }
            catch (ContentNotFoundException cnfe) {
                cnfeThrown = true;
                throw cnfe;
            }
            finally {
                this.decrementReadCount(source);
                this.bytesRead += countingStream.getBytesRead();
                // If the stream recorded a ContentNotFoundException that did not propagate out of the
                // callback, surface it here so it is still handled below.
                if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
                    throw ffais.getContentNotFoundException();
                }
            }
        }
        catch (ContentNotFoundException nfe) {
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ex) {
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ex.toString(), (Throwable)ex);
        }
    }

    /**
     * Opens a stream over a FlowFile's content that the caller reads and closes itself.
     *
     * <p>The returned stream is registered in {@code openInputStreams} and counted in the read
     * recursion set until it is closed. Read failures are routed into session error handling: a
     * {@link ContentNotFoundException} is passed to {@code handleContentNotFound}, and a
     * {@code FlowFileAccessException} triggers {@code rollback(true)}; both are rethrown.
     *
     * @param source the FlowFile to read
     * @return a stream over the FlowFile's content, wrapped for task-termination checks
     * @throws FlowFileAccessException if the content claim cannot be prepared for reading
     * @throws ContentNotFoundException if the content cannot be found when opening the stream
     */
    public InputStream read(FlowFile source) {
        InputStream rawIn;
        this.verifyTaskActive();
        source = this.validateRecordState(source, true);
        final StandardRepositoryRecord record = this.getRecord(source);
        try {
            ContentClaim currentClaim = record.getCurrentClaim();
            this.ensureNotAppending(currentClaim);
            this.claimCache.flush(currentClaim);
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), (Throwable)e);
        }
        try {
            rawIn = this.getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
        }
        catch (ContentNotFoundException nfe) {
            this.handleContentNotFound(nfe, record);
            throw nfe;
        }
        LimitedInputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
        final ByteCountingInputStream countingStream = new ByteCountingInputStream((InputStream)limitedIn);
        final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingStream, source, record.getCurrentClaim());
        final FlowFile sourceFlowFile = source;
        // Wrapper that forwards to ffais and turns read failures into session-level error handling.
        InputStream errorHandlingStream = new InputStream(){
            // Guards the bytesRead accounting so it is applied only on the first close.
            private boolean closed = false;

            @Override
            public int read() throws IOException {
                try {
                    return ffais.read();
                }
                catch (ContentNotFoundException cnfe) {
                    this.close();
                    StandardProcessSession.this.handleContentNotFound(cnfe, record);
                    throw cnfe;
                }
                catch (FlowFileAccessException ffae) {
                    LOG.error("Failed to read content from {}; rolling back session", (Object)sourceFlowFile, (Object)ffae);
                    this.close();
                    StandardProcessSession.this.rollback(true);
                    throw ffae;
                }
            }

            @Override
            public int read(byte[] b) throws IOException {
                return this.read(b, 0, b.length);
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                try {
                    return ffais.read(b, off, len);
                }
                catch (ContentNotFoundException cnfe) {
                    this.close();
                    StandardProcessSession.this.handleContentNotFound(cnfe, record);
                    throw cnfe;
                }
                catch (FlowFileAccessException ffae) {
                    LOG.error("Failed to read content from {}; rolling back session", (Object)sourceFlowFile, (Object)ffae);
                    this.close();
                    StandardProcessSession.this.rollback(true);
                    throw ffae;
                }
            }

            @Override
            public void close() throws IOException {
                // Note: the read count is decremented on every close() call, not only the first;
                // only the bytesRead accounting below is guarded by the closed flag.
                StandardProcessSession.this.decrementReadCount(sourceFlowFile);
                if (!this.closed) {
                    StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
                    this.closed = true;
                }
                ffais.close();
                StandardProcessSession.this.openInputStreams.remove(sourceFlowFile);
            }

            @Override
            public int available() throws IOException {
                return ffais.available();
            }

            @Override
            public long skip(long n) throws IOException {
                return ffais.skip(n);
            }

            @Override
            public boolean markSupported() {
                return ffais.markSupported();
            }

            @Override
            public synchronized void mark(int readlimit) {
                ffais.mark(readlimit);
            }

            @Override
            public synchronized void reset() throws IOException {
                ffais.reset();
            }

            public String toString() {
                return "ErrorHandlingInputStream[FlowFile=" + String.valueOf(sourceFlowFile) + "]";
            }
        };
        // Register the open stream with the session before handing it to the caller.
        this.incrementReadCount(sourceFlowFile);
        this.openInputStreams.put(sourceFlowFile, errorHandlingStream);
        return this.createTaskTerminationStream(errorHandlingStream);
    }

    /**
     * Wraps an input stream in a {@code TaskTerminationInputStream} bound to this session's task
     * termination, with a callback that invokes {@code rollback(false, true)}.
     *
     * @param delegate the stream to wrap
     * @return the wrapping stream
     */
    private InputStream createTaskTerminationStream(InputStream delegate) {
        InputStream guarded = new TaskTerminationInputStream(delegate, this.taskTermination, () -> this.rollback(false, true));
        return guarded;
    }

    /**
     * Wraps an output stream in a {@code TaskTerminationOutputStream} bound to this session's task
     * termination, with a callback that invokes {@code rollback(false, true)}.
     *
     * @param delegate the stream to wrap
     * @return the wrapping stream
     */
    private OutputStream createTaskTerminationStream(OutputStream delegate) {
        OutputStream guarded = new TaskTerminationOutputStream(delegate, this.taskTermination, () -> this.rollback(false, true));
        return guarded;
    }

    /**
     * Increments the number of in-progress reads recorded for the FlowFile, starting at 1 when
     * there is no entry yet.
     *
     * @param flowFile the FlowFile being read
     */
    private void incrementReadCount(FlowFile flowFile) {
        this.readRecursionSet.merge(flowFile, 1, Integer::sum);
    }

    /**
     * Decrements the number of in-progress reads recorded for the FlowFile, dropping the entry
     * once the count reaches zero. Does nothing when the FlowFile has no entry.
     *
     * @param flowFile the FlowFile whose read has finished
     */
    private void decrementReadCount(FlowFile flowFile) {
        Integer current = this.readRecursionSet.get(flowFile);
        if (current != null) {
            int remaining = current - 1;
            if (remaining != 0) {
                this.readRecursionSet.put(flowFile, remaining);
            } else {
                this.readRecursionSet.remove(flowFile);
            }
        }
    }

    /**
     * Merges the content of the source FlowFiles into the destination with no header, footer or
     * demarcator.
     *
     * @param sources the FlowFiles whose content is concatenated
     * @param destination the FlowFile that receives the merged content
     * @return the updated destination FlowFile
     */
    public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
        this.verifyTaskActive();
        final byte[] noHeader = null;
        final byte[] noFooter = null;
        final byte[] noDemarcator = null;
        FlowFile merged = this.merge(sources, destination, noHeader, noFooter, noDemarcator);
        return merged;
    }

    /**
     * Concatenates the content of the source FlowFiles into a newly created content claim and
     * assigns that claim to the destination FlowFile.
     *
     * <p>Output layout: optional header, each source's content in iteration order with the
     * optional demarcator between consecutive sources (not after the last), then the optional
     * footer. Bytes written and read are added to the session counters even if the merge fails
     * part-way.
     *
     * @param sources the FlowFiles whose content is concatenated
     * @param destination the FlowFile that receives the merged content; must not be among {@code sources}
     * @param header bytes written before the first source; skipped when {@code null} or empty
     * @param footer bytes written after the last source; skipped when {@code null} or empty
     * @param demarcator bytes written between sources; skipped when {@code null} or empty
     * @return the destination FlowFile rebuilt with the new claim, offset 0 and the written size
     * @throws IllegalArgumentException if {@code destination} is contained in {@code sources}
     * @throws FlowFileAccessException if a source cannot be prepared, the claim cannot be created,
     *         or an {@link IOException} occurs while merging
     */
    public FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
        ContentClaim newClaim;
        this.verifyTaskActive();
        sources = this.validateRecordState(sources);
        destination = this.validateRecordState(destination);
        if (sources.contains(destination)) {
            throw new IllegalArgumentException("Destination cannot be within sources");
        }
        // Collect the source records and make sure none has a pending append stream or unflushed cache.
        ArrayList<StandardRepositoryRecord> sourceRecords = new ArrayList<StandardRepositoryRecord>();
        for (FlowFile source : sources) {
            StandardRepositoryRecord record = this.getRecord(source);
            sourceRecords.add(record);
            try {
                this.ensureNotAppending(record.getCurrentClaim());
                this.claimCache.flush(record.getCurrentClaim());
            }
            catch (IOException e) {
                throw new FlowFileAccessException("Unable to read from source " + String.valueOf(source) + " due to " + e.toString(), (Throwable)e);
            }
        }
        StandardRepositoryRecord destinationRecord = this.getRecord(destination);
        ContentRepository contentRepo = this.context.getContentRepository();
        try {
            newClaim = contentRepo.create(this.context.getConnectable().isLossTolerant());
            claimLog.debug("Creating ContentClaim {} for 'merge' for {}", (Object)newClaim, (Object)destinationRecord.getCurrent());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), (Throwable)e);
        }
        long readCount = 0L;
        long writtenCount = 0L;
        try {
            try (OutputStream rawOut = contentRepo.write(newClaim);
                 BufferedOutputStream out = new BufferedOutputStream(rawOut);){
                if (header != null && header.length > 0) {
                    ((OutputStream)out).write(header);
                    writtenCount += (long)header.length;
                }
                int objectIndex = 0;
                boolean useDemarcator = demarcator != null && demarcator.length > 0;
                int numSources = sources.size();
                for (FlowFile source : sources) {
                    StandardRepositoryRecord sourceRecord = this.getRecord(source);
                    long copied = contentRepo.exportTo(sourceRecord.getCurrentClaim(), (OutputStream)out, sourceRecord.getCurrentClaimOffset(), source.getSize());
                    writtenCount += copied;
                    readCount += copied;
                    // Skip the demarcator when none is configured or this was the last source.
                    if (!useDemarcator || ++objectIndex >= numSources) continue;
                    ((OutputStream)out).write(demarcator);
                    writtenCount += (long)demarcator.length;
                }
                if (footer != null && footer.length > 0) {
                    ((OutputStream)out).write(footer);
                    writtenCount += (long)footer.length;
                }
            }
            finally {
                // Session counters reflect whatever was transferred, even on failure.
                this.bytesWritten += writtenCount;
                this.bytesRead += readCount;
            }
        }
        catch (ContentNotFoundException nfe) {
            // Not rethrown here: unless handleContentNotFound throws, execution continues after the try.
            this.destroyContent(newClaim, destinationRecord);
            this.handleContentNotFound(nfe, destinationRecord);
            this.handleContentNotFound(nfe, sourceRecords);
        }
        catch (IOException ioe) {
            this.destroyContent(newClaim, destinationRecord);
            throw new FlowFileAccessException("Failed to merge " + sources.size() + " into " + String.valueOf(destination) + " due to " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.destroyContent(newClaim, destinationRecord);
            throw t;
        }
        this.removeTemporaryClaim(destinationRecord);
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build();
        destinationRecord.setWorking(newFile, true);
        return newFile;
    }

    /**
     * Flushes and closes the appendable stream registered for the given claim, if any, removing it
     * from {@code appendableStreams}.
     *
     * @param claim the claim to check; {@code null} is a no-op
     * @throws IOException if flushing or closing the stream fails
     */
    private void ensureNotAppending(ContentClaim claim) throws IOException {
        if (claim != null) {
            ByteCountingOutputStream pendingAppend = this.appendableStreams.remove(claim);
            if (pendingAppend != null) {
                pendingAppend.flush();
                pendingAppend.close();
            }
        }
    }

    /**
     * Opens an output stream that replaces the content of the given FlowFile. The caller writes to
     * the stream and closes it; closing finalizes the new content on the FlowFile's record.
     *
     * <p>The returned stream is registered in {@code openOutputStreams} and the FlowFile is added to
     * the write recursion set until the stream is closed. If a write or flush on the underlying
     * stream fails, the session is rolled back, the stream is closed, and a
     * {@code FlowFileAccessException} is thrown.
     *
     * <p>The write claim is reset and the new content destroyed only when setting up the stream
     * fails. On the successful path the claim is left intact, because the returned stream still
     * writes to it.
     *
     * @param source the FlowFile whose content is to be written
     * @return a stream for writing the FlowFile's new content, wrapped for task-termination checks
     * @throws ContentNotFoundException if content cannot be found while setting up the stream
     * @throws ProcessException if an {@link IOException} occurs while setting up the stream
     */
    public OutputStream write(FlowFile source) {
        this.verifyTaskActive();
        source = this.validateRecordState(source);
        final StandardRepositoryRecord record = this.getRecord(source);
        ContentClaim newClaim = null;
        try {
            newClaim = this.claimCache.getContentClaim();
            claimLog.debug("Creating ContentClaim {} for 'write' for {}", (Object)newClaim, (Object)source);
            this.ensureNotAppending(newClaim);
            final OutputStream rawStream = this.claimCache.write(newClaim);
            NonFlushableOutputStream nonFlushable = new NonFlushableOutputStream(rawStream);
            DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream((OutputStream)nonFlushable);
            final ByteCountingOutputStream countingOut = new ByteCountingOutputStream((OutputStream)disableOnClose);
            final FlowFile sourceFlowFile = source;
            final ContentClaim updatedClaim = newClaim;
            // Wrapper that forwards to countingOut and turns write failures into a session rollback.
            OutputStream errorHandlingOutputStream = new OutputStream(){
                // Makes close() idempotent.
                private boolean closed = false;

                @Override
                public void write(int b) throws IOException {
                    try {
                        countingOut.write(b);
                    }
                    catch (IOException ioe) {
                        LOG.error("Failed to write content to {}; rolling back session", (Object)sourceFlowFile, (Object)ioe);
                        StandardProcessSession.this.rollback(true);
                        this.close();
                        throw new FlowFileAccessException("Failed to write to Content Repository for " + String.valueOf(sourceFlowFile), (Throwable)ioe);
                    }
                }

                @Override
                public void write(byte[] b) throws IOException {
                    try {
                        countingOut.write(b);
                    }
                    catch (IOException ioe) {
                        LOG.error("Failed to write content to {}; rolling back session", (Object)sourceFlowFile, (Object)ioe);
                        StandardProcessSession.this.rollback(true);
                        this.close();
                        throw new FlowFileAccessException("Failed to write to Content Repository for " + String.valueOf(sourceFlowFile), (Throwable)ioe);
                    }
                }

                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    try {
                        countingOut.write(b, off, len);
                    }
                    catch (IOException ioe) {
                        LOG.error("Failed to write content to {}; rolling back session", (Object)sourceFlowFile, (Object)ioe);
                        StandardProcessSession.this.rollback(true);
                        this.close();
                        throw new FlowFileAccessException("Failed to write to Content Repository for " + String.valueOf(sourceFlowFile), (Throwable)ioe);
                    }
                }

                @Override
                public void flush() throws IOException {
                    try {
                        countingOut.flush();
                    }
                    catch (IOException ioe) {
                        LOG.error("Failed to write content to {}; rolling back session", (Object)sourceFlowFile, (Object)ioe);
                        StandardProcessSession.this.rollback(true);
                        this.close();
                        throw new FlowFileAccessException("Failed to write to Content Repository for " + String.valueOf(sourceFlowFile), (Throwable)ioe);
                    }
                }

                @Override
                public void close() throws IOException {
                    FlowFileRecord newFile;
                    if (this.closed) {
                        return;
                    }
                    this.closed = true;
                    countingOut.close();
                    rawStream.close();
                    StandardProcessSession.this.writeRecursionSet.remove(sourceFlowFile);
                    long bytesWritten = countingOut.getBytesWritten();
                    StandardProcessSession.this.bytesWritten += bytesWritten;
                    OutputStream removed = StandardProcessSession.this.openOutputStreams.remove(sourceFlowFile);
                    if (removed == null) {
                        LOG.error("Closed Session's OutputStream but there was no entry for it in the map; sourceFlowFile={}; map={}", (Object)sourceFlowFile, StandardProcessSession.this.openOutputStreams);
                    }
                    this.flush();
                    StandardProcessSession.this.removeTemporaryClaim(record);
                    if (bytesWritten == 0L) {
                        // Nothing was written: the FlowFile gets no content claim, and the unused
                        // claim is released and tracked as transient on the record.
                        newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(null).contentClaimOffset(0L).size(bytesWritten).build();
                        StandardProcessSession.this.context.getContentRepository().decrementClaimantCount(updatedClaim);
                        record.addTransientClaim(updatedClaim);
                    } else {
                        // Offset is the claim length minus what this stream wrote, floored at zero.
                        newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(updatedClaim).contentClaimOffset(Math.max(0L, updatedClaim.getLength() - bytesWritten)).size(bytesWritten).build();
                    }
                    record.setWorking(newFile, true);
                }
            };
            this.writeRecursionSet.add(source);
            this.openOutputStreams.put(source, errorHandlingOutputStream);
            return this.createTaskTerminationStream(errorHandlingOutputStream);
        }
        catch (ContentNotFoundException nfe) {
            // The write claim is reset before the content is destroyed.
            this.resetWriteClaims();
            this.destroyContent(newClaim, record);
            this.handleContentNotFound(nfe, record);
            throw nfe;
        }
        catch (IOException ioe) {
            this.resetWriteClaims();
            this.destroyContent(newClaim, record);
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            // Cleanup must run only on failure; a finally block would also destroy the claim
            // backing the stream that was just returned to the caller.
            this.resetWriteClaims();
            this.destroyContent(newClaim, record);
            throw t;
        }
    }

    /**
     * Replaces the content of the given FlowFile with whatever the callback writes.
     *
     * <p>The callback writes into a new content claim obtained from the claim cache. Bytes written
     * are added to the session's {@code bytesWritten} counter even if the callback fails. On
     * failure the write claim is reset and the new content destroyed.
     *
     * @param source the FlowFile whose content is replaced
     * @param writer the callback that produces the new content
     * @return the FlowFile rebuilt with the new content; it has no content claim when nothing was written
     * @throws ProcessException if an {@link IOException} occurs while writing
     */
    public FlowFile write(FlowFile source, OutputStreamCallback writer) {
        FlowFileRecord newFile;
        this.verifyTaskActive();
        source = this.validateRecordState(source);
        StandardRepositoryRecord record = this.getRecord(source);
        long writtenToFlowFile = 0L;
        ContentClaim newClaim = null;
        try {
            newClaim = this.claimCache.getContentClaim();
            claimLog.debug("Creating ContentClaim {} for 'write' for {}", (Object)newClaim, (Object)source);
            this.ensureNotAppending(newClaim);
            try (OutputStream stream = this.claimCache.write(newClaim);
                 NonFlushableOutputStream nonFlushableOutputStream = new NonFlushableOutputStream(stream);
                 DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream((OutputStream)nonFlushableOutputStream);
                 ByteCountingOutputStream countingOut = new ByteCountingOutputStream((OutputStream)disableOnClose);){
                try {
                    this.writeRecursionSet.add(source);
                    FlowFileAccessOutputStream ffaos = new FlowFileAccessOutputStream((OutputStream)countingOut, source);
                    writer.process(this.createTaskTerminationStream(ffaos));
                }
                finally {
                    // Capture the byte count whether or not the callback succeeded.
                    writtenToFlowFile = countingOut.getBytesWritten();
                    this.bytesWritten += countingOut.getBytesWritten();
                }
            }
            finally {
                this.writeRecursionSet.remove(source);
            }
        }
        catch (ContentNotFoundException nfe) {
            // Not rethrown here: unless handleContentNotFound throws, execution continues after the try.
            this.resetWriteClaims();
            this.destroyContent(newClaim, record);
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ioe) {
            this.resetWriteClaims();
            this.destroyContent(newClaim, record);
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.resetWriteClaims();
            this.destroyContent(newClaim, record);
            throw t;
        }
        this.removeTemporaryClaim(record);
        if (writtenToFlowFile == 0L) {
            // Nothing was written: the FlowFile gets no content claim, and the unused claim is
            // released and tracked as transient on the record.
            newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(null).contentClaimOffset(0L).size(0L).build();
            this.context.getContentRepository().decrementClaimantCount(newClaim);
            record.addTransientClaim(newClaim);
        } else {
            // Offset is the claim length minus what this call wrote, floored at zero.
            newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(Math.max(0L, newClaim.getLength() - writtenToFlowFile)).size(writtenToFlowFile).build();
        }
        record.setWorking(newFile, true);
        return newFile;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends whatever {@code writer} writes to the existing content of {@code source}.
     *
     * <p>If an output stream is already registered in {@code appendableStreams} for the record's current claim,
     * that stream is reused and the callback writes straight onto it. Otherwise a new claim is created in the
     * content repository, the existing content is copied into it, the stream is registered in
     * {@code appendableStreams} under the new claim, and the callback then writes to that same stream.
     *
     * @param source the FlowFile to append to; it is re-resolved through {@code validateRecordState}
     * @param writer callback that receives the output stream to append to
     * @return the new working version of the FlowFile
     * @throws ProcessException if an {@link IOException} is raised while copying or writing
     */
    public FlowFile append(FlowFile source, OutputStreamCallback writer) {
        FlowFileRecord newFile;
        this.verifyTaskActive();
        source = this.validateRecordState(source);
        StandardRepositoryRecord record = this.getRecord(source);
        long newSize = 0L;
        ContentClaim oldClaim = record.getCurrentClaim();
        // Look for a stream registered under the current claim by an earlier append.
        ByteCountingOutputStream outStream = oldClaim == null ? null : this.appendableStreams.get(oldClaim);
        long originalByteWrittenCount = 0L;
        ContentClaim newClaim = null;
        try {
            // The labeled block is decompiler output: "break block43" skips the reuse path below
            // once the new-claim path has run the callback.
            block43: {
                if (outStream == null) {
                    // No reusable stream: start a fresh claim and seed it with the existing content.
                    this.claimCache.flush(oldClaim);
                    try (InputStream oldClaimIn = this.read(source);){
                        newClaim = this.context.getContentRepository().create(this.context.getConnectable().isLossTolerant());
                        claimLog.debug("Creating ContentClaim {} for 'append' for {}", (Object)newClaim, (Object)source);
                        OutputStream rawOutStream = this.context.getContentRepository().write(newClaim);
                        BufferedOutputStream bufferedOutStream = new BufferedOutputStream(rawOutStream);
                        outStream = new ByteCountingOutputStream((OutputStream)bufferedOutStream);
                        originalByteWrittenCount = 0L;
                        // Register the stream so a later append to this claim can find it.
                        this.appendableStreams.put(newClaim, outStream);
                        StreamUtils.copy((InputStream)oldClaimIn, (OutputStream)outStream);
                        NonFlushableOutputStream nonFlushable = new NonFlushableOutputStream((OutputStream)outStream);
                        try (DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream((OutputStream)nonFlushable);){
                            // While the FlowFile is in this set, validateRecordState rejects any other use of it.
                            this.writeRecursionSet.add(source);
                            writer.process((OutputStream)new FlowFileAccessOutputStream(disableOnClose, source));
                            break block43;
                        }
                        finally {
                            this.writeRecursionSet.remove(source);
                        }
                    }
                }
                // Reuse path: keep the same claim and remember how many bytes the stream had already counted.
                newClaim = oldClaim;
                originalByteWrittenCount = outStream.getBytesWritten();
                NonFlushableOutputStream nonFlushable = new NonFlushableOutputStream((OutputStream)outStream);
                try (DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream((OutputStream)nonFlushable);
                     FlowFileAccessOutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source);){
                    this.writeRecursionSet.add(source);
                    writer.process((OutputStream)flowFileAccessOutStream);
                }
                finally {
                    this.writeRecursionSet.remove(source);
                }
            }
            // The FlowFile's size is the stream's total byte count (copied content plus appended bytes).
            newSize = outStream.getBytesWritten();
        }
        catch (ContentNotFoundException nfe) {
            this.resetWriteClaims();
            // The claim is only passed to destroyContent when it differs from the record's pre-existing claim.
            if (newClaim != oldClaim) {
                this.destroyContent(newClaim, record);
            }
            // Either rolls back and throws MissingFlowFileException, or returns and execution continues below.
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ioe) {
            this.resetWriteClaims();
            if (newClaim != oldClaim) {
                this.destroyContent(newClaim, record);
            }
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.resetWriteClaims();
            if (newClaim != oldClaim) {
                this.destroyContent(newClaim, record);
            }
            throw t;
        }
        finally {
            // Add only the bytes written during this call to the session counter; on the new-claim path the
            // baseline is zero, so the copied content is included.
            if (outStream != null) {
                long bytesWrittenThisIteration = outStream.getBytesWritten() - originalByteWrittenCount;
                this.bytesWritten += bytesWrittenThisIteration;
            }
        }
        if (newClaim != oldClaim) {
            this.removeTemporaryClaim(record);
        }
        if (newSize == 0L) {
            // Empty result: the record gets no content claim and the claim is released as a transient claim.
            newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(null).contentClaimOffset(0L).size(0L).build();
            this.context.getContentRepository().decrementClaimantCount(newClaim);
            record.addTransientClaim(newClaim);
        } else {
            newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(newSize).build();
        }
        record.setWorking(newFile, true);
        return newFile;
    }

    /**
     * Releases the record's working claim when the record's content was modified or the record is a CREATE:
     * the claimant count is decremented and the claim is added to the record's transient claims.
     * Records matching neither condition are left untouched.
     *
     * @param record the repository record whose working claim may be released
     */
    private void removeTemporaryClaim(StandardRepositoryRecord record) {
        final boolean hasTemporaryClaim = record.isContentModified() || record.getType() == RepositoryRecordType.CREATE;
        if (!hasTemporaryClaim) {
            return;
        }

        this.context.getContentRepository().decrementClaimantCount(record.getWorkingClaim());
        record.addTransientClaim(record.getWorkingClaim());
    }

    /**
     * Flushes, closes and forgets every appendable stream, ignoring I/O failures along the way.
     */
    private void resetWriteClaims() {
        final boolean suppressExceptions = true;
        this.resetWriteClaims(suppressExceptions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flushes and closes every stream held in {@code appendableStreams}, then empties the map.
     *
     * <p>Each stream is closed even if its flush fails. When {@code suppressExceptions} is {@code false}
     * the first {@link IOException} aborts the loop and is rethrown wrapped, with the original exception
     * attached as the cause; in that case the map is not cleared.
     *
     * @param suppressExceptions {@code true} to ignore I/O failures and keep going,
     *                           {@code false} to fail on the first one
     * @throws FlowFileAccessException if a stream cannot be flushed or closed and exceptions are not suppressed
     */
    private void resetWriteClaims(boolean suppressExceptions) {
        for (ByteCountingOutputStream out : this.appendableStreams.values()) {
            try {
                try {
                    out.flush();
                }
                finally {
                    out.close();
                }
            }
            catch (IOException e) {
                if (!suppressExceptions) {
                    // Keep the underlying I/O failure as the cause so it is not lost to callers and logs.
                    throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository", (Throwable)e);
                }
            }
        }
        this.appendableStreams.clear();
    }

    /**
     * Closes the cached read-claim stream, if there is one, and forgets both the stream and its claim.
     * Failures while closing are deliberately ignored.
     */
    private void resetReadClaim() {
        if (this.currentReadClaimStream != null) {
            try {
                this.currentReadClaimStream.close();
            }
            catch (Exception ignored) {
                // Best-effort close: the references are dropped below regardless of the outcome.
            }
        }

        this.currentReadClaimStream = null;
        this.currentReadClaim = null;
    }

    /**
     * Replaces the content of {@code source} with what {@code writer} produces, giving the callback both an
     * input stream over the current content and an output stream for the new content.
     *
     * <p>When nothing was written, the returned record carries no content claim and the new claim is handed
     * to the record as a transient claim; otherwise the record points at the new claim with an offset of
     * {@code newClaim.getLength() - bytesWritten}, clamped at zero.
     *
     * @param source the FlowFile to rewrite; it is re-resolved through {@code validateRecordState}
     * @param writer callback that reads the current content and writes the replacement
     * @return the new working version of the FlowFile
     * @throws ProcessException if an {@link IOException} is raised while reading or writing
     */
    public FlowFile write(FlowFile source, StreamCallback writer) {
        FlowFileRecord newFile;
        this.verifyTaskActive();
        source = this.validateRecordState(source);
        StandardRepositoryRecord record = this.getRecord(source);
        ContentClaim currClaim = record.getCurrentClaim();
        long writtenToFlowFile = 0L;
        ContentClaim newClaim = null;
        try {
            newClaim = this.claimCache.getContentClaim();
            claimLog.debug("Creating ContentClaim {} for 'write' for {}", (Object)newClaim, (Object)source);
            this.ensureNotAppending(newClaim);
            // Flush the claim cache for the current claim's resource claim before opening it for reading.
            if (currClaim != null) {
                this.claimCache.flush(currClaim.getResourceClaim());
            }
            // Input side: raw stream -> limited to the FlowFile's size -> disable-on-close -> byte counter.
            // Output side: claim cache stream -> non-flushable -> disable-on-close -> byte counter.
            try (InputStream is = this.getInputStream(source, currClaim, record.getCurrentClaimOffset(), true);
                 LimitedInputStream limitedIn = new LimitedInputStream(is, source.getSize());
                 DisableOnCloseInputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
                 ByteCountingInputStream countingIn = new ByteCountingInputStream((InputStream)disableOnCloseIn, this.bytesRead);
                 OutputStream os = this.claimCache.write(newClaim);
                 NonFlushableOutputStream nonFlushableOut = new NonFlushableOutputStream(os);
                 DisableOnCloseOutputStream disableOnCloseOut = new DisableOnCloseOutputStream((OutputStream)nonFlushableOut);
                 ByteCountingOutputStream countingOut = new ByteCountingOutputStream((OutputStream)disableOnCloseOut);){
                // While the FlowFile is in this set, validateRecordState rejects any other use of it.
                this.writeRecursionSet.add(source);
                FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingIn, source, currClaim);
                FlowFileAccessOutputStream ffaos = new FlowFileAccessOutputStream((OutputStream)countingOut, source);
                boolean cnfeThrown = false;
                try {
                    writer.process(this.createTaskTerminationStream(ffais), this.createTaskTerminationStream(ffaos));
                }
                catch (ContentNotFoundException cnfe) {
                    cnfeThrown = true;
                    throw cnfe;
                }
                finally {
                    writtenToFlowFile = countingOut.getBytesWritten();
                    this.bytesWritten += writtenToFlowFile;
                    // NOTE(review): countingIn was constructed with this.bytesRead as its second argument -
                    // confirm getBytesRead() does not already include that starting value.
                    this.bytesRead += countingIn.getBytesRead();
                    this.writeRecursionSet.remove(source);
                    // If the input stream recorded a ContentNotFoundException that the callback did not
                    // propagate itself, raise it now; thrown from finally, it replaces any in-flight exception.
                    if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
                        throw ffais.getContentNotFoundException();
                    }
                }
            }
        }
        catch (ContentNotFoundException nfe) {
            this.destroyContent(newClaim, record);
            // Either rolls back and throws MissingFlowFileException, or returns and execution continues below.
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ioe) {
            this.destroyContent(newClaim, record);
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.destroyContent(newClaim, record);
            throw t;
        }
        this.removeTemporaryClaim(record);
        if (writtenToFlowFile == 0L) {
            // Nothing was written: the record gets no content claim and the unused claim is released
            // and tracked on the record as a transient claim.
            newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(null).contentClaimOffset(0L).size(0L).build();
            this.context.getContentRepository().decrementClaimantCount(newClaim);
            record.addTransientClaim(newClaim);
        } else {
            // The offset is derived as (claim length - bytes written), never below zero.
            newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(Math.max(0L, newClaim.getLength() - writtenToFlowFile)).size(writtenToFlowFile).build();
        }
        record.setWorking(newFile, true);
        return newFile;
    }

    /**
     * Imports the file at {@code source} as the new content of {@code destination} and sets the FlowFile's
     * {@code filename} attribute to the file's name.
     *
     * <p>When the source file is not to be kept, its parent directory must be writable, and the file is
     * registered in {@code deleteOnCommit}. A path without a parent component (for example a bare relative
     * file name) is resolved against the working directory for the writability check rather than failing
     * with a {@link NullPointerException}.
     *
     * @param source         path of the file to import
     * @param keepSourceFile {@code true} to leave the source file in place, {@code false} to register it for
     *                       deletion on commit
     * @param destination    the FlowFile that receives the content; it is re-resolved through
     *                       {@code validateRecordState}
     * @return the new working version of the FlowFile
     * @throws FlowFileAccessException if the parent directory cannot be determined or is not writable, if the
     *                                 content claim cannot be created, or if the import itself fails
     */
    public FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination) {
        this.verifyTaskActive();
        destination = this.validateRecordState(destination);

        if (!keepSourceFile) {
            // getParent() is null for a bare relative name such as "data.bin"; fall back to the absolute form
            // so the writability check has a directory to test.
            Path parent = source.getParent();
            if (parent == null) {
                parent = source.toAbsolutePath().getParent();
            }
            if (parent == null) {
                throw new FlowFileAccessException("Cannot determine the parent directory of " + source + " so cannot delete file; will not import.");
            }
            if (!Files.isWritable(parent) && !parent.toFile().canWrite()) {
                throw new FlowFileAccessException("Cannot write to path " + parent.toFile().getAbsolutePath() + " so cannot delete file; will not import.");
            }
        }

        StandardRepositoryRecord record = this.getRecord(destination);
        ContentClaim newClaim;
        try {
            newClaim = this.context.getContentRepository().create(this.context.getConnectable().isLossTolerant());
            claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", (Object)newClaim, (Object)destination);
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), (Throwable)e);
        }

        long newSize;
        try {
            newSize = this.context.getContentRepository().importFrom(source, newClaim);
            // The imported bytes count as both read (from the file) and written (to the repository).
            this.bytesWritten += newSize;
            this.bytesRead += newSize;
        }
        catch (Throwable t) {
            this.destroyContent(newClaim, record);
            throw new FlowFileAccessException("Failed to import data from " + String.valueOf(source) + " for " + String.valueOf(destination) + " due to " + t.toString(), t);
        }

        this.removeTemporaryClaim(record);

        String filename = source.toFile().getName();
        boolean empty = newSize == 0L;
        // An empty import yields a record with no content claim; size is newSize in both cases.
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
                .fromFlowFile(record.getCurrent())
                .contentClaim(empty ? null : newClaim)
                .contentClaimOffset(0L)
                .size(newSize)
                .addAttribute(CoreAttributes.FILENAME.key(), filename)
                .build();
        if (empty) {
            // The unused claim is released and tracked on the record as a transient claim.
            this.context.getContentRepository().decrementClaimantCount(newClaim);
            record.addTransientClaim(newClaim);
        }

        record.setWorking(newFile, CoreAttributes.FILENAME.key(), filename, true);
        if (!keepSourceFile) {
            this.deleteOnCommit.put((FlowFile)newFile, source);
        }
        return newFile;
    }

    /**
     * Imports everything readable from {@code source} as the new content of {@code destination}.
     *
     * <p>An empty import produces a record without a content claim; the claim created for it is released and
     * tracked on the record as a transient claim.
     *
     * @param source      stream supplying the content
     * @param destination the FlowFile that receives the content; it is re-resolved through
     *                    {@code validateRecordState}
     * @return the new working version of the FlowFile
     * @throws FlowFileAccessException if the claim cannot be created or the import fails for any reason
     */
    public FlowFile importFrom(InputStream source, FlowFile destination) {
        this.verifyTaskActive();
        destination = this.validateRecordState(destination);
        final StandardRepositoryRecord repoRecord = this.getRecord(destination);

        ContentClaim importedClaim = null;
        long importedSize;
        try {
            try {
                importedClaim = this.context.getContentRepository().create(this.context.getConnectable().isLossTolerant());
                claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", (Object)importedClaim, (Object)destination);
                importedSize = this.context.getContentRepository().importFrom(this.createTaskTerminationStream(source), importedClaim);
                this.bytesWritten += importedSize;
            }
            catch (IOException ioFailure) {
                throw new FlowFileAccessException("Unable to create ContentClaim due to " + ioFailure.toString(), (Throwable)ioFailure);
            }
        }
        catch (Throwable failure) {
            // Every failure, including the wrapped IOException above, is reported through this wrapper
            // after the claim (if one was created) has been destroyed.
            if (importedClaim != null) {
                this.destroyContent(importedClaim, repoRecord);
            }
            throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + failure, failure);
        }

        this.removeTemporaryClaim(repoRecord);

        final boolean nothingImported = importedSize == 0L;
        final FlowFileRecord updated = new StandardFlowFileRecord.Builder()
                .fromFlowFile(repoRecord.getCurrent())
                .contentClaim(nothingImported ? null : importedClaim)
                .contentClaimOffset(0L)
                .size(importedSize)
                .build();
        if (nothingImported) {
            this.context.getContentRepository().decrementClaimantCount(importedClaim);
            repoRecord.addTransientClaim(importedClaim);
        }

        repoRecord.setWorking(updated, true);
        return updated;
    }

    /**
     * Exports the content of {@code source} to the file at {@code destination} and adds the number of bytes
     * copied to the session's read counter.
     *
     * @param source      the FlowFile to export; it is re-resolved through {@code validateRecordState}
     * @param destination path of the file to write
     * @param append      passed through to the content repository's {@code exportTo}
     * @throws FlowFileAccessException if the export fails for any reason other than missing content
     */
    public void exportTo(FlowFile source, Path destination, boolean append) {
        this.verifyTaskActive();
        source = this.validateRecordState(source);
        final StandardRepositoryRecord repoRecord = this.getRecord(source);

        try {
            this.ensureNotAppending(repoRecord.getCurrentClaim());
            this.claimCache.flush(repoRecord.getCurrentClaim());
            final long exportedBytes = this.context.getContentRepository().exportTo(
                    repoRecord.getCurrentClaim(),
                    destination,
                    append,
                    repoRecord.getCurrentClaimOffset(),
                    source.getSize());
            this.bytesRead += exportedBytes;
        }
        catch (ContentNotFoundException notFound) {
            this.handleContentNotFound(notFound, repoRecord);
        }
        catch (Throwable failure) {
            final String message = "Failed to export " + source + " to " + destination + " due to " + failure;
            throw new FlowFileAccessException(message, failure);
        }
    }

    /**
     * Copies the content of {@code source} to {@code destination}.
     *
     * <p>Returns without writing anything when the record has no current content claim.
     *
     * @param source      the FlowFile to export; it is re-resolved through {@code validateRecordState}
     * @param destination stream that receives the content
     * @throws FlowFileAccessException if the claim cannot be flushed from the claim cache
     * @throws ProcessException        if an {@link IOException} is raised while copying
     */
    public void exportTo(FlowFile source, OutputStream destination) {
        this.verifyTaskActive();
        source = this.validateRecordState(source);
        StandardRepositoryRecord record = this.getRecord(source);
        // No content claim means there is nothing to copy.
        if (record.getCurrentClaim() == null) {
            return;
        }
        try {
            this.ensureNotAppending(record.getCurrentClaim());
            this.claimCache.flush(record.getCurrentClaim());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), (Throwable)e);
        }
        // Wrapper order (innermost first): raw stream -> limited to the FlowFile's size -> disable-on-close
        // -> byte counter -> FlowFile access stream.
        try (InputStream rawIn = this.getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
             LimitedInputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
             DisableOnCloseInputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
             ByteCountingInputStream countingStream = new ByteCountingInputStream((InputStream)disableOnCloseIn, this.bytesRead);
             FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingStream, source, record.getCurrentClaim());){
            boolean cnfeThrown = false;
            try {
                this.incrementReadCount(source);
                StreamUtils.copy((InputStream)ffais, (OutputStream)this.createTaskTerminationStream(destination), (long)source.getSize());
            }
            catch (ContentNotFoundException cnfe) {
                cnfeThrown = true;
                throw cnfe;
            }
            finally {
                this.decrementReadCount(source);
                // NOTE(review): countingStream was constructed with this.bytesRead as its second argument -
                // confirm getBytesRead() does not already include that starting value.
                long streamBytesRead = countingStream.getBytesRead();
                this.bytesRead += streamBytesRead;
                // If the stream recorded a ContentNotFoundException that was not itself propagated, raise it
                // now; thrown from finally, it replaces any in-flight exception.
                if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
                    throw ffais.getContentNotFoundException();
                }
            }
        }
        catch (ContentNotFoundException nfe) {
            // Either rolls back and throws MissingFlowFileException, or returns and the method ends normally.
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ex) {
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ex.toString(), (Throwable)ex);
        }
    }

    /**
     * Applies the single-record content-not-found handling to each of the given records in iteration order.
     * Handling stops at the first record for which the single-record handler throws.
     *
     * @param nfe            the exception describing the missing content
     * @param suspectRecords the records to check against the missing claim
     */
    private void handleContentNotFound(ContentNotFoundException nfe, Collection<StandardRepositoryRecord> suspectRecords) {
        suspectRecords.forEach(suspect -> this.handleContentNotFound(nfe, suspect));
    }

    /**
     * Reacts to missing content for a single record.
     *
     * <p>A drop provenance event is always requested and, if one is produced, registered with the provenance
     * repository. If the missing claim is the record's original claim, the record is marked for abort; if it
     * is the original or the working claim (compared by identity), the session is rolled back and a
     * {@code MissingFlowFileException} is thrown. Otherwise the method returns normally.
     *
     * @param nfe           the exception describing the missing content
     * @param suspectRecord the record to check against the missing claim
     */
    private void handleContentNotFound(ContentNotFoundException nfe, StandardRepositoryRecord suspectRecord) {
        final ContentClaim originalClaim = suspectRecord.getOriginalClaim();
        final ContentClaim workingClaim = suspectRecord.getWorkingClaim();
        final ContentClaim missing = nfe.getMissingClaim();

        final FlowFile dropped = suspectRecord.getCurrent();
        String details = nfe.getMessage();
        if (details == null) {
            details = "Content Not Found";
        }
        final ProvenanceEventRecord dropEvent = this.provenanceReporter.drop(dropped, details);
        if (dropEvent != null) {
            this.context.getProvenanceRepository().registerEvent(dropEvent);
        }

        final boolean originalIsMissing = missing == originalClaim;
        if (originalIsMissing) {
            suspectRecord.markForAbort();
        }
        if (originalIsMissing || missing == workingClaim) {
            this.rollback();
            throw new MissingFlowFileException("Unable to find content for FlowFile", (Throwable)nfe);
        }
    }

    /**
     * Validates the FlowFile with recursive reads disallowed and returns its current version.
     *
     * @param flowFile the FlowFile to validate
     * @return the record's current version of the FlowFile
     */
    private FlowFile validateRecordState(FlowFile flowFile) {
        final boolean allowRecursiveRead = false;
        return this.validateRecordState(flowFile, allowRecursiveRead);
    }

    /**
     * Checks that the FlowFile may be operated on in this session and returns its current version.
     *
     * <p>The FlowFile is rejected when it is in the read recursion set (unless recursive reads are allowed)
     * or in the write recursion set. It is also rejected, after rolling the session back, when it has no
     * record in this session, has already been assigned a transfer relationship, or is marked for deletion.
     *
     * @param flowFile           the FlowFile to validate
     * @param allowRecursiveRead {@code true} to skip the read-recursion check
     * @return the record's current version of the FlowFile
     * @throws IllegalStateException     if the FlowFile is in use by an active read or write
     * @throws FlowFileHandlingException if the FlowFile is unknown, already transferred or already removed
     */
    private FlowFile validateRecordState(FlowFile flowFile, boolean allowRecursiveRead) {
        final boolean blockedByRead = !allowRecursiveRead && this.readRecursionSet.containsKey(flowFile);
        if (blockedByRead) {
            throw new IllegalStateException(flowFile + " already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed");
        }
        if (this.writeRecursionSet.contains(flowFile)) {
            throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed");
        }

        final StandardRepositoryRecord repoRecord = this.getRecord(flowFile);
        if (repoRecord == null) {
            this.rollback();
            throw new FlowFileHandlingException(flowFile + " is not known in this session (" + this + ")");
        }
        if (repoRecord.getTransferRelationship() != null) {
            this.rollback();
            throw new FlowFileHandlingException(flowFile + " is already marked for transfer");
        }
        if (repoRecord.isMarkedForDelete()) {
            this.rollback();
            throw new FlowFileHandlingException(flowFile + " has already been marked for removal");
        }

        return repoRecord.getCurrent();
    }

    /**
     * Validates every FlowFile in the collection, in iteration order, and returns their current versions.
     *
     * @param flowFiles the FlowFiles to validate
     * @return a new list holding the current version of each FlowFile
     */
    private List<FlowFile> validateRecordState(Collection<FlowFile> flowFiles) {
        final ArrayList<FlowFile> validated = new ArrayList<>(flowFiles.size());
        flowFiles.forEach(candidate -> validated.add(this.validateRecordState(candidate)));
        return validated;
    }

    /**
     * Reports whether this session holds a record keyed by the FlowFile's id.
     *
     * @param flowFile the FlowFile to look up
     * @return {@code true} if {@code records} contains the FlowFile's id
     */
    boolean isFlowFileKnown(FlowFile flowFile) {
        final long flowFileId = flowFile.getId();
        return this.records.containsKey(flowFileId);
    }

    /**
     * Returns the session's current version of the FlowFile, or the FlowFile itself when the session has no
     * record for it.
     *
     * @param flowFile the FlowFile to resolve
     * @return the record's current version, or {@code flowFile} if no record exists
     */
    private FlowFile getMostRecent(FlowFile flowFile) {
        final StandardRepositoryRecord known = this.getRecord(flowFile);
        if (known == null) {
            return flowFile;
        }
        return known.getCurrent();
    }

    /**
     * Computes the attributes shared by every FlowFile in the collection: a key is kept only when all
     * FlowFiles carry it with an equal, non-null value.
     *
     * <p>A {@code null} or empty collection yields an empty map. A single FlowFile yields a copy of its
     * attributes.
     *
     * @param flowFileList the FlowFiles whose attributes are intersected; may be {@code null}
     * @return a new, mutable map of the common attributes
     */
    private static Map<String, String> intersectAttributes(Collection<FlowFile> flowFileList) {
        final Map<String, String> result = new HashMap<>();
        if (flowFileList == null || flowFileList.isEmpty()) {
            return result;
        }

        final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
        if (flowFileList.size() == 1) {
            // Nothing to intersect with: the single FlowFile's attributes are the answer.
            result.putAll(firstMap);
            return result;
        }

        for (final Map.Entry<String, String> entry : firstMap.entrySet()) {
            final String key = entry.getKey();
            final String value = entry.getValue();
            boolean sharedByAll = true;
            for (final FlowFile flowFile : flowFileList) {
                final String candidate = flowFile.getAttributes().get(key);
                // A missing attribute (or a null value) on any FlowFile excludes the key.
                if (candidate == null || !candidate.equals(value)) {
                    sharedByAll = false;
                    break;
                }
            }
            if (sharedByAll) {
                result.put(key, value);
            }
        }
        return result;
    }

    /**
     * Returns this session's provenance reporter.
     *
     * @return the provenance reporter
     */
    public ProvenanceReporter getProvenanceReporter() {
        // verifyTaskActive() runs first, so the reporter is only handed out when that check passes.
        this.verifyTaskActive();
        final ProvenanceReporter reporter = this.provenanceReporter;
        return reporter;
    }

    /**
     * Stores {@code state} as this session's pending state for the given scope.
     *
     * <p>The new state's version is the incremented version of the state currently visible through
     * {@code getState(scope)}, or {@code INITIAL_VERSION} when that state has no version.
     *
     * @param state the key/value pairs to store
     * @param scope the scope whose state is replaced
     * @throws IOException if the current state cannot be read
     */
    public void setState(Map<String, String> state, Scope scope) throws IOException {
        // Chaining on the typed Optional keeps the result a String; a raw Optional local would yield Object.
        final String version = this.getState(scope).getStateVersion()
                .map(this::getIncrementedVersion)
                .orElse(INITIAL_VERSION);
        final StandardStateMap stateMap = new StandardStateMap(state, Optional.of(version));
        this.setState(stateMap, scope);
    }

    /**
     * Records {@code stateMap} as the session's pending state: local state for {@code Scope.LOCAL},
     * cluster state for any other scope.
     *
     * @param stateMap the state to remember
     * @param scope    selects which of the two session fields is assigned
     */
    private void setState(StateMap stateMap, Scope scope) {
        if (scope == Scope.LOCAL) {
            this.localState = stateMap;
            return;
        }
        this.clusterState = stateMap;
    }

    /**
     * Returns the state visible to this session for the given scope.
     *
     * <p>Lookup order: the session's own pending state, then the checkpoint's state (when a checkpoint
     * exists and holds one), then the state manager. {@code Scope.LOCAL} selects the local fields; any other
     * scope selects the cluster fields.
     *
     * @param scope the scope to read
     * @return the first non-null state found in the lookup order
     * @throws IOException if the state manager cannot be read
     */
    public StateMap getState(Scope scope) throws IOException {
        final boolean useLocal = scope == Scope.LOCAL;

        final StateMap sessionState = useLocal ? this.localState : this.clusterState;
        if (sessionState != null) {
            return sessionState;
        }

        if (this.checkpoint != null) {
            final StateMap checkpointState = useLocal ? this.checkpoint.localState : this.checkpoint.clusterState;
            if (checkpointState != null) {
                return checkpointState;
            }
        }

        return this.context.getStateManager().getState(scope);
    }

    /**
     * Replaces the session's state for {@code scope} with {@code newValue}, but only if the state currently
     * visible matches {@code oldValue}.
     *
     * <p>When neither the current state nor {@code oldValue} carries a version, the new state is stored with
     * {@code INITIAL_VERSION}. Otherwise the replacement happens only when {@code oldValue} is non-null and
     * both its version and its contents equal the current state's; the new state then gets the incremented
     * version.
     *
     * @param oldValue the state the caller believes is current; may be {@code null}
     * @param newValue the key/value pairs to store
     * @param scope    the scope whose state is replaced
     * @return {@code true} if the state was replaced, {@code false} otherwise
     * @throws IOException if the current state cannot be read
     */
    public boolean replaceState(StateMap oldValue, Map<String, String> newValue, Scope scope) throws IOException {
        final StateMap existing = this.getState(scope);

        final boolean nothingVersioned = !existing.getStateVersion().isPresent()
                && (oldValue == null || !oldValue.getStateVersion().isPresent());
        if (nothingVersioned) {
            final StandardStateMap initial = new StandardStateMap(newValue, Optional.of(INITIAL_VERSION));
            this.setState(initial, scope);
            return true;
        }

        if (oldValue == null) {
            return false;
        }

        final boolean sameVersion = existing.getStateVersion().equals(oldValue.getStateVersion());
        if (!sameVersion || !existing.toMap().equals(oldValue.toMap())) {
            return false;
        }

        final String nextVersion = existing.getStateVersion().map(this::getIncrementedVersion).orElse(INITIAL_VERSION);
        final StandardStateMap replacement = new StandardStateMap(newValue, Optional.of(nextVersion));
        this.setState(replacement, scope);
        return true;
    }

    /**
     * Sets the session's pending state for {@code scope} to {@code EMPTY_STATE_MAP}.
     *
     * @param scope the scope whose state is cleared
     */
    public void clearState(Scope scope) {
        final StateMap cleared = EMPTY_STATE_MAP;
        this.setState(cleared, scope);
    }

    /**
     * Identifies this session by its id, in the form {@code StandardProcessSession[id=<sessionId>]}.
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("StandardProcessSession[id=");
        description.append(this.sessionId).append("]");
        return description.toString();
    }

    /**
     * Parses {@code currentVersion} as a decimal {@code long} and returns that value plus one, as a string.
     *
     * @param currentVersion the version to increment; must be parseable by {@link Long#parseLong(String)}
     * @return the incremented version
     * @throws NumberFormatException if {@code currentVersion} is not a parseable {@code long}
     */
    private String getIncrementedVersion(String currentVersion) {
        final long next = Long.parseLong(currentVersion) + 1L;
        return Long.toString(next);
    }

    protected static class Checkpoint {
        private long processingTime = 0L;
        private Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents;
        private Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders;
        private List<ProvenanceEventRecord> autoTerminatedEvents;
        private Set<ProvenanceEventRecord> reportedEvents;
        private Map<Long, StandardRepositoryRecord> records;
        private Map<String, StandardFlowFileEvent> connectionCounts;
        private Map<String, Long> countersOnCommit;
        private Map<String, Long> immediateCounters;
        private Map<FlowFile, Path> deleteOnCommit;
        private Set<String> removedFlowFiles;
        private Set<String> createdFlowFiles;
        private int removedCount = 0;
        private long removedBytes = 0L;
        private long bytesRead = 0L;
        private long bytesWritten = 0L;
        private int flowFilesIn = 0;
        private int flowFilesOut = 0;
        private long contentSizeIn = 0L;
        private long contentSizeOut = 0L;
        private int flowFilesReceived = 0;
        private int flowFilesSent = 0;
        private long bytesReceived = 0L;
        private long bytesSent = 0L;
        private boolean initialized = false;
        private StateMap localState;
        private StateMap clusterState;

        protected Checkpoint() {
        }

        private void initializeForCopy() {
            if (this.initialized) {
                return;
            }
            this.generatedProvenanceEvents = new HashMap<FlowFile, List<ProvenanceEventRecord>>();
            this.forkEventBuilders = new HashMap<FlowFile, ProvenanceEventBuilder>();
            this.autoTerminatedEvents = new ArrayList<ProvenanceEventRecord>();
            this.reportedEvents = new LinkedHashSet<ProvenanceEventRecord>();
            this.records = new ConcurrentHashMap<Long, StandardRepositoryRecord>();
            this.connectionCounts = new ConcurrentHashMap<String, StandardFlowFileEvent>();
            this.countersOnCommit = new HashMap<String, Long>();
            this.immediateCounters = new HashMap<String, Long>();
            this.deleteOnCommit = new HashMap<FlowFile, Path>();
            this.removedFlowFiles = new HashSet<String>();
            this.createdFlowFiles = new HashSet<String>();
            this.initialized = true;
        }

        private void checkpoint(StandardProcessSession session, List<ProvenanceEventRecord> autoTerminatedEvents, boolean copy) {
            if (copy) {
                this.copyCheckpoint(session, autoTerminatedEvents);
            } else {
                this.directCheckpoint(session, autoTerminatedEvents);
            }
        }

        private void directCheckpoint(StandardProcessSession session, List<ProvenanceEventRecord> autoTerminatedEvents) {
            this.processingTime = System.nanoTime() - session.processingStartTime;
            this.generatedProvenanceEvents = session.generatedProvenanceEvents;
            this.forkEventBuilders = session.forkEventBuilders;
            this.autoTerminatedEvents = autoTerminatedEvents;
            this.reportedEvents = session.provenanceReporter.getEvents();
            this.records = session.records;
            this.connectionCounts = session.connectionCounts;
            this.countersOnCommit = session.countersOnCommit == null ? Collections.emptyMap() : session.countersOnCommit;
            this.immediateCounters = session.immediateCounters == null ? Collections.emptyMap() : session.immediateCounters;
            this.deleteOnCommit = session.deleteOnCommit;
            this.removedFlowFiles = session.removedFlowFiles;
            this.createdFlowFiles = session.createdFlowFiles;
            this.removedCount = session.removedCount;
            this.removedBytes = session.removedBytes;
            this.bytesRead = session.bytesRead;
            this.bytesWritten = session.bytesWritten;
            this.flowFilesIn = session.flowFilesIn;
            this.flowFilesOut = session.flowFilesOut;
            this.contentSizeIn = session.contentSizeIn;
            this.contentSizeOut = session.contentSizeOut;
            this.flowFilesReceived = session.provenanceReporter.getFlowFilesReceived() + session.provenanceReporter.getFlowFilesFetched();
            this.bytesReceived = session.provenanceReporter.getBytesReceived() + session.provenanceReporter.getBytesFetched();
            this.flowFilesSent = session.provenanceReporter.getFlowFilesSent();
            this.bytesSent = session.provenanceReporter.getBytesSent();
            if (session.localState != null) {
                this.localState = session.localState;
            }
            if (session.clusterState != null) {
                this.clusterState = session.clusterState;
            }
        }

        /**
         * Folds the state of {@code session} into this checkpoint, accumulating on top of whatever
         * this checkpoint already holds instead of replacing it.
         *
         * <p>{@code initializeForCopy()} is invoked first; NOTE(review): its body is not visible
         * here — presumably it makes the destination collections safe to mutate; confirm.
         *
         * @param session the session whose records, events, counters and statistics are added
         * @param autoTerminatedEvents provenance events to append to this checkpoint's
         *        auto-terminated list; may be {@code null}, in which case nothing is appended
         */
        private void copyCheckpoint(StandardProcessSession session, List<ProvenanceEventRecord> autoTerminatedEvents) {
            this.initializeForCopy();
            // Add the nanoseconds elapsed since the session's recorded processing start time.
            this.processingTime += System.nanoTime() - session.processingStartTime;
            this.generatedProvenanceEvents.putAll(session.generatedProvenanceEvents);
            this.forkEventBuilders.putAll(session.forkEventBuilders);
            if (autoTerminatedEvents != null) {
                this.autoTerminatedEvents.addAll(autoTerminatedEvents);
            }
            this.reportedEvents.addAll(session.provenanceReporter.getEvents());
            // putAll: an entry already present under the same key is replaced by the session's entry.
            this.records.putAll(session.records);
            // Per-key merge of mutable values: an existing value absorbs the session's value via add(...);
            // a key not yet present receives the session's value object itself (no copy is made).
            this.mergeMapsWithMutableValue(this.connectionCounts, session.connectionCounts, (destination, toMerge) -> destination.add((FlowFileEvent)toMerge));
            // Counters are summed per key.
            this.mergeMaps(this.countersOnCommit, session.countersOnCommit, Long::sum);
            this.mergeMaps(this.immediateCounters, session.immediateCounters, Long::sum);
            this.deleteOnCommit.putAll(session.deleteOnCommit);
            this.removedFlowFiles.addAll(session.removedFlowFiles);
            this.createdFlowFiles.addAll(session.createdFlowFiles);
            // Scalar statistics are accumulated, not overwritten.
            this.removedCount += session.removedCount;
            this.removedBytes += session.removedBytes;
            this.bytesRead += session.bytesRead;
            this.bytesWritten += session.bytesWritten;
            this.flowFilesIn += session.flowFilesIn;
            this.flowFilesOut += session.flowFilesOut;
            this.contentSizeIn += session.contentSizeIn;
            this.contentSizeOut += session.contentSizeOut;
            // "Received" totals combine the reporter's received and fetched figures.
            this.flowFilesReceived += session.provenanceReporter.getFlowFilesReceived() + session.provenanceReporter.getFlowFilesFetched();
            this.bytesReceived += session.provenanceReporter.getBytesReceived() + session.provenanceReporter.getBytesFetched();
            this.flowFilesSent += session.provenanceReporter.getFlowFilesSent();
            this.bytesSent += session.provenanceReporter.getBytesSent();
            // State is only replaced when the session carries a non-null value; otherwise the
            // value already held by this checkpoint is kept.
            if (session.localState != null) {
                this.localState = session.localState;
            }
            if (session.clusterState != null) {
                this.clusterState = session.clusterState;
            }
        }

        /**
         * Merges every entry of {@code toMerge} into {@code destination}.
         *
         * <p>A {@code null} source is a no-op. An empty destination is filled with a bulk
         * {@code putAll}; otherwise each source entry is combined with the destination's value
         * for the same key through {@link Map#merge(Object, Object, BiFunction)}.
         *
         * @param destination the map that is modified; must be mutable
         * @param toMerge the entries to fold in, or {@code null} for none
         * @param merger combines the existing value (first argument) with the incoming value
         *        (second argument) when both maps contain the key
         */
        private <K, V> void mergeMaps(Map<K, V> destination, Map<K, V> toMerge, BiFunction<? super V, ? super V, ? extends V> merger) {
            if (toMerge != null) {
                if (!destination.isEmpty()) {
                    BiConsumer<K, V> combineIntoDestination = (k, v) -> destination.merge(k, v, merger);
                    toMerge.forEach(combineIntoDestination);
                } else {
                    // Nothing to combine with, so a plain bulk copy is sufficient.
                    destination.putAll(toMerge);
                }
            }
        }

        /**
         * Merges {@code toMerge} into {@code destination} for maps whose values are updated in place.
         *
         * <p>A {@code null} source is a no-op. An empty destination is filled with a bulk
         * {@code putAll}. Otherwise, for each source entry: if the destination has no value
         * (or a {@code null} value) for the key, the source's value object is stored as-is;
         * if it has one, {@code merger} is called so the existing value can absorb the incoming one.
         *
         * <p>Values taken from {@code toMerge} are stored by reference, not copied.
         *
         * @param destination the map that is modified; must be mutable
         * @param toMerge the entries to fold in, or {@code null} for none
         * @param merger receives the destination's existing value first and the incoming value second
         */
        private <K, V> void mergeMapsWithMutableValue(Map<K, V> destination, Map<K, V> toMerge, BiConsumer<? super V, ? super V> merger) {
            if (toMerge != null) {
                if (destination.isEmpty()) {
                    destination.putAll(toMerge);
                } else {
                    for (Map.Entry<K, V> incomingEntry : toMerge.entrySet()) {
                        K entryKey = incomingEntry.getKey();
                        V incoming = incomingEntry.getValue();
                        V existing = destination.get(entryKey);
                        if (existing != null) {
                            // Let the value already in the destination absorb the incoming one.
                            merger.accept(existing, incoming);
                        } else {
                            destination.put(entryKey, incoming);
                        }
                    }
                }
            }
        }

        /**
         * Looks up the entry stored in {@code records} under the given FlowFile's id.
         *
         * @param flowFile the FlowFile whose {@code getId()} value is used as the lookup key;
         *        must not be {@code null}
         * @return the result of {@code records.get(...)} for that id (for a {@link Map}, that is
         *         {@code null} when there is no entry)
         */
        private StandardRepositoryRecord getRecord(FlowFile flowFile) {
            StandardRepositoryRecord match = this.records.get(flowFile.getId());
            return match;
        }

        /**
         * Returns the {@code flowFilesIn} tally held by this checkpoint.
         *
         * @return the current value of {@code flowFilesIn}
         */
        public int getFlowFilesIn() {
            return flowFilesIn;
        }

        /**
         * Returns the {@code flowFilesOut} tally held by this checkpoint.
         *
         * @return the current value of {@code flowFilesOut}
         */
        public int getFlowFilesOut() {
            return flowFilesOut;
        }

        /**
         * Returns the number of removed FlowFiles, which is stored in the {@code removedCount} field.
         *
         * @return the current value of {@code removedCount}
         */
        public int getFlowFilesRemoved() {
            return removedCount;
        }

        /**
         * Returns the inbound byte total, which is stored in the {@code contentSizeIn} field.
         *
         * @return the current value of {@code contentSizeIn}
         */
        public long getBytesIn() {
            return contentSizeIn;
        }

        /**
         * Returns the outbound byte total, which is stored in the {@code contentSizeOut} field.
         *
         * @return the current value of {@code contentSizeOut}
         */
        public long getBytesOut() {
            return contentSizeOut;
        }

        /**
         * Returns the removed byte total, which is stored in the {@code removedBytes} field.
         *
         * @return the current value of {@code removedBytes}
         */
        public long getBytesRemoved() {
            return removedBytes;
        }
    }

    /**
     * Bookkeeping for symmetric links between numeric ids (FlowFile ids, going by the class name).
     *
     * <p>Each id maps to the list of ids it has been directly linked with. Links are recorded in
     * both directions, and repeated calls for the same pair append repeated list entries.
     */
    private static class FlowFileLinkage {
        private final Map<Long, List<Long>> linkedIds = new HashMap<>();

        private FlowFileLinkage() {
        }

        /**
         * Records a two-way link between {@code id} and {@code other}. Linking an id to itself
         * is ignored.
         *
         * @param id one end of the link
         * @param other the other end of the link
         */
        public void addLink(long id, long other) {
            if (id != other) {
                this.neighborsOf(id).add(other);
                this.neighborsOf(other).add(id);
            }
        }

        /**
         * Collects the ids directly linked to {@code id} together with everything directly linked
         * to those ids (i.e. at most two hops away).
         *
         * <p>Because links are stored in both directions, the result contains {@code id} itself
         * whenever {@code id} has at least one link.
         *
         * @param id the id to start from
         * @return a new, mutable set of ids; empty when {@code id} has no recorded links
         */
        public Collection<Long> getLinkedIds(long id) {
            HashSet<Long> reachable = new HashSet<>();
            List<Long> direct = this.linkedIds.get(id);
            if (direct == null) {
                return reachable;
            }
            reachable.addAll(direct);
            for (Long neighbor : direct) {
                List<Long> secondHop = this.linkedIds.get(neighbor);
                if (secondHop != null) {
                    reachable.addAll(secondHop);
                }
            }
            return reachable;
        }

        /**
         * Forgets every recorded link.
         */
        public void clear() {
            this.linkedIds.clear();
        }

        /** Returns the link list for {@code id}, creating and registering an empty one if absent. */
        private List<Long> neighborsOf(long id) {
            return this.linkedIds.computeIfAbsent(id, unused -> new ArrayList<>());
        }
    }

    private static interface ConnectionPoller {
        public List<FlowFileRecord> poll(Connection var1, Set<FlowFileRecord> var2);
    }
}

