/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors.ioconcept.reader.record;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

/**
 * Reads records from an input stream and hands each one, individually
 * serialized to a byte array, to a {@link MessageHandler}.
 *
 * <p>The reader and writer are obtained from the factories supplied at
 * construction time; a fresh {@link RecordSetWriter} is created for every
 * record that is serialized.
 */
public class RecordSupplier {
    /** Initial capacity of the buffer that each record is serialized into. */
    private static final int INITIAL_BUFFER_SIZE = 1024;

    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;

    /**
     * @param readerFactory factory used to create the reader over the incoming stream
     * @param writerFactory factory used to resolve the write schema and to create
     *                      one writer per serialized record
     */
    public RecordSupplier(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) {
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
    }

    /**
     * Iterates over every record readable from {@code in}, serializes it and passes
     * the resulting bytes to {@code messageHandler}.
     *
     * <p>{@code processedRecords} is incremented once per record, whether the record
     * was skipped or handled. A record is skipped (counted, but neither serialized
     * nor handled) while {@code processFromIndex} is non-null and the current value
     * of {@code processedRecords} is below it. The counter is not incremented for a
     * record whose serialization or handling throws.
     *
     * @param flowfile         flow file passed to the reader and writer factories;
     *                         its attributes are used to resolve the write schema
     * @param in               stream the records are read from
     * @param processedRecords counter of records seen so far; read and updated here
     * @param processFromIndex counter value from which records are handled, or
     *                         {@code null} to handle every record
     * @param logger           logger passed to the reader and writer factories
     * @param messageHandler   receiver of each serialized record
     * @throws IOException      if reading or writing a record fails with an I/O error
     * @throws ProcessException if a {@link SchemaNotFoundException} or
     *                          {@link MalformedRecordException} is raised
     */
    public void process(FlowFile flowfile, InputStream in, AtomicInteger processedRecords, Long processFromIndex, ComponentLog logger, MessageHandler messageHandler) throws IOException {
        try (RecordReader recordReader = readerFactory.createRecordReader(flowfile, in, logger)) {
            final RecordSet records = recordReader.createRecordSet();
            final RecordSchema writeSchema = writerFactory.getSchema(flowfile.getAttributes(), records.getSchema());
            // A single buffer is reused for all records; it is reset before each write.
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);

            for (Record current = records.next(); current != null; current = records.next()) {
                if (!isBeforeStartIndex(processedRecords, processFromIndex)) {
                    buffer.reset();
                    try (RecordSetWriter recordWriter = writerFactory.createWriter(logger, writeSchema, buffer, flowfile)) {
                        recordWriter.write(current);
                        recordWriter.flush();
                    }
                    // The buffer is read only after the writer has been closed.
                    messageHandler.handle(buffer.toByteArray());
                }
                // Skipped and handled records both advance the counter.
                processedRecords.getAndIncrement();
            }
        } catch (SchemaNotFoundException | MalformedRecordException e) {
            throw new ProcessException("An error happened during creating components for serialization.", e);
        }
    }

    /**
     * Tells whether the record at the current counter position must be skipped.
     *
     * @return {@code true} when a start index is given and the counter is still below it
     */
    private static boolean isBeforeStartIndex(AtomicInteger processedRecords, Long processFromIndex) {
        if (processFromIndex == null) {
            return false;
        }
        return processedRecords.get() < processFromIndex.longValue();
    }
}

