/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.common.sink;

import java.util.Collection;
import java.util.List;
import org.opensearch.dataprepper.common.sink.LockStrategy;
import org.opensearch.dataprepper.common.sink.SinkBuffer;
import org.opensearch.dataprepper.common.sink.SinkBufferEntry;
import org.opensearch.dataprepper.common.sink.SinkBufferEntryProvider;
import org.opensearch.dataprepper.common.sink.SinkDlqHandler;
import org.opensearch.dataprepper.common.sink.SinkFlushContext;
import org.opensearch.dataprepper.common.sink.SinkFlushResult;
import org.opensearch.dataprepper.common.sink.SinkFlushableBuffer;
import org.opensearch.dataprepper.common.sink.SinkMetrics;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DefaultSinkOutputStrategy
implements SinkBufferEntryProvider,
SinkDlqHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSinkOutputStrategy.class);
    private final LockStrategy lockStrategy;
    private final SinkBuffer sinkBuffer;
    private final SinkMetrics sinkMetrics;
    private final SinkFlushContext sinkFlushContext;

    public DefaultSinkOutputStrategy(LockStrategy lockStrategy, SinkBuffer sinkBuffer, SinkFlushContext sinkFlushContext, SinkMetrics sinkMetrics) {
        this.lockStrategy = lockStrategy;
        this.sinkBuffer = sinkBuffer;
        this.sinkMetrics = sinkMetrics;
        this.sinkFlushContext = sinkFlushContext;
    }

    public void flushBuffer() {
        long startTime = System.nanoTime();
        SinkFlushableBuffer flushableBuffer = this.sinkBuffer.getFlushableBuffer(this.sinkFlushContext);
        List<Event> events = flushableBuffer.getEvents();
        try {
            SinkFlushResult flushResult = flushableBuffer.flush();
            if (flushResult == null) {
                this.sinkMetrics.recordRequestLatency(System.nanoTime() - startTime);
                for (Event event : events) {
                    event.getEventHandle().release(true);
                }
            } else {
                this.addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException(), flushResult.getStatusCode());
            }
        }
        catch (Exception e) {
            this.sinkMetrics.incrementRequestsFailedCounter(1);
            this.sinkMetrics.incrementEventsFailedCounter(events.size());
            this.addFailedEventsToDlq(events, e, 0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(Collection<Record<Event>> records) {
        this.lockStrategy.lock();
        try {
            if (this.sinkBuffer.exceedsFlushTimeInterval()) {
                this.flushBuffer();
            }
            if (records == null || records.isEmpty()) {
                return;
            }
            for (Record<Event> record : records) {
                Event event = (Event)record.getData();
                try {
                    SinkBufferEntry bufferEntry = this.getSinkBufferEntry(event);
                    if (bufferEntry.exceedsMaxEventSizeThreshold()) {
                        throw new RuntimeException("Event size exceeds max allowed event size");
                    }
                    if (this.sinkBuffer.willExceedMaxRequestSizeBytes(bufferEntry)) {
                        this.flushBuffer();
                    }
                    if (!this.sinkBuffer.addToBuffer(bufferEntry)) {
                        throw new RuntimeException("Failed to add event to sink buffer");
                    }
                    if (!this.sinkBuffer.isMaxEventsLimitReached()) continue;
                    this.flushBuffer();
                }
                catch (Exception ex) {
                    LOG.warn(DataPrepperMarkers.NOISY, "Failed process the event ", (Throwable)ex);
                    this.addFailedEventsToDlq(List.of(event), ex, 0);
                }
            }
        }
        finally {
            this.flushDlqList();
            this.lockStrategy.unlock();
        }
    }
}

