/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.producer.convert;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
import org.apache.nifi.kafka.processors.producer.value.RecordValueFactory;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

/**
 * {@link KafkaRecordConverter} that parses an input stream with a configured
 * {@link RecordReaderFactory} and emits one {@link KafkaRecord} per parsed
 * {@link Record}, re-serializing each record through a {@link RecordSetWriter}.
 */
public class RecordStreamKafkaRecordConverter implements KafkaRecordConverter {
    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final HeadersFactory headersFactory;
    private final KeyFactory keyFactory;
    private final int maxMessageSize;
    private final ComponentLog logger;

    /**
     * Creates a converter backed by the given collaborators.
     *
     * @param readerFactory  factory used to create the reader that parses the input stream
     * @param writerFactory  factory used to resolve the write schema and create the record writer
     * @param headersFactory factory that derives the Kafka headers from the attributes
     * @param keyFactory     factory that derives the Kafka key from the attributes and each record
     * @param maxMessageSize size limit handed to {@code ProducerUtils.checkMessageSize} for every value
     * @param logger         logger passed through to the reader and writer factories
     */
    public RecordStreamKafkaRecordConverter(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, HeadersFactory headersFactory, KeyFactory keyFactory, int maxMessageSize, ComponentLog logger) {
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.headersFactory = headersFactory;
        this.keyFactory = keyFactory;
        this.maxMessageSize = maxMessageSize;
        this.logger = logger;
    }

    /**
     * Builds an iterator that converts the records found in {@code in} into Kafka records.
     * Records are pulled from the underlying record set on demand, as the returned
     * iterator is advanced.
     *
     * <p>NOTE(review): neither the created {@link RecordReader} nor the {@link RecordSetWriter}
     * is closed by this class; presumably the caller owns the stream lifecycle. Confirm.</p>
     *
     * @param attributes  attributes passed to the reader, writer, headers and key factories
     * @param in          stream containing the serialized records
     * @param inputLength length value forwarded to the reader factory
     * @return iterator producing one {@link KafkaRecord} per record in the stream
     * @throws IOException if the reader or writer cannot be created, or if schema
     *                     resolution or record parsing fails during setup
     */
    @Override
    public Iterator<KafkaRecord> convert(Map<String, String> attributes, InputStream in, long inputLength) throws IOException {
        try {
            final RecordReader reader = this.readerFactory.createRecordReader(attributes, in, inputLength, this.logger);
            final RecordSet recordSet = reader.createRecordSet();
            final RecordSchema schema = this.writerFactory.getSchema(attributes, recordSet.getSchema());
            // A single in-memory buffer and writer are shared by all records of this stream.
            final ByteArrayOutputStream os = new ByteArrayOutputStream();
            final RecordSetWriter writer = this.writerFactory.createWriter(this.logger, schema, (OutputStream) os, attributes);
            final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
            return this.toKafkaRecordIterator(attributes, os, writer, pushBackRecordSet);
        }
        catch (SchemaNotFoundException | MalformedRecordException e) {
            // Surface parsing/schema problems through the interface's declared checked exception.
            throw new IOException("Stream to Record conversion failed", e);
        }
    }

    /**
     * Wraps the record set in an {@link Iterator} that converts each record to a {@link KafkaRecord}.
     * The headers are computed once from the attributes and reused for every produced record.
     *
     * @param attributes        attributes used to derive headers and keys
     * @param os                buffer the writer serializes into
     * @param writer            writer used to serialize each record value
     * @param pushBackRecordSet source of records
     * @return iterator over the converted records; I/O failures during iteration are
     *         rethrown as {@link UncheckedIOException}
     * @throws IOException declared for callers; NOTE(review): nothing visible here throws it
     *                     unless the header or value factories do — confirm
     */
    private Iterator<KafkaRecord> toKafkaRecordIterator(final Map<String, String> attributes, ByteArrayOutputStream os, RecordSetWriter writer, final PushBackRecordSet pushBackRecordSet) throws IOException {
        final RecordValueFactory valueFactory = new RecordValueFactory(os, writer);
        final List<RecordHeader> headers = this.headersFactory.getHeaders(attributes);
        return new Iterator<KafkaRecord>() {

            @Override
            public boolean hasNext() {
                try {
                    return pushBackRecordSet.isAnotherRecord();
                }
                catch (IOException e) {
                    throw new UncheckedIOException("Record evaluation failed", e);
                }
            }

            @Override
            public KafkaRecord next() {
                try {
                    final Record record = pushBackRecordSet.next();
                    if (record == null) {
                        // RecordSet.next() is documented to return null once exhausted; honour the
                        // Iterator contract instead of handing null to the key/value factories.
                        throw new NoSuchElementException("No more records available");
                    }
                    final byte[] key = RecordStreamKafkaRecordConverter.this.keyFactory.getKey(attributes, record);
                    final byte[] value = valueFactory.getValue(record);
                    // Presumably rejects values larger than the configured maximum — see ProducerUtils.
                    ProducerUtils.checkMessageSize(RecordStreamKafkaRecordConverter.this.maxMessageSize, value.length);
                    // The three leading constructor arguments are intentionally left null here.
                    return new KafkaRecord(null, null, null, key, value, headers);
                }
                catch (IOException e) {
                    throw new UncheckedIOException("Record conversion failed", e);
                }
            }
        };
    }
}

