/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.consumer.convert;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

/**
 * {@link KafkaMessageConverter} that parses the value of every consumed {@link ByteRecord} with the
 * configured {@link RecordReaderFactory} and writes the parsed records through
 * {@link RecordSetWriter}s into FlowFiles.
 *
 * <p>Parsed records are grouped by the combination of write schema and matching header values; each
 * group is written to its own FlowFile, which is transferred to {@link ConsumeKafka#SUCCESS}. A value
 * for which the reader throws {@link MalformedRecordException} is written unchanged to a separate
 * FlowFile and transferred to {@link ConsumeKafka#PARSE_FAILURE}.
 */
public class RecordStreamKafkaMessageConverter implements KafkaMessageConverter {
    /** Schema used to select a write schema when the reader yields no record for a Kafka value. */
    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());

    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final Charset headerEncoding;
    private final Pattern headerNamePattern;
    private final KeyEncoding keyEncoding;
    private final boolean commitOffsets;
    private final OffsetTracker offsetTracker;
    private final Runnable onSuccess;
    private final ComponentLog logger;

    /**
     * @param readerFactory     factory for the readers that parse each Kafka record value
     * @param writerFactory     factory for the writers that serialize parsed records into FlowFiles
     * @param headerEncoding    charset used to decode header values into strings
     * @param headerNamePattern pattern a header name must fully match to be used for grouping and
     *                          copied to the FlowFile; {@code null} disables header handling in this class
     * @param keyEncoding       key encoding handed to {@link KafkaUtils#toAttributes}
     * @param commitOffsets     value reported in the {@code kafka.consumer.offsets.committed} attribute
     *                          and handed to {@link KafkaUtils#toAttributes}
     * @param offsetTracker     tracker updated once for every consumed record
     * @param onSuccess         callback run after all FlowFiles of a batch have been transferred
     * @param logger            logger handed to the created readers and writers
     */
    public RecordStreamKafkaMessageConverter(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, Charset headerEncoding, Pattern headerNamePattern, KeyEncoding keyEncoding, boolean commitOffsets, OffsetTracker offsetTracker, Runnable onSuccess, ComponentLog logger) {
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.headerEncoding = headerEncoding;
        this.headerNamePattern = headerNamePattern;
        this.keyEncoding = keyEncoding;
        this.commitOffsets = commitOffsets;
        this.offsetTracker = offsetTracker;
        this.onSuccess = onSuccess;
        this.logger = logger;
    }

    /**
     * Converts all records supplied by the iterator into FlowFiles within the given session.
     *
     * <p>Every consumed record is reported to the {@link OffsetTracker}, whether it was parsed or
     * routed to parse failure. After all records have been read, each record group is finished,
     * annotated, reported to provenance and transferred; finally the {@code onSuccess} callback runs.
     *
     * @param session         session used to create, write and transfer FlowFiles
     * @param consumerRecords records to convert
     * @throws ProcessException if reading or writing fails with an {@link IOException} or a
     *                          {@link SchemaNotFoundException}
     */
    @Override
    public void toFlowFiles(ProcessSession session, Iterator<ByteRecord> consumerRecords) {
        final Map<RecordGroupCriteria, RecordGroup> recordGroups = new HashMap<>();
        try {
            while (consumerRecords.hasNext()) {
                final ByteRecord consumerRecord = consumerRecords.next();
                final byte[] value = consumerRecord.getValue();
                final Map<String, String> headers = getRelevantHeaders(consumerRecord, headerNamePattern);
                final Map<String, String> attributes = KafkaUtils.toAttributes(consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);

                try (InputStream in = new ByteArrayInputStream(value);
                     RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, (long) value.length, logger)) {
                    writeRecords(session, consumerRecord, valueRecordReader, attributes, headers, recordGroups);
                } catch (final MalformedRecordException e) {
                    // The unparseable value is preserved on the parse-failure relationship instead of failing the batch.
                    // NOTE(review): records of this value written before the failure remain in their record group.
                    transferParseFailure(session, consumerRecord, attributes, value);
                }
                offsetTracker.update(consumerRecord);
            }

            final Iterator<Map.Entry<RecordGroupCriteria, RecordGroup>> pending = recordGroups.entrySet().iterator();
            while (pending.hasNext()) {
                final Map.Entry<RecordGroupCriteria, RecordGroup> entry = pending.next();
                final RecordGroupCriteria criteria = entry.getKey();
                final RecordGroup recordGroup = entry.getValue();
                // Removed before finishing: finishRecordGroup closes the writer itself, so the
                // failure cleanup below must only see writers that were never finished.
                pending.remove();
                finishRecordGroup(session, criteria, recordGroup);
            }
            onSuccess.run();
        } catch (final IOException | SchemaNotFoundException e) {
            closeUnfinishedWriters(recordGroups, e);
            throw new ProcessException("FlowFile Record conversion failed", e);
        } catch (final RuntimeException e) {
            closeUnfinishedWriters(recordGroups, e);
            throw e;
        }
    }

    /** Reads all records from one Kafka value and appends each to the record group matching its write schema and headers. */
    private void writeRecords(ProcessSession session, ByteRecord consumerRecord, RecordReader reader, Map<String, String> attributes,
                              Map<String, String> headers, Map<RecordGroupCriteria, RecordGroup> recordGroups)
            throws IOException, MalformedRecordException, SchemaNotFoundException {
        boolean firstRead = true;
        while (true) {
            final Record record = reader.nextRecord();
            // A null on the very first read is still processed so that a value yielding no records
            // gets a record group (and therefore a FlowFile) registered for it.
            if (record == null && !firstRead) {
                break;
            }
            firstRead = false;

            final RecordSchema recordSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
            final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema);
            // NOTE(review): grouping considers only write schema and headers, not topic/partition — confirm
            // that callers never mix partitions in one batch, or extend the criteria.
            final RecordGroupCriteria groupCriteria = new RecordGroupCriteria(writeSchema, headers);
            RecordGroup recordGroup = recordGroups.get(groupCriteria);
            if (recordGroup == null) {
                recordGroup = openRecordGroup(session, consumerRecord, writeSchema, attributes);
                recordGroups.put(groupCriteria, recordGroup);
            }
            if (record != null) {
                recordGroup.writer().write(record);
            }
        }
    }

    /** Creates the FlowFile and an already-begun record set writer for a new record group. */
    private RecordGroup openRecordGroup(ProcessSession session, ByteRecord consumerRecord, RecordSchema writeSchema, Map<String, String> attributes)
            throws IOException, SchemaNotFoundException {
        final String topic = consumerRecord.getTopic();
        final int partition = consumerRecord.getPartition();

        FlowFile flowFile = session.create();
        final Map<String, String> groupAttributes = Map.of("kafka.topic", topic, "kafka.partition", Long.toString(partition));
        flowFile = session.putAllAttributes(flowFile, groupAttributes);

        final OutputStream out = session.write(flowFile);
        final RecordSetWriter writer;
        try {
            writer = writerFactory.createWriter(logger, writeSchema, out, attributes);
            writer.beginRecordSet();
        } catch (final Exception e) {
            // Release the stream, but never let a close failure hide the original cause.
            try {
                out.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // The group remembers the topic/partition of the record that opened it, matching the
        // kafka.topic / kafka.partition attributes set on its FlowFile above.
        return new RecordGroup(flowFile, writer, topic, partition);
    }

    /** Writes the raw Kafka value to a new FlowFile and routes it to the parse-failure relationship. */
    private void transferParseFailure(ProcessSession session, ByteRecord consumerRecord, Map<String, String> attributes, byte[] value) {
        FlowFile flowFile = session.create();
        flowFile = session.putAllAttributes(flowFile, attributes);
        flowFile = session.write(flowFile, out -> out.write(value));
        session.transfer(flowFile, ConsumeKafka.PARSE_FAILURE);
        session.adjustCounter("Records Received from " + consumerRecord.getTopic(), 1L, false);
    }

    /** Finishes and closes a group's writer, then annotates, reports and transfers its FlowFile to success. */
    private void finishRecordGroup(ProcessSession session, RecordGroupCriteria criteria, RecordGroup recordGroup) throws IOException {
        Map<String, String> attributes;
        int recordCount;
        try (RecordSetWriter writer = recordGroup.writer()) {
            final WriteResult writeResult = writer.finishRecordSet();
            attributes = new HashMap<>(writeResult.getAttributes());
            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
            attributes.putAll(criteria.headers());
            attributes.put("kafka.consumer.offsets.committed", String.valueOf(commitOffsets));
            recordCount = writeResult.getRecordCount();
        }

        FlowFile flowFile = recordGroup.flowFile();
        flowFile = session.putAllAttributes(flowFile, attributes);

        final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
        final String transitUri = String.format("kafka://%s/%s", recordGroup.topic(), recordGroup.partition());
        provenanceReporter.receive(flowFile, transitUri);
        session.transfer(flowFile, ConsumeKafka.SUCCESS);
        session.adjustCounter("Records Received from " + recordGroup.topic(), (long) recordCount, false);
    }

    /** Closes every writer still registered after a failure, attaching close failures to the given cause. */
    private static void closeUnfinishedWriters(Map<RecordGroupCriteria, RecordGroup> recordGroups, Throwable failure) {
        for (final RecordGroup recordGroup : recordGroups.values()) {
            try {
                recordGroup.writer().close();
            } catch (final IOException | RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }
        recordGroups.clear();
    }

    /**
     * Collects the headers of a record whose names fully match the given pattern.
     *
     * @param consumerRecord    record whose headers are inspected
     * @param headerNamePattern pattern a header name must fully match
     * @return header name to value decoded with the configured header encoding; an empty map when
     *         either argument is {@code null}
     */
    private Map<String, String> getRelevantHeaders(ByteRecord consumerRecord, Pattern headerNamePattern) {
        if (headerNamePattern == null || consumerRecord == null) {
            return Map.of();
        }
        final Map<String, String> headers = new HashMap<>();
        for (final RecordHeader header : consumerRecord.getHeaders()) {
            final String name = header.key();
            if (headerNamePattern.matcher(name).matches()) {
                headers.put(name, new String(header.value(), this.headerEncoding));
            }
        }
        return headers;
    }

    /** Key that decides which FlowFile a parsed record is written to. */
    private record RecordGroupCriteria(RecordSchema schema, Map<String, String> headers) {
    }

    /** An open FlowFile and its writer, plus the topic/partition of the record that opened the group. */
    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, String topic, int partition) {
    }
}

