/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.reporting;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.scheduling.SchedulingStrategy;

@Tags(value={"bulletin", "site", "site to site"})
@CapabilityDescription(value="Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to 10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins may not be sent.")
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXPORT_NIFI_DETAILS, explanation="Provides operator the ability to send sensitive details contained in bulletin events to any external system.")})
@DefaultSchedule(strategy=SchedulingStrategy.TIMER_DRIVEN, period="1 min")
public class SiteToSiteBulletinReportingTask
extends AbstractSiteToSiteReportingTask {
    private volatile long lastSentBulletinId = -1L;

    public SiteToSiteBulletinReportingTask() throws IOException {
        InputStream schema = ((Object)((Object)this)).getClass().getClassLoader().getResourceAsStream("schema-bulletins.avsc");
        this.recordSchema = AvroTypeUtil.createSchema((Schema)new Schema.Parser().parse(schema));
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
        properties.add(SiteToSiteUtils.PLATFORM);
        properties.remove(SiteToSiteUtils.BATCH_SIZE);
        return properties;
    }

    public void onTrigger(ReportingContext context) {
        long currMaxId;
        boolean isClustered = context.isClustered();
        String nodeId = context.getClusterNodeIdentifier();
        if (nodeId == null && isClustered) {
            this.getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. Will wait for Node Identifier to be established.");
            return;
        }
        BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(Long.valueOf(this.lastSentBulletinId)).build();
        List bulletins = context.getBulletinRepository().findBulletins(bulletinQuery);
        if (bulletins == null || bulletins.isEmpty()) {
            this.getLogger().debug("No events to send because no events are stored in the repository.");
            return;
        }
        OptionalLong opMaxId = bulletins.stream().mapToLong(Bulletin::getId).max();
        long l = currMaxId = opMaxId.isPresent() ? opMaxId.getAsLong() : -1L;
        if (currMaxId < this.lastSentBulletinId) {
            this.getLogger().warn("Current bulletin max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the bulletins repository restarted its ids. Restarting querying from the beginning.", new Object[]{currMaxId, this.lastSentBulletinId});
            this.lastSentBulletinId = -1L;
        }
        if (currMaxId == this.lastSentBulletinId) {
            this.getLogger().debug("No events to send due to the current max id being equal to the last id that was sent.");
            return;
        }
        String platform = context.getProperty(SiteToSiteUtils.PLATFORM).evaluateAttributeExpressions().getValue();
        Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
        Map config = Collections.emptyMap();
        JsonBuilderFactory factory = Json.createBuilderFactory(config);
        JsonObjectBuilder builder = factory.createObjectBuilder();
        long start = System.nanoTime();
        JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
        for (Bulletin bulletin : bulletins) {
            if (bulletin.getId() <= this.lastSentBulletinId) continue;
            arrayBuilder.add((JsonValue)this.serialize(builder, bulletin, platform, nodeId, allowNullValues));
        }
        JsonArray jsonArray = arrayBuilder.build();
        Transaction transaction = null;
        try {
            this.setup((PropertyContext)context);
            transaction = this.getClient().createTransaction(TransferDirection.SEND);
            if (transaction == null) {
                this.getLogger().info("All destination nodes are penalized; will attempt to send data later");
                return;
            }
            HashMap<String, String> attributes = new HashMap<String, String>();
            String transactionId = UUID.randomUUID().toString();
            attributes.put("reporting.task.transaction.id", transactionId);
            attributes.put("reporting.task.name", this.getName());
            attributes.put("reporting.task.uuid", this.getIdentifier());
            attributes.put("reporting.task.type", ((Object)((Object)this)).getClass().getSimpleName());
            attributes.put("mime.type", "application/json");
            this.sendData(context, transaction, attributes, jsonArray);
            transaction.confirm();
            transaction.complete();
            long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            this.getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}", new Object[]{bulletins.size(), transferMillis, transactionId, ((Bulletin)bulletins.get(0)).getId()});
        }
        catch (Exception e) {
            if (transaction != null) {
                transaction.error();
            }
            if (e instanceof ProcessException) {
                throw (ProcessException)e;
            }
            throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), (Throwable)e);
        }
        this.lastSentBulletinId = currMaxId;
    }

    private JsonObject serialize(JsonObjectBuilder builder, Bulletin bulletin, String platform, String nodeIdentifier, Boolean allowNullValues) {
        this.addField(builder, "objectId", UUID.randomUUID().toString(), (boolean)allowNullValues);
        this.addField(builder, "platform", platform, (boolean)allowNullValues);
        this.addField(builder, "bulletinId", bulletin.getId(), (boolean)allowNullValues);
        this.addField(builder, "bulletinCategory", bulletin.getCategory(), (boolean)allowNullValues);
        this.addField(builder, "bulletinGroupId", bulletin.getGroupId(), (boolean)allowNullValues);
        this.addField(builder, "bulletinGroupName", bulletin.getGroupName(), (boolean)allowNullValues);
        this.addField(builder, "bulletinGroupPath", bulletin.getGroupPath(), (boolean)allowNullValues);
        this.addField(builder, "bulletinLevel", bulletin.getLevel(), (boolean)allowNullValues);
        this.addField(builder, "bulletinMessage", bulletin.getMessage(), (boolean)allowNullValues);
        this.addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress(), (boolean)allowNullValues);
        this.addField(builder, "bulletinNodeId", nodeIdentifier, (boolean)allowNullValues);
        this.addField(builder, "bulletinSourceId", bulletin.getSourceId(), (boolean)allowNullValues);
        this.addField(builder, "bulletinSourceName", bulletin.getSourceName(), (boolean)allowNullValues);
        this.addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(), (boolean)allowNullValues);
        this.addField(builder, "bulletinTimestamp", DATE_TIME_FORMATTER.format(bulletin.getTimestamp().toInstant()), (boolean)allowNullValues);
        this.addField(builder, "bulletinFlowFileUuid", bulletin.getFlowFileUuid(), (boolean)allowNullValues);
        return builder.build();
    }
}

