/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.storage;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.ListableEntityWrapper;
import org.apache.nifi.processor.util.list.ListedEntity;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.FetchGCSObject;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;
import org.apache.nifi.processors.gcp.storage.StorageAttributes;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"google cloud", "google", "storage", "gcs", "list"})
@CapabilityDescription(value="Retrieves a listing of objects from a GCS bucket. For each object that is listed, creates a FlowFile that represents the object so that it can be fetched in conjunction with FetchGCSObject. This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating all of the data.")
@Stateful(scopes={Scope.CLUSTER}, description="After performing a listing of keys, the timestamp of the newest key is stored, along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@SeeAlso(value={PutGCSObject.class, DeleteGCSObject.class, FetchGCSObject.class})
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file"), @WritesAttribute(attribute="gcs.bucket", description="Bucket of the object."), @WritesAttribute(attribute="gcs.key", description="Name of the object."), @WritesAttribute(attribute="gcs.size", description="Size of the object."), @WritesAttribute(attribute="gcs.cache.control", description="Data cache control of the object."), @WritesAttribute(attribute="gcs.component.count", description="The number of components which make up the object."), @WritesAttribute(attribute="gcs.content.disposition", description="The data content disposition of the object."), @WritesAttribute(attribute="gcs.content.encoding", description="The content encoding of the object."), @WritesAttribute(attribute="gcs.content.language", description="The content language of the object."), @WritesAttribute(attribute="mime.type", description="The MIME/Content-Type of the object"), @WritesAttribute(attribute="gcs.crc32c", description="The CRC32C checksum of object's data, encoded in base64 in big-endian order."), @WritesAttribute(attribute="gcs.create.time", description="The creation time of the object (milliseconds)"), @WritesAttribute(attribute="gcs.update.time", description="The last modification time of the object (milliseconds)"), @WritesAttribute(attribute="gcs.encryption.algorithm", description="The algorithm used to encrypt the object."), @WritesAttribute(attribute="gcs.encryption.sha256", description="The SHA256 hash of the key used to encrypt the object"), @WritesAttribute(attribute="gcs.etag", description="The HTTP 1.1 Entity tag for the object."), @WritesAttribute(attribute="gcs.generated.id", description="The service-generated for the object"), @WritesAttribute(attribute="gcs.generation", description="The data generation of the object."), @WritesAttribute(attribute="gcs.md5", description="The MD5 hash of the object's data encoded in base64."), @WritesAttribute(attribute="gcs.media.link", 
description="The media download link to the object."), @WritesAttribute(attribute="gcs.metageneration", description="The metageneration of the object."), @WritesAttribute(attribute="gcs.owner", description="The owner (uploader) of the object."), @WritesAttribute(attribute="gcs.owner.type", description="The ACL entity type of the uploader of the object."), @WritesAttribute(attribute="gcs.acl.owner", description="A comma-delimited list of ACL entities that have owner access to the object. Entities will be either email addresses, domains, or project IDs."), @WritesAttribute(attribute="gcs.acl.writer", description="A comma-delimited list of ACL entities that have write access to the object. Entities will be either email addresses, domains, or project IDs."), @WritesAttribute(attribute="gcs.acl.reader", description="A comma-delimited list of ACL entities that have read access to the object. Entities will be either email addresses, domains, or project IDs."), @WritesAttribute(attribute="gcs.uri", description="The URI of the object as a string.")})
@DefaultSchedule(strategy=SchedulingStrategy.TIMER_DRIVEN, period="1 min")
public class ListGCSBucket
extends AbstractGCSProcessor {
    // ---- Allowable values offered by the "Listing Strategy" property ----
    /** Strategy value "timestamps": tracks the newest listed timestamp (and the keys sharing it). */
    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", "This strategy tracks the latest timestamp of listed entity to determine new/updated entities. Since it only tracks few timestamps, it can manage listing state efficiently. This strategy will not pick up any newly added or modified entity if their timestamps are older than the tracked latest timestamp. Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
    /** Strategy value "entities": tracks listed entities through a {@link ListedEntityTracker}. */
    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities. This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'. Works even when multiple subdirectories are being written at the same time while listing is running. However an additional DistributedMapCache controller service is required and more JVM heap memory is used. For more information on how the 'Entity Tracking Time Window' property works, see the description.");
    /** Strategy value "none": every run lists everything, without consulting tracked state. */
    public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking", "This strategy lists all entities without any tracking. The same entities will be listed each time this processor is scheduled. It is recommended to change the default run schedule value. Any property that relates to the persisting state will be ignored.");
    // ---- Property descriptors ----
    /** Required selector between the three strategies above; defaults to {@link #BY_TIMESTAMPS}. */
    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder().name("listing-strategy").displayName("Listing Strategy").description("Specify how to determine new/updated entities. See each strategy descriptions for detail.").required(true).allowableValues(new DescribedValue[]{BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING}).defaultValue(BY_TIMESTAMPS.getValue()).build();
    // The next two descriptors are copied from ListedEntityTracker and declared as depending on
    // LISTING_STRATEGY being BY_ENTITIES.
    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder().fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE).dependsOn(LISTING_STRATEGY, new AllowableValue[]{BY_ENTITIES}).required(true).build();
    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder().fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET).dependsOn(LISTING_STRATEGY, new AllowableValue[]{BY_ENTITIES}).build();
    // NOTE(review): unlike the two descriptors above, this one is required but has no dependsOn(LISTING_STRATEGY, BY_ENTITIES) - confirm that is intended.
    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder().fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW).required(true).build();
    /** Required bucket name; read in this class via {@code evaluateAttributeExpressions()}. */
    public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder().name("gcs-bucket").displayName("Bucket").description("Bucket of the object.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** Optional object-name prefix; when set it is passed to the listing call as a prefix option. */
    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("gcs-prefix").displayName("Prefix").description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').").required(false).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** Required boolean (default "false"); when true the listing call is given the versions option. */
    public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder().name("gcs-use-generations").displayName("Use Generations").expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("false").description("Specifies whether to use GCS Generations, if applicable.  If false, only the latest version of each object will be returned.").build();
    /** Optional record writer service; when absent the listing code falls back to an attribute-based writer. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.").required(false).identifiesControllerService(RecordSetWriterFactory.class).build();
    // ---- Immutable collections handed out by the accessor methods ----
    /** Properties returned by {@code getSupportedPropertyDescriptors()}, in this order. */
    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(GCP_CREDENTIALS_PROVIDER_SERVICE, PROJECT_ID, BUCKET, PREFIX, LISTING_STRATEGY, TRACKING_STATE_CACHE, INITIAL_LISTING_TARGET, TRACKING_TIME_WINDOW, RECORD_WRITER, USE_GENERATIONS, RETRY_COUNT, STORAGE_API_URL, PROXY_CONFIGURATION_SERVICE);
    /** Relationships returned by {@code getRelationships()}; only success is exposed. */
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    /** Properties whose modification makes {@code onPropertyModified} request a tracking reset. */
    private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = Set.of(BUCKET, PREFIX, LISTING_STRATEGY);
    /** State-map key under which the newest listed timestamp is persisted. */
    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
    /** Prefix of the indexed state-map keys ("key-0", "key-1", ...) that hold the object names sharing the newest timestamp. */
    public static final String CURRENT_KEY_PREFIX = "key-";
    /** Newest update timestamp (epoch millis) seen by the timestamp-tracking strategy; 0 means nothing tracked. */
    private volatile long currentTimestamp = 0L;
    /** Names of the objects whose update time equals {@link #currentTimestamp}. */
    private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());
    /** Tracker used by the entity-tracking strategy; null while that strategy is not active. */
    private volatile ListedEntityTracker<ListableBlob> listedEntityTracker;
    /** Set by the primary-node callback and cleared after the next entity-tracking run. */
    private volatile boolean justElectedPrimaryNode = false;
    /** Set when a tracking-relevant property changes; consumed when the processor is scheduled. */
    private volatile boolean resetTracking = false;

    /**
     * Exposes the fixed list of properties this processor supports.
     *
     * @return the shared descriptor list, in declaration order
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> supported = DESCRIPTORS;
        return supported;
    }

    /**
     * Exposes the relationships this processor can route to.
     *
     * @return the shared relationship set
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> supported = RELATIONSHIPS;
        return supported;
    }

    /**
     * Records whether this node has just been elected primary.
     *
     * @param newState the primary-node state reported by the framework
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        final boolean elected = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
        this.justElectedPrimaryNode = elected;
    }

    /**
     * Flags the tracking state for reset when a tracking-relevant property changes
     * after the configuration has been restored.
     *
     * @param descriptor the property that was modified
     * @param oldValue   the previous value (unused)
     * @param newValue   the new value (unused)
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (!this.isConfigurationRestored()) {
            return;
        }
        if (TRACKING_RESET_PROPERTIES.contains(descriptor)) {
            this.resetTracking = true;
        }
    }

    /**
     * Prepares tracking state for the configured listing strategy when the processor is scheduled.
     * Timestamp state is cleared on reset or when timestamps are not being tracked; the entity
     * tracker is cleared on reset or when entities are not being tracked, and is created when needed.
     *
     * @param context the process context supplying the strategy property and the state manager
     * @throws IOException if clearing the cluster state fails
     */
    @OnScheduled
    public void initTrackingStrategy(ProcessContext context) throws IOException {
        final String strategy = context.getProperty(LISTING_STRATEGY).getValue();
        final boolean byTimestamps = BY_TIMESTAMPS.getValue().equals(strategy);
        final boolean byEntities = BY_ENTITIES.getValue().equals(strategy);

        // Drop the persisted timestamp/keys unless they are still valid for the timestamp strategy.
        if (this.resetTracking || !byTimestamps) {
            context.getStateManager().clear(Scope.CLUSTER);
            this.currentTimestamp = 0L;
            this.currentKeys.clear();
        }

        // Drop an existing entity tracker when it is stale or no longer needed.
        if (this.listedEntityTracker != null) {
            if (this.resetTracking || !byEntities) {
                try {
                    this.listedEntityTracker.clearListedEntities();
                    this.listedEntityTracker = null;
                } catch (IOException e) {
                    throw new RuntimeException("Failed to reset previously listed entities", e);
                }
            }
        }

        if (byEntities) {
            if (this.listedEntityTracker == null) {
                this.listedEntityTracker = this.createListedEntityTracker();
            }
        }

        this.resetTracking = false;
    }

    /**
     * Factory for the entity tracker; protected so subclasses can supply their own.
     *
     * @return a new {@link ListedBlobTracker}
     */
    protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
        final ListedEntityTracker<ListableBlob> tracker = new ListedBlobTracker();
        return tracker;
    }

    /**
     * Collects the object names stored in the state map under the indexed
     * {@code CURRENT_KEY_PREFIX} entries ("key-0", "key-1", ...).
     *
     * @param stateMap the state to read
     * @return the values of every entry whose key starts with {@code CURRENT_KEY_PREFIX}
     */
    private Set<String> extractKeys(StateMap stateMap) {
        // A sequential stream is used: the state map is small, so a parallel pipeline only adds overhead.
        return stateMap.toMap().entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(CURRENT_KEY_PREFIX))
                .map(Map.Entry::getValue)
                .collect(Collectors.toSet());
    }

    /**
     * Loads the tracked timestamp and keys from cluster state into the in-memory fields.
     * When the state has no version, no timestamp, or no first key entry, tracking starts from scratch.
     *
     * @param session the session used to read cluster-scoped state
     * @throws IOException if the state cannot be read
     */
    void restoreState(ProcessSession session) throws IOException {
        final StateMap stored = session.getState(Scope.CLUSTER);
        final boolean usable = stored.getStateVersion().isPresent()
                && stored.get(CURRENT_TIMESTAMP) != null
                && stored.get(CURRENT_KEY_PREFIX + "0") != null;

        if (usable) {
            this.currentTimestamp = Long.parseLong(stored.get(CURRENT_TIMESTAMP));
            this.currentKeys.clear();
            this.currentKeys.addAll(this.extractKeys(stored));
            return;
        }

        this.currentTimestamp = 0L;
        this.currentKeys.clear();
    }

    /**
     * Writes the tracked timestamp and the keys sharing it to cluster state.
     * The timestamp is stored under {@code CURRENT_TIMESTAMP}; each key is stored under
     * {@code CURRENT_KEY_PREFIX} followed by a running index starting at 0.
     * A failure to save is logged and not rethrown.
     *
     * @param session   the session used to write cluster-scoped state
     * @param timestamp the newest timestamp to persist
     * @param keys      the object names that share {@code timestamp}
     */
    void persistState(ProcessSession session, long timestamp, Set<String> keys) {
        // The map must be String-keyed to match what setState accepts.
        final Map<String, String> state = new HashMap<>();
        state.put(CURRENT_TIMESTAMP, String.valueOf(timestamp));
        int index = 0;
        for (final String key : keys) {
            state.put(CURRENT_KEY_PREFIX + index, key);
            index++;
        }
        try {
            session.setState(state, Scope.CLUSTER);
        } catch (IOException ioe) {
            // Best effort: the listing itself succeeded, so only report the failure.
            this.getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", (Throwable) ioe);
        }
    }

    /**
     * @return a read-only view of the keys currently tracked for the newest timestamp
     */
    Set<String> getStateKeys() {
        final Set<String> readOnlyView = Collections.unmodifiableSet(this.currentKeys);
        return readOnlyView;
    }

    /**
     * @return the newest timestamp currently tracked in memory
     */
    long getStateTimestamp() {
        final long tracked = this.currentTimestamp;
        return tracked;
    }

    /**
     * @return a single-element list containing the "storage.objects.list" permission
     */
    @Override
    protected List<String> getRequiredPermissions() {
        final String listPermission = "storage.objects.list";
        return Collections.singletonList(listPermission);
    }

    /**
     * Resolves the bucket name from the {@code BUCKET} property.
     *
     * @param context    the process context holding the property
     * @param attributes not consulted by this implementation
     * @return the evaluated bucket name
     */
    @Override
    protected String getBucketName(ProcessContext context, Map<String, String> attributes) {
        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
        return bucketName;
    }

    /**
     * Extends the inherited verification with a trial listing of the bucket that only counts blobs.
     *
     * @param context            the process context
     * @param verificationLogger logger that receives the failure, if any
     * @param attributes         attributes forwarded to the parent verification and bucket lookup
     * @return the parent's results followed by one "List GCS Bucket" result
     */
    @Override
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
        final String bucketName = this.getBucketName(context, attributes);

        ConfigVerificationResult listingResult;
        try {
            final VerifyListingAction countingAction = new VerifyListingAction(context);
            this.listBucket(context, countingAction);
            final int blobCount = countingAction.getBlobWriter().getCount();
            final String explanation = String.format("Successfully listed Bucket [%s], finding %s blobs matching the filter", bucketName, blobCount);
            listingResult = new ConfigVerificationResult.Builder()
                    .verificationStepName("List GCS Bucket")
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation(explanation)
                    .build();
        } catch (Exception e) {
            verificationLogger.error("Failed to list GCS Bucket", (Throwable) e);
            final String explanation = String.format("Failed to list Bucket [%s]: %s", bucketName, e.getMessage());
            listingResult = new ConfigVerificationResult.Builder()
                    .verificationStepName("List GCS Bucket")
                    .outcome(ConfigVerificationResult.Outcome.FAILED)
                    .explanation(explanation)
                    .build();
        }

        results.add(listingResult);
        return results;
    }

    /**
     * Dispatches one run to the listing routine that matches the configured strategy.
     *
     * @param context the process context
     * @param session the process session
     * @throws ProcessException if the configured strategy matches none of the known values
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();

        if (BY_TIMESTAMPS.equals(listingStrategy)) {
            this.listByTrackingTimestamps(context, session);
            return;
        }
        if (BY_ENTITIES.equals(listingStrategy)) {
            this.listByTrackingEntities(context, session);
            return;
        }
        if (NO_TRACKING.equals(listingStrategy)) {
            this.listNoTracking(context, session);
            return;
        }
        throw new ProcessException("Unknown listing strategy: " + listingStrategy);
    }

    /**
     * Lists the whole bucket without consulting or updating tracking state.
     * On failure the writer is told to finish exceptionally, the session is rolled back and the
     * processor yields.
     *
     * @param context the process context
     * @param session the process session
     */
    private void listNoTracking(ProcessContext context, ProcessSession session) {
        final long startedAt = System.nanoTime();
        final NoTrackingListingAction action = new NoTrackingListingAction(context, session);
        try {
            this.listBucket(context, action);
        } catch (Exception failure) {
            this.getLogger().error("Failed to list contents of GCS Bucket", (Throwable) failure);
            action.getBlobWriter().finishListingExceptionally(failure);
            session.rollback();
            context.yield();
            return;
        }
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt);
        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
        this.getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucketName, elapsedMillis});
    }

    /**
     * Lists the bucket using the timestamp-tracking strategy: state is restored first, then the
     * listing runs with an action that skips blobs already covered by the tracked timestamp.
     * If the state cannot be restored the processor yields without listing.
     *
     * @param context the process context
     * @param session the process session
     */
    private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
        try {
            this.restoreState(session);
        } catch (IOException stateFailure) {
            this.getLogger().error("Failed to restore processor state; yielding", (Throwable) stateFailure);
            context.yield();
            return;
        }

        final long startedAt = System.nanoTime();
        final TriggerListingAction action = new TriggerListingAction(context, session);
        try {
            this.listBucket(context, action);
        } catch (Exception failure) {
            this.getLogger().error("Failed to list contents of GCS Bucket", (Throwable) failure);
            action.getBlobWriter().finishListingExceptionally(failure);
            session.rollback();
            context.yield();
            return;
        }

        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt);
        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
        this.getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucketName, elapsedMillis});
    }

    /**
     * Walks every page of the bucket listing, feeding each non-skipped blob to the action's writer
     * and tracking the newest update time together with the names of the blobs that share it.
     * After each page, if the writer reports a checkpoint, the action is asked to commit the blobs
     * counted so far. When all pages are consumed the writer and the action are both finished.
     *
     * @param context       the process context supplying bucket and list options
     * @param listingAction supplies the storage client and writer, and decides skipping and committing
     * @throws IOException             if the writer fails
     * @throws SchemaNotFoundException if the writer cannot start the listing
     */
    private void listBucket(ProcessContext context, ListingAction<? extends BlobWriter> listingAction) throws IOException, SchemaNotFoundException {
        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
        final List<Storage.BlobListOption> listOptions = this.getBlobListOptions(context);
        final Storage storage = listingAction.getCloudService();

        long maxTimestamp = 0L;
        final Set<String> keysMatchingTimestamp = new HashSet<>();

        // Typed as BlobWriter (not Object) so the writer methods can be called.
        final BlobWriter writer = listingAction.getBlobWriter();
        writer.beginListing();

        // Typed page so iteration yields Blob elements.
        Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
        int listCount = 0;
        do {
            for (final Blob blob : blobPage.getValues()) {
                final long lastModified = blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli();
                if (listingAction.skipBlob(blob)) {
                    continue;
                }
                writer.addToListing(blob);
                if (lastModified > maxTimestamp) {
                    // A newer timestamp invalidates the keys collected for the previous maximum.
                    maxTimestamp = lastModified;
                    keysMatchingTimestamp.clear();
                }
                if (lastModified == maxTimestamp) {
                    keysMatchingTimestamp.add(blob.getName());
                }
                ++listCount;
            }
            if (writer.isCheckpoint()) {
                listingAction.commit(listCount);
                listCount = 0;
            }
            blobPage = blobPage.getNextPage();
        } while (blobPage != null);

        writer.finishListing();
        listingAction.finishListing(listCount, maxTimestamp, keysMatchingTimestamp);
    }

    /**
     * Builds the list options for a bucket listing from the {@code PREFIX} and
     * {@code USE_GENERATIONS} properties.
     *
     * @param context the process context holding the properties
     * @return a new list with a prefix option when a prefix is set and a versions option when
     *         generations are enabled; empty when neither applies
     */
    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext context) {
        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
        final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();

        final ArrayList<Storage.BlobListOption> options = new ArrayList<>();
        if (prefix != null) {
            final Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(prefix);
            options.add(prefixOption);
        }
        if (useGenerations) {
            final Storage.BlobListOption versionsOption = Storage.BlobListOption.versions(true);
            options.add(versionsOption);
        }
        return options;
    }

    /**
     * Lists the bucket using the entity-tracking strategy. The tracker is handed a callback that
     * walks every listing page and returns each blob whose update time is not older than the
     * minimum timestamp supplied by the tracker, tagged with the page number it was found on.
     * The just-elected-primary flag is cleared afterwards.
     *
     * @param context the process context
     * @param session the process session
     */
    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
        this.listedEntityTracker.trackEntities(context, session, this.justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
            final List<ListableBlob> listedEntities = new ArrayList<>();
            final Storage storage = (Storage) this.getCloudService();
            final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
            final List<Storage.BlobListOption> listOptions = this.getBlobListOptions(context);

            // Typed page so iteration yields Blob elements.
            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
            int pageNr = 0;
            do {
                for (final Blob blob : blobPage.getValues()) {
                    if (blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli() < minTimestampToList) {
                        continue;
                    }
                    listedEntities.add(new ListableBlob(blob, pageNr));
                }
                blobPage = blobPage.getNextPage();
                ++pageNr;
            } while (blobPage != null);
            return listedEntities;
        }, null);
        this.justElectedPrimaryNode = false;
    }

    /**
     * Commits the session asynchronously when at least one object was listed; otherwise does nothing.
     *
     * @param session   the session to commit
     * @param listCount number of objects listed since the last commit
     */
    private void commit(ProcessSession session, int listCount) {
        if (listCount <= 0) {
            return;
        }
        this.getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[]{listCount});
        session.commitAsync();
    }

    /**
     * @return the newest timestamp currently tracked in memory
     */
    long getCurrentTimestamp() {
        final long tracked = this.currentTimestamp;
        return tracked;
    }

    /**
     * @return the current entity tracker, or null when none is set
     */
    ListedEntityTracker<ListableBlob> getListedEntityTracker() {
        final ListedEntityTracker<ListableBlob> tracker = this.listedEntityTracker;
        return tracker;
    }

    /**
     * @return whether a tracking reset is pending
     */
    boolean isResetTracking() {
        final boolean pending = this.resetTracking;
        return pending;
    }

    /**
     * Entity tracker for GCS blobs. Both output hooks (records and FlowFiles) delegate to the same
     * publishing routine, which picks the writer from the {@code RECORD_WRITER} property.
     */
    protected class ListedBlobTracker extends ListedEntityTracker<ListableBlob> {
        public ListedBlobTracker() {
            super(ListGCSBucket.this.getIdentifier(), ListGCSBucket.this.getLogger(), RecordBlobWriter.RECORD_SCHEMA);
        }

        protected void createRecordsForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, SchemaNotFoundException {
            this.publishListing(context, session, updatedEntities);
        }

        protected void createFlowFilesForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities, Function<ListableBlob, Map<String, String>> createAttributes) {
            this.publishListing(context, session, updatedEntities);
        }

        /**
         * Writes the given entities through a blob writer and records each one as already listed.
         * A commit is requested when the page number changes between consecutive entities and the
         * writer reports a checkpoint. On any failure the writer is finished exceptionally, the
         * session is rolled back and the processor yields.
         */
        private void publishListing(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) {
            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
            final BlobWriter writer;
            if (writerFactory == null) {
                writer = new AttributeBlobWriter(session);
            } else {
                writer = new RecordBlobWriter(session, writerFactory, ListGCSBucket.this.getLogger());
            }

            try {
                writer.beginListing();
                int pending = 0;
                int previousPage = -1;
                for (final ListableBlob entity : updatedEntities) {
                    final Blob blob = (Blob) entity.getRawEntity();
                    final int entityPage = entity.getPageNumber();
                    writer.addToListing(blob);
                    pending++;

                    // -1 marks "no previous entity yet", so the first entity never triggers a commit.
                    final boolean pageChanged = previousPage != -1 && previousPage != entityPage;
                    if (pageChanged && writer.isCheckpoint()) {
                        ListGCSBucket.this.commit(session, pending);
                        pending = 0;
                    }
                    previousPage = entityPage;

                    final ListedEntity tracked = new ListedEntity(entity.getTimestamp(), entity.getSize());
                    this.alreadyListedEntities.put(entity.getIdentifier(), tracked);
                }
                writer.finishListing();
            } catch (Exception failure) {
                ListGCSBucket.this.getLogger().error("Failed to list contents of bucket", (Throwable) failure);
                writer.finishListingExceptionally(failure);
                session.rollback();
                context.yield();
            }
        }
    }

    /**
     * Listing action used by configuration verification: never skips, never commits, and only
     * counts blobs through a {@link CountingBlobWriter}.
     */
    private class VerifyListingAction implements ListingAction<CountingBlobWriter> {
        final ProcessContext context;
        final CountingBlobWriter blobWriter;

        private VerifyListingAction(ProcessContext context) {
            this.blobWriter = new CountingBlobWriter();
            this.context = context;
        }

        @Override
        public Storage getCloudService() {
            // Uses the context-taking overload, unlike the other listing actions.
            return (Storage) ListGCSBucket.this.getCloudService(this.context);
        }

        @Override
        public CountingBlobWriter getBlobWriter() {
            return this.blobWriter;
        }

        @Override
        public boolean skipBlob(Blob blob) {
            return false;
        }

        @Override
        public void commit(int listCount) {
            // Verification produces no session output, so there is nothing to commit.
        }

        @Override
        public void finishListing(int listCount, long maxTimestamp, Set<String> keysMatchingTimestamp) {
            // Verification keeps no tracking state.
        }
    }

    private static interface ListingAction<T extends BlobWriter> {
        public boolean skipBlob(Blob var1);

        public T getBlobWriter();

        public Storage getCloudService();

        public void finishListing(int var1, long var2, Set<String> var4);

        public void commit(int var1);
    }

    /**
     * Writer that produces no output and only counts the blobs it is given.
     */
    private static class CountingBlobWriter implements BlobWriter {
        private int count;

        private CountingBlobWriter() {
            this.count = 0;
        }

        public int getCount() {
            return this.count;
        }

        @Override
        public void beginListing() {
        }

        @Override
        public void addToListing(Blob blob) {
            this.count += 1;
        }

        @Override
        public void finishListing() {
        }

        @Override
        public void finishListingExceptionally(Exception cause) {
        }

        @Override
        public boolean isCheckpoint() {
            // Counting never asks for an intermediate commit.
            return false;
        }
    }

    /**
     * Listing action for the no-tracking strategy: lists every blob, commits at checkpoints and
     * keeps no state when the listing finishes.
     */
    private class NoTrackingListingAction implements ListingAction<BlobWriter> {
        final ProcessContext context;
        final ProcessSession session;
        final BlobWriter blobWriter;

        private NoTrackingListingAction(ProcessContext context, ProcessSession session) {
            this.context = context;
            this.session = session;
            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
            // Without a record writer, fall back to the attribute-based writer.
            if (writerFactory == null) {
                this.blobWriter = new AttributeBlobWriter(session);
            } else {
                this.blobWriter = new RecordBlobWriter(session, writerFactory, ListGCSBucket.this.getLogger());
            }
        }

        @Override
        public Storage getCloudService() {
            return (Storage) ListGCSBucket.this.getCloudService();
        }

        @Override
        public BlobWriter getBlobWriter() {
            return this.blobWriter;
        }

        @Override
        public boolean skipBlob(Blob blob) {
            return false;
        }

        @Override
        public void commit(int listCount) {
            ListGCSBucket.this.commit(this.session, listCount);
        }

        @Override
        public void finishListing(int listCount, long maxTimestamp, Set<String> keysMatchingTimestamp) {
            // No tracking state to update.
        }
    }

    private static interface BlobWriter {
        public void beginListing() throws IOException, SchemaNotFoundException;

        public void addToListing(Blob var1) throws IOException;

        public void finishListing() throws IOException;

        public void finishListingExceptionally(Exception var1);

        public boolean isCheckpoint();
    }

    /**
     * Listing action for the timestamp-tracking strategy: skips blobs already covered by the tracked
     * timestamp/keys and, when the listing finishes with new data, updates and persists that state.
     */
    private class TriggerListingAction implements ListingAction<BlobWriter> {
        final ProcessContext context;
        final ProcessSession session;
        final BlobWriter blobWriter;

        private TriggerListingAction(ProcessContext context, ProcessSession session) {
            this.context = context;
            this.session = session;
            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
            // Without a record writer, fall back to the attribute-based writer.
            if (writerFactory == null) {
                this.blobWriter = new AttributeBlobWriter(session);
            } else {
                this.blobWriter = new RecordBlobWriter(session, writerFactory, ListGCSBucket.this.getLogger());
            }
        }

        @Override
        public Storage getCloudService() {
            return (Storage) ListGCSBucket.this.getCloudService();
        }

        @Override
        public BlobWriter getBlobWriter() {
            return this.blobWriter;
        }

        @Override
        public boolean skipBlob(Blob blob) {
            final long lastModified = blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli();
            // Older than the tracked timestamp: already listed in an earlier run.
            if (lastModified < ListGCSBucket.this.currentTimestamp) {
                return true;
            }
            // Same timestamp: skip only the keys that were recorded for it.
            return lastModified == ListGCSBucket.this.currentTimestamp
                    && ListGCSBucket.this.currentKeys.contains(blob.getName());
        }

        @Override
        public void commit(int listCount) {
            ListGCSBucket.this.commit(this.session, listCount);
        }

        @Override
        public void finishListing(int listCount, long maxTimestamp, Set<String> keysMatchingTimestamp) {
            // A zero maximum means no blob passed the skip filter.
            if (maxTimestamp == 0L) {
                final String bucketName = this.context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
                ListGCSBucket.this.getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucketName});
                this.context.yield();
                return;
            }

            this.commit(listCount);
            ListGCSBucket.this.currentTimestamp = maxTimestamp;
            ListGCSBucket.this.currentKeys.clear();
            ListGCSBucket.this.currentKeys.addAll(keysMatchingTimestamp);
            ListGCSBucket.this.persistState(this.session, ListGCSBucket.this.currentTimestamp, ListGCSBucket.this.currentKeys);
        }
    }

    /**
     * {@link ListableEntityWrapper} around a GCS {@link Blob} that additionally
     * remembers the page number supplied at construction.
     */
    private static class ListableBlob
    extends ListableEntityWrapper<Blob> {
        private final int pageNumber;

        /**
         * @param blob blob to wrap; its name, generated id, update time (epoch millis)
         *             and size are exposed through the wrapper's accessor functions
         * @param pageNr page number to associate with the blob
         */
        public ListableBlob(Blob blob, int pageNr) {
            super(blob, BlobInfo::getName, BlobInfo::getGeneratedId, ListableBlob::updateTimeMillis, BlobInfo::getSize);
            this.pageNumber = pageNr;
        }

        /**
         * @return the page number given to the constructor
         */
        public int getPageNumber() {
            return pageNumber;
        }

        /** Converts the blob's update time to milliseconds since the epoch. */
        private static long updateTimeMillis(Blob gcsBlob) {
            return gcsBlob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli();
        }
    }

    /**
     * {@link BlobWriter} that emits one FlowFile per blob, carrying the blob's
     * metadata as FlowFile attributes and routing it to success.
     */
    private static class AttributeBlobWriter
    implements BlobWriter {
        private final ProcessSession session;

        /**
         * @param session session used to create and transfer the FlowFiles
         */
        public AttributeBlobWriter(ProcessSession session) {
            this.session = session;
        }

        /** Nothing to prepare: every blob gets its own FlowFile. */
        @Override
        public void beginListing() {
        }

        /**
         * Creates a FlowFile, copies the attributes built by
         * {@code StorageAttributes.createAttributes} onto it and transfers it to success.
         *
         * @param blob blob to emit
         */
        @Override
        public void addToListing(Blob blob) {
            final Map<String, String> blobAttributes = StorageAttributes.createAttributes(blob);
            final FlowFile listed = session.putAllAttributes(session.create(), blobAttributes);
            session.transfer(listed, AbstractGCSProcessor.REL_SUCCESS);
        }

        /** Nothing to finalize: FlowFiles are transferred as they are added. */
        @Override
        public void finishListing() {
        }

        /**
         * No cleanup is performed by this writer.
         *
         * @param cause ignored
         */
        @Override
        public void finishListingExceptionally(Exception cause) {
        }

        /**
         * @return always {@code true}
         */
        @Override
        public boolean isCheckpoint() {
            return true;
        }
    }

    /**
     * {@link BlobWriter} that writes every listed blob as one record into a single
     * FlowFile, using the configured {@link RecordSetWriterFactory}.
     *
     * <p>Lifecycle: {@link #beginListing()} creates the FlowFile and record writer,
     * {@link #addToListing(Blob)} appends records, and either {@link #finishListing()}
     * or {@link #finishListingExceptionally(Exception)} releases them.
     */
    static class RecordBlobWriter
    implements BlobWriter {
        private static final RecordSchema RECORD_SCHEMA;
        public static final String BUCKET = "bucket";
        public static final String NAME = "name";
        public static final String SIZE = "size";
        public static final String CACHE_CONTROL = "cacheControl";
        public static final String COMPONENT_COUNT = "componentCount";
        public static final String CONTENT_DISPOSITION = "contentDisposition";
        public static final String CONTENT_ENCODING = "contentEncoding";
        public static final String CONTENT_LANGUAGE = "contentLanguage";
        public static final String CRC32C = "crc32c";
        public static final String CREATE_TIME = "createTime";
        public static final String UPDATE_TIME = "updateTime";
        public static final String ENCRYPTION_ALGORITHM = "encryptionAlgorithm";
        public static final String ENCRYPTION_KEY_SHA256 = "encryptionKeySha256";
        public static final String ETAG = "etag";
        public static final String GENERATED_ID = "generatedId";
        public static final String GENERATION = "generation";
        public static final String MD5 = "md5";
        public static final String MEDIA_LINK = "mediaLink";
        public static final String METAGENERATION = "metageneration";
        public static final String OWNER = "owner";
        public static final String OWNER_TYPE = "ownerType";
        public static final String URI = "uri";
        private final ProcessSession session;
        private final RecordSetWriterFactory writerFactory;
        private final ComponentLog logger;
        // Both stay null until beginListing() has progressed far enough to assign them.
        private RecordSetWriter recordWriter;
        private FlowFile flowFile;

        /**
         * @param session session used to create, write, transfer and remove the FlowFile
         * @param writerFactory factory that creates the record writer for each listing
         * @param logger logger passed to the writer factory and used for error reporting
         */
        public RecordBlobWriter(ProcessSession session, RecordSetWriterFactory writerFactory, ComponentLog logger) {
            this.session = session;
            this.writerFactory = writerFactory;
            this.logger = logger;
        }

        /**
         * Creates the output FlowFile, opens it for writing and starts a record set.
         *
         * @throws IOException if the record writer cannot be created or started
         * @throws SchemaNotFoundException if the writer factory cannot resolve the schema
         */
        @Override
        public void beginListing() throws IOException, SchemaNotFoundException {
            this.flowFile = this.session.create();
            OutputStream out = this.session.write(this.flowFile);
            try {
                this.recordWriter = this.writerFactory.createWriter(this.logger, RECORD_SCHEMA, out, this.flowFile);
            } catch (Exception e) {
                // No writer owns the stream yet, so close it here; otherwise it stays
                // open and the FlowFile cannot be cleaned up afterwards.
                try {
                    out.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            this.recordWriter.beginRecordSet();
        }

        /**
         * Appends one record describing the given blob.
         *
         * @param blob blob to serialize
         * @throws IOException if the record cannot be written
         */
        @Override
        public void addToListing(Blob blob) throws IOException {
            this.recordWriter.write(this.createRecordForListing(blob));
        }

        /**
         * Closes the record set. An empty listing removes the FlowFile; otherwise the
         * writer's attributes plus {@code record.count} are set and the FlowFile is
         * transferred to success.
         *
         * @throws IOException if finishing or closing the record writer fails
         */
        @Override
        public void finishListing() throws IOException {
            WriteResult writeResult = this.recordWriter.finishRecordSet();
            this.recordWriter.close();
            int recordCount = writeResult.getRecordCount();
            if (recordCount == 0) {
                this.session.remove(this.flowFile);
                return;
            }
            Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
            attributes.put("record.count", String.valueOf(recordCount));
            this.flowFile = this.session.putAllAttributes(this.flowFile, attributes);
            this.session.transfer(this.flowFile, AbstractGCSProcessor.REL_SUCCESS);
        }

        /**
         * Best-effort cleanup after a failed listing: closes the record writer and
         * removes the FlowFile. Either step is skipped when {@link #beginListing()}
         * failed before the corresponding resource was created.
         *
         * @param cause the exception that aborted the listing (not used here)
         */
        @Override
        public void finishListingExceptionally(Exception cause) {
            if (this.recordWriter != null) {
                try {
                    this.recordWriter.close();
                }
                catch (IOException e) {
                    this.logger.error("Failed to write listing as Records", (Throwable)e);
                }
            }
            if (this.flowFile != null) {
                this.session.remove(this.flowFile);
            }
        }

        /**
         * @return always {@code false}
         */
        @Override
        public boolean isCheckpoint() {
            return false;
        }

        /**
         * Builds a record for the blob using {@code RECORD_SCHEMA}. Encryption and
         * owner fields are only populated when the blob provides them.
         *
         * @param blob blob whose metadata is copied into the record
         * @return a {@link MapRecord} holding the blob's metadata
         */
        private Record createRecordForListing(Blob blob) {
            Map<String, Object> values = new HashMap<>();
            values.put(BUCKET, blob.getBucket());
            values.put(NAME, blob.getName());
            values.put(SIZE, blob.getSize());
            values.put(CACHE_CONTROL, blob.getCacheControl());
            values.put(COMPONENT_COUNT, blob.getComponentCount());
            values.put(CONTENT_DISPOSITION, blob.getContentDisposition());
            values.put(CONTENT_ENCODING, blob.getContentEncoding());
            values.put(CONTENT_LANGUAGE, blob.getContentLanguage());
            values.put(CRC32C, blob.getCrc32c());
            values.put(CREATE_TIME, blob.getCreateTimeOffsetDateTime() == null ? null : new Timestamp(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli()));
            values.put(UPDATE_TIME, blob.getUpdateTimeOffsetDateTime() == null ? null : new Timestamp(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli()));
            BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
            if (encryption != null) {
                values.put(ENCRYPTION_ALGORITHM, encryption.getEncryptionAlgorithm());
                values.put(ENCRYPTION_KEY_SHA256, encryption.getKeySha256());
            }
            values.put(ETAG, blob.getEtag());
            values.put(GENERATED_ID, blob.getGeneratedId());
            values.put(GENERATION, blob.getGeneration());
            values.put(MD5, blob.getMd5());
            values.put(MEDIA_LINK, blob.getMediaLink());
            values.put(METAGENERATION, blob.getMetageneration());
            Acl.Entity owner = blob.getOwner();
            // Owner entities of any other kind leave both owner fields unset.
            if (owner instanceof Acl.User) {
                values.put(OWNER, ((Acl.User)owner).getEmail());
                values.put(OWNER_TYPE, "user");
            } else if (owner instanceof Acl.Group) {
                values.put(OWNER, ((Acl.Group)owner).getEmail());
                values.put(OWNER_TYPE, "group");
            } else if (owner instanceof Acl.Domain) {
                values.put(OWNER, ((Acl.Domain)owner).getDomain());
                values.put(OWNER_TYPE, "domain");
            } else if (owner instanceof Acl.Project) {
                values.put(OWNER, ((Acl.Project)owner).getProjectId());
                values.put(OWNER_TYPE, "project");
            }
            values.put(URI, blob.getSelfLink());
            return new MapRecord(RECORD_SCHEMA, values);
        }

        static {
            List<RecordField> fields = new ArrayList<>();
            // Bucket and name are the only non-nullable fields.
            fields.add(new RecordField(BUCKET, RecordFieldType.STRING.getDataType(), false));
            fields.add(new RecordField(NAME, RecordFieldType.STRING.getDataType(), false));
            fields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(CACHE_CONTROL, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(COMPONENT_COUNT, RecordFieldType.INT.getDataType()));
            // Content disposition is a header string (e.g. "attachment; filename=..."), not a number.
            fields.add(new RecordField(CONTENT_DISPOSITION, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CONTENT_ENCODING, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CONTENT_LANGUAGE, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CRC32C, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(CREATE_TIME, RecordFieldType.TIMESTAMP.getDataType()));
            fields.add(new RecordField(UPDATE_TIME, RecordFieldType.TIMESTAMP.getDataType()));
            fields.add(new RecordField(ENCRYPTION_ALGORITHM, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(ENCRYPTION_KEY_SHA256, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(GENERATED_ID, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(GENERATION, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(MD5, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(MEDIA_LINK, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(METAGENERATION, RecordFieldType.LONG.getDataType()));
            fields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(OWNER_TYPE, RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField(URI, RecordFieldType.STRING.getDataType()));
            RECORD_SCHEMA = new SimpleRecordSchema(fields);
        }
    }
}

