/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.consumer.convert;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.provenance.ProvenanceReporter;

public class FlowFileStreamKafkaMessageConverter
implements KafkaMessageConverter {
    private final Charset headerEncoding;
    private final Pattern headerNamePattern;
    private final KeyEncoding keyEncoding;
    private final boolean commitOffsets;
    private final OffsetTracker offsetTracker;
    private final Runnable onSuccess;

    public FlowFileStreamKafkaMessageConverter(Charset headerEncoding, Pattern headerNamePattern, KeyEncoding keyEncoding, boolean commitOffsets, OffsetTracker offsetTracker, Runnable onSuccess) {
        this.headerEncoding = headerEncoding;
        this.headerNamePattern = headerNamePattern;
        this.keyEncoding = keyEncoding;
        this.commitOffsets = commitOffsets;
        this.offsetTracker = offsetTracker;
        this.onSuccess = onSuccess;
    }

    @Override
    public void toFlowFiles(ProcessSession session, Iterator<ByteRecord> consumerRecords) {
        while (consumerRecords.hasNext()) {
            ByteRecord consumerRecord = consumerRecords.next();
            byte[] value = consumerRecord.getValue();
            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, outputStream -> outputStream.write(value));
            Map<String, String> attributes = KafkaUtils.toAttributes(consumerRecord, this.keyEncoding, this.headerNamePattern, this.headerEncoding, this.commitOffsets);
            flowFile = session.putAllAttributes(flowFile, attributes);
            ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
            String transitUri = String.format("kafka://%s/%s", consumerRecord.getTopic(), consumerRecord.getPartition());
            provenanceReporter.receive(flowFile, transitUri);
            session.adjustCounter("Records Received from " + consumerRecord.getTopic(), consumerRecord.getBundledCount(), false);
            session.transfer(flowFile, ConsumeKafka.SUCCESS);
            this.offsetTracker.update(consumerRecord);
        }
        this.onSuccess.run();
    }
}

