/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.consumer.bundle;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.bundle.BundleKey;
import org.apache.nifi.kafka.processors.consumer.bundle.BundleValue;
import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.property.KeyEncoding;

/**
 * Groups consumed {@link ByteRecord}s into bundles and emits one combined
 * {@link ByteRecord} per bundle.
 *
 * <p>Records land in the same bundle when they produce equal {@link BundleKey}s. The key is
 * built from the topic/partition, the record timestamp, the record headers, the headers
 * filtered by the configured name pattern, the derived attributes and, only when
 * {@code separateByKey} is set, the message key.
 *
 * <p>The bundle map is instance state and is never cleared, so every call to
 * {@link #bundle(Iterator)} also re-emits the bundles accumulated by earlier calls on the
 * same instance. The class performs no synchronization.
 */
public class ByteRecordBundler {
    /** Name of the header carrying the last offset folded into a bundle. */
    private static final String MAX_OFFSET_HEADER = "kafka.max.offset";
    /** Name of the header carrying the number of records folded into a bundle. */
    private static final String COUNT_HEADER = "kafka.count";

    private final byte[] demarcator;
    private final boolean separateByKey;
    private final KeyEncoding keyEncoding;
    private final Pattern headerNamePattern;
    private final Charset headerEncoding;
    private final boolean commitOffsets;
    private final Map<BundleKey, BundleValue> bundles;

    /**
     * Creates a bundler with an empty set of bundles.
     *
     * @param demarcator        bytes handed to {@link BundleValue} on every update
     *                          (presumably the separator placed between record values)
     * @param separateByKey     when {@code true} the message key takes part in the bundle key;
     *                          otherwise the key component is always {@code null}
     * @param keyEncoding       key encoding forwarded to {@link KafkaUtils#toAttributes}
     * @param headerNamePattern pattern forwarded to {@link KafkaUtils} for header filtering and
     *                          attribute creation
     * @param headerEncoding    header charset forwarded to {@link KafkaUtils#toAttributes}
     * @param commitOffsets     flag forwarded to {@link KafkaUtils#toAttributes}
     */
    public ByteRecordBundler(byte[] demarcator, boolean separateByKey, KeyEncoding keyEncoding, Pattern headerNamePattern, Charset headerEncoding, boolean commitOffsets) {
        this.demarcator = demarcator;
        this.separateByKey = separateByKey;
        this.keyEncoding = keyEncoding;
        this.headerNamePattern = headerNamePattern;
        this.headerEncoding = headerEncoding;
        this.commitOffsets = commitOffsets;
        this.bundles = new HashMap<>();
    }

    /**
     * Drains {@code consumerRecords} into the bundles held by this instance and returns an
     * iterator with one combined record per bundle.
     *
     * <p>The combined records are built lazily while the returned iterator is consumed, and
     * they follow the iteration order of the underlying {@link HashMap}.
     *
     * @param consumerRecords records to fold into the bundles; fully consumed by this call
     * @return iterator over one {@link ByteRecord} per bundle currently held
     */
    public Iterator<ByteRecord> bundle(Iterator<ByteRecord> consumerRecords) {
        while (consumerRecords.hasNext()) {
            this.add(consumerRecords.next());
        }
        return this.bundles.entrySet().stream()
                .map(entry -> this.toByteRecord(entry.getKey(), entry.getValue()))
                .iterator();
    }

    /**
     * Builds the combined record for one bundle, adding the max-offset and count headers.
     *
     * <p>The headers are added to a copy of the key's header list. Mutating the list owned by
     * the key would alter an object that is in use as a {@link HashMap} key and would append
     * the same two headers again each time the bundles are traversed.
     */
    private ByteRecord toByteRecord(BundleKey key, BundleValue value) {
        TopicPartitionSummary topicPartition = key.getTopicPartition();
        List<RecordHeader> headers = new ArrayList<>(key.getHeaders());
        headers.add(toHeader(MAX_OFFSET_HEADER, value.getLastOffset()));
        headers.add(toHeader(COUNT_HEADER, value.getCount()));
        return new ByteRecord(topicPartition.getTopic(), topicPartition.getPartition(), value.getFirstOffset(), key.getTimestamp(), headers, key.getMessageKey(), value.getData(), value.getCount());
    }

    /** Creates a header whose value is the decimal text of {@code value} encoded as UTF-8. */
    private static RecordHeader toHeader(String name, long value) {
        return new RecordHeader(name, Long.toString(value).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Folds a single record into the bundle matching its {@link BundleKey}, creating the
     * bundle (seeded with the record's offset) when it does not exist yet.
     */
    private void add(ByteRecord byteRecord) {
        TopicPartitionSummary topicPartition = new TopicPartitionSummary(byteRecord.getTopic(), byteRecord.getPartition());
        List<RecordHeader> headersFiltered = KafkaUtils.toHeadersFiltered(byteRecord, this.headerNamePattern);
        // The message key only distinguishes bundles when separation by key is requested.
        byte[] messageKey = this.separateByKey ? (byte[]) byteRecord.getKey().orElse(null) : null;
        Map<String, String> attributes = KafkaUtils.toAttributes(byteRecord, this.keyEncoding, this.headerNamePattern, this.headerEncoding, this.commitOffsets);
        BundleKey bundleKey = new BundleKey(topicPartition, byteRecord.getTimestamp(), byteRecord.getHeaders(), headersFiltered, attributes, messageKey);
        // Single map lookup: reuse the existing bundle or register a new one, then append.
        BundleValue bundleValue = this.bundles.computeIfAbsent(bundleKey, ignored -> new BundleValue(byteRecord.getOffset()));
        bundleValue.update(this.demarcator, byteRecord.getValue(), byteRecord.getOffset());
    }
}

