/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
 * Converts {@link KinesisClientRecord}s into {@link KinesisInputOutputRecord}s by
 * decompressing each record's payload, parsing it with the configured
 * {@link InputCodec}, and stamping every resulting event with metadata taken
 * from the originating Kinesis record.
 */
public class KinesisRecordConverter {
    private final InputCodec codec;

    /**
     * @param codec codec used to parse the (decompressed) payload of each Kinesis record into events
     */
    public KinesisRecordConverter(InputCodec codec) {
        this.codec = codec;
    }

    /**
     * Parses every Kinesis record in {@code kinesisClientRecords} and returns one
     * {@link KinesisInputOutputRecord} per event the codec emits.
     *
     * <p>Each event's metadata receives the attributes {@code stream_name}
     * (lower-cased), {@code partition_key}, {@code sequence_number} and
     * {@code sub_sequence_number}; the record's approximate arrival timestamp is
     * set as the external origination time on both the event handle and the
     * event metadata. Every event produced from a record carries that record's
     * payload size as its incoming record size.
     *
     * @param decompressionEngine engine used to wrap the raw payload in a decompressing stream
     * @param kinesisClientRecords records to convert
     * @param streamName name of the stream the records came from
     * @return the converted records, in the order the codec emitted them
     * @throws IOException if creating the decompression stream or parsing a payload fails
     */
    public List<KinesisInputOutputRecord> convert(DecompressionEngine decompressionEngine, List<KinesisClientRecord> kinesisClientRecords, String streamName) throws IOException {
        final List<KinesisInputOutputRecord> records = new ArrayList<>();
        for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) {
            // Capture the payload size before parsing instead of reading the buffer
            // position afterwards, which is only correct if parsing drained a buffer
            // that started at position 0.
            final int incomingRecordSizeBytes = kinesisClientRecord.data().remaining();
            processRecord(decompressionEngine, kinesisClientRecord, record -> {
                Event event = record.getData();
                EventMetadata eventMetadata = event.getMetadata();
                // Locale.ROOT keeps the attribute value independent of the JVM's default locale.
                eventMetadata.setAttribute("stream_name", streamName.toLowerCase(Locale.ROOT));
                eventMetadata.setAttribute("partition_key", kinesisClientRecord.partitionKey());
                eventMetadata.setAttribute("sequence_number", kinesisClientRecord.sequenceNumber());
                eventMetadata.setAttribute("sub_sequence_number", kinesisClientRecord.subSequenceNumber());
                Instant externalOriginationTime = kinesisClientRecord.approximateArrivalTimestamp();
                event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
                eventMetadata.setExternalOriginationTime(externalOriginationTime);
                records.add(KinesisInputOutputRecord.builder()
                        .withIncomingRecordSizeBytes(incomingRecordSizeBytes)
                        .withDataPrepperRecord(record)
                        .build());
            });
        }
        return records;
    }

    /**
     * Copies the record's payload, wraps it in a decompressing stream and hands
     * it to the codec, which invokes {@code eventConsumer} for every parsed event.
     *
     * @throws IOException if creating the decompression stream or parsing fails
     */
    private void processRecord(DecompressionEngine decompressionEngine, KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException {
        byte[] payload = new byte[record.data().remaining()];
        // Read through a duplicate so the record's own buffer position is left
        // untouched and the payload stays readable after conversion.
        record.data().duplicate().get(payload);
        // Close the decompression stream once parsing is done so any resources it
        // holds are released promptly, including when parsing throws.
        try (InputStream decompressedStream = decompressionEngine.createInputStream(new ByteArrayInputStream(payload))) {
            codec.parse(decompressedStream, eventConsumer);
        }
    }
}

