/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.consumer.convert;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
import org.apache.nifi.kafka.processors.consumer.wrapper.ConsumeWrapperRecord;
import org.apache.nifi.kafka.processors.consumer.wrapper.WrapperRecordKeyReader;
import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;

public class WrapperRecordStreamKafkaMessageConverter
implements KafkaMessageConverter {
    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final RecordReaderFactory keyReaderFactory;
    private final Charset headerEncoding;
    private final Pattern headerNamePattern;
    private final KeyFormat keyFormat;
    private final KeyEncoding keyEncoding;
    private final boolean commitOffsets;
    private final OffsetTracker offsetTracker;
    private final Runnable onSuccess;
    private final ComponentLog logger;

    public WrapperRecordStreamKafkaMessageConverter(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, RecordReaderFactory keyReaderFactory, Charset headerEncoding, Pattern headerNamePattern, KeyFormat keyFormat, KeyEncoding keyEncoding, boolean commitOffsets, OffsetTracker offsetTracker, Runnable onSuccess, ComponentLog logger) {
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.keyReaderFactory = keyReaderFactory;
        this.headerEncoding = headerEncoding;
        this.headerNamePattern = headerNamePattern;
        this.keyFormat = keyFormat;
        this.keyEncoding = keyEncoding;
        this.commitOffsets = commitOffsets;
        this.offsetTracker = offsetTracker;
        this.onSuccess = onSuccess;
        this.logger = logger;
    }

    @Override
    public void toFlowFiles(ProcessSession session, Iterator<ByteRecord> consumerRecords) {
        try {
            FlowFile flowFile;
            Map<String, String> attributes;
            HashMap<RecordSchema, RecordGroup> recordGroups = new HashMap<RecordSchema, RecordGroup>();
            String topic = null;
            int partition = 0;
            while (consumerRecords.hasNext()) {
                ByteRecord consumerRecord = consumerRecords.next();
                if (topic == null) {
                    partition = consumerRecord.getPartition();
                    topic = consumerRecord.getTopic();
                }
                byte[] value = consumerRecord.getValue();
                attributes = KafkaUtils.toAttributes(consumerRecord, this.keyEncoding, this.headerNamePattern, this.headerEncoding, this.commitOffsets);
                try (ByteArrayInputStream in = new ByteArrayInputStream(value);
                     RecordReader valueRecordReader = this.readerFactory.createRecordReader(attributes, (InputStream)in, (long)value.length, this.logger);){
                    int recordCount = 0;
                    while (true) {
                        Record record = valueRecordReader.nextRecord();
                        if (recordCount++ > 0 && record == null) {
                            break;
                        }
                        WrapperRecordKeyReader keyReader = new WrapperRecordKeyReader(this.keyFormat, this.keyReaderFactory, this.keyEncoding, this.logger);
                        Tuple<RecordField, Object> recordKey = keyReader.toWrapperRecordKey(consumerRecord.getKey().orElse(null), attributes);
                        RecordSchema recordSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
                        RecordSchema fullSchema = WrapperRecord.toWrapperSchema((RecordField)recordKey.getKey(), recordSchema);
                        RecordSchema writeSchema = this.writerFactory.getSchema(attributes, fullSchema);
                        RecordGroup recordGroup = (RecordGroup)recordGroups.get(writeSchema);
                        if (recordGroup == null) {
                            RecordSetWriter writer;
                            FlowFile flowFile2 = session.create();
                            Map<String, String> groupAttributes = Map.of("kafka.topic", consumerRecord.getTopic(), "kafka.partition", Long.toString(consumerRecord.getPartition()));
                            flowFile2 = session.putAllAttributes(flowFile2, groupAttributes);
                            OutputStream out2 = session.write(flowFile2);
                            try {
                                writer = this.writerFactory.createWriter(this.logger, writeSchema, out2, attributes);
                                writer.beginRecordSet();
                            }
                            catch (Exception e) {
                                out2.close();
                                throw e;
                            }
                            recordGroup = new RecordGroup(flowFile2, writer, topic, partition);
                            recordGroups.put(writeSchema, recordGroup);
                        }
                        ConsumeWrapperRecord consumeWrapperRecord = new ConsumeWrapperRecord(this.headerEncoding);
                        MapRecord wrapperRecord = consumeWrapperRecord.toWrapperRecord(consumerRecord, record, recordKey);
                        recordGroup.writer().write((Record)wrapperRecord);
                    }
                }
                catch (MalformedRecordException e) {
                    flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, attributes);
                    flowFile = session.write(flowFile, out -> out.write(value));
                    session.transfer(flowFile, ConsumeKafka.PARSE_FAILURE);
                    session.adjustCounter("Records Received from " + consumerRecord.getTopic(), 1L, false);
                    this.offsetTracker.update(consumerRecord);
                    continue;
                }
                this.offsetTracker.update(consumerRecord);
            }
            for (RecordGroup recordGroup : recordGroups.values()) {
                int recordCount;
                try (RecordSetWriter writer = recordGroup.writer();){
                    WriteResult writeResult = writer.finishRecordSet();
                    attributes = new HashMap<String, String>(writeResult.getAttributes());
                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                    attributes.put("kafka.consumer.offsets.committed", String.valueOf(this.commitOffsets));
                    recordCount = writeResult.getRecordCount();
                }
                flowFile = recordGroup.flowFile();
                flowFile = session.putAllAttributes(flowFile, attributes);
                ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
                String transitUri = String.format("kafka://%s/%s", topic, partition);
                provenanceReporter.receive(flowFile, transitUri);
                session.adjustCounter("Records Received from " + topic, (long)recordCount, false);
                session.transfer(flowFile, ConsumeKafka.SUCCESS);
            }
            this.onSuccess.run();
        }
        catch (IOException | SchemaNotFoundException e) {
            throw new ProcessException("FlowFile Record conversion failed", e);
        }
    }

    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, String topic, int partition) {
    }
}

