/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.reporting.sink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

@Tags(value={"db", "s2s", "site", "record"})
@CapabilityDescription(value="Provides a service to write records using a configured RecordSetWriter over a Site-to-Site connection.")
public class SiteToSiteReportingRecordSink
extends AbstractControllerService
implements RecordSinkService {
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RECORD_WRITER_FACTORY, SiteToSiteUtils.DESTINATION_URL, SiteToSiteUtils.PORT_NAME, SiteToSiteUtils.SSL_CONTEXT, SiteToSiteUtils.INSTANCE_URL, SiteToSiteUtils.COMPRESS, SiteToSiteUtils.TIMEOUT, SiteToSiteUtils.BATCH_SIZE, SiteToSiteUtils.TRANSPORT_PROTOCOL, SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE);
    private volatile SiteToSiteClient siteToSiteClient;
    private volatile RecordSetWriterFactory writerFactory;
    private volatile StateManager stateManager;

    protected void init(ControllerServiceInitializationContext context) {
        this.stateManager = context.getStateManager();
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void migrateProperties(PropertyConfiguration config) {
        ProxyServiceMigration.migrateProxyProperties((PropertyConfiguration)config, (PropertyDescriptor)SiteToSiteUtils.PROXY_CONFIGURATION_SERVICE, (String)"s2s-http-proxy-hostname", (String)"s2s-http-proxy-port", (String)"s2s-http-proxy-username", (String)"s2s-http-proxy-password");
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext context) throws InitializationException {
        try {
            ComponentLog logger = this.getLogger();
            this.siteToSiteClient = SiteToSiteUtils.getClient((PropertyContext)context, logger, this.stateManager);
            this.writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        }
        catch (Exception e) {
            throw new InitializationException((Throwable)e);
        }
    }

    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
        Transaction transaction = null;
        try {
            WriteResult writeResult = null;
            transaction = this.getClient().createTransaction(TransferDirection.SEND);
            if (transaction == null) {
                this.getLogger().info("All destination nodes are penalized; will attempt to send data later");
            } else {
                RecordSchema writeSchema = this.getWriterFactory().getSchema(null, recordSet.getSchema());
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                int recordCount = 0;
                try (RecordSetWriter writer = this.getWriterFactory().createWriter(this.getLogger(), writeSchema, (OutputStream)out, attributes);){
                    Record record;
                    writer.beginRecordSet();
                    while ((record = recordSet.next()) != null) {
                        writer.write(record);
                    }
                    writeResult = writer.finishRecordSet();
                    recordCount = writeResult.getRecordCount();
                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                    attributes.put("record.count", Integer.toString(recordCount));
                    attributes.putAll(writeResult.getAttributes());
                }
                if (recordCount > 0 || sendZeroResults) {
                    transaction.send(out.toByteArray(), attributes);
                    transaction.confirm();
                    transaction.complete();
                }
            }
            return writeResult;
        }
        catch (Exception e) {
            if (transaction != null) {
                transaction.error();
            }
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
        }
    }

    @OnDisabled
    public void stop() throws IOException {
        SiteToSiteClient client = this.getClient();
        if (client != null) {
            client.close();
            this.siteToSiteClient = null;
        }
    }

    protected SiteToSiteClient getClient() {
        return this.siteToSiteClient;
    }

    protected RecordSetWriterFactory getWriterFactory() {
        return this.writerFactory;
    }
}

