/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import java.time.Duration;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisCheckpointerTracker;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class KinesisShardRecordProcessorFactory
implements ShardRecordProcessorFactory {
    private final Buffer<Record<Event>> buffer;
    private final KinesisSourceConfig kinesisSourceConfig;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final PluginMetrics pluginMetrics;
    private final KinesisRecordConverter kinesisRecordConverter;

    public KinesisShardRecordProcessorFactory(Buffer<Record<Event>> buffer, KinesisSourceConfig kinesisSourceConfig, AcknowledgementSetManager acknowledgementSetManager, PluginMetrics pluginMetrics, InputCodec codec) {
        this.kinesisSourceConfig = kinesisSourceConfig;
        this.buffer = buffer;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.pluginMetrics = pluginMetrics;
        this.kinesisRecordConverter = new KinesisRecordConverter(codec);
    }

    public ShardRecordProcessor shardRecordProcessor() {
        throw new UnsupportedOperationException("Use the method with stream details!");
    }

    public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
        BufferAccumulator bufferAccumulator = BufferAccumulator.create(this.buffer, (int)this.kinesisSourceConfig.getNumberOfRecordsToAccumulate(), (Duration)this.kinesisSourceConfig.getBufferTimeout());
        KinesisCheckpointerTracker kinesisCheckpointerTracker = new KinesisCheckpointerTracker();
        return new KinesisRecordProcessor((BufferAccumulator<Record<Event>>)bufferAccumulator, this.kinesisSourceConfig, this.acknowledgementSetManager, this.pluginMetrics, this.kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
    }
}

