/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.buffer.common;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.NotThreadSafe;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulates records in memory and writes them to a {@link Buffer} in batches.
 *
 * <p>Records passed to {@link #add(Record)} are collected until the configured batch size is
 * reached, at which point they are written to the buffer with a single {@code writeAll} call.
 * Callers may also force a write at any time with {@link #flush()}.
 *
 * <p>If a write times out, it is retried after a fixed delay until it succeeds, fails with a
 * different exception, or the waiting thread is interrupted.
 *
 * <p>Instances are not thread-safe.
 *
 * @param <T> the type of record being accumulated
 */
@NotThreadSafe
public class BufferAccumulator<T extends Record<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(BufferAccumulator.class);
    /** Upper bound on the number of retry attempts made after a flush times out. */
    private static final int MAX_FLUSH_RETRIES_ON_IO_EXCEPTION = Integer.MAX_VALUE;
    /** Delay applied before every retry attempt; the delay is not increased between attempts. */
    private static final Duration INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION = Duration.ofSeconds(5L);
    private final Buffer<T> buffer;
    private final int numberOfRecordsToAccumulate;
    private final int bufferTimeoutMillis;
    private int totalWritten = 0;
    private final Collection<T> recordsAccumulated;

    private BufferAccumulator(Buffer<T> buffer, int numberOfRecordsToAccumulate, Duration bufferTimeout) {
        this.buffer = Objects.requireNonNull(buffer, "buffer must be non-null.");
        this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
        Objects.requireNonNull(bufferTimeout, "bufferTimeout must be non-null.");
        // Clamp instead of casting blindly: a plain (int) cast silently wraps for durations
        // longer than Integer.MAX_VALUE milliseconds and can produce a negative timeout.
        final long timeoutMillis = bufferTimeout.toMillis();
        this.bufferTimeoutMillis = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMillis));
        if (numberOfRecordsToAccumulate < 1) {
            throw new IllegalArgumentException("numberOfRecordsToAccumulate must be greater than zero.");
        }
        this.recordsAccumulated = new ArrayList<>(numberOfRecordsToAccumulate);
    }

    /**
     * Creates a new accumulator that writes to the given buffer.
     *
     * @param buffer the buffer that accumulated records are written to; must not be null
     * @param recordsToAccumulate the number of records that triggers an automatic flush; must be
     *     at least 1
     * @param bufferTimeout the timeout passed to the buffer on each write; must not be null.
     *     Values outside the {@code int} millisecond range are clamped to that range.
     * @param <T> the type of record being accumulated
     * @return a new accumulator
     * @throws NullPointerException if {@code buffer} or {@code bufferTimeout} is null
     * @throws IllegalArgumentException if {@code recordsToAccumulate} is less than 1
     */
    public static <T extends Record<?>> BufferAccumulator<T> create(Buffer<T> buffer, int recordsToAccumulate, Duration bufferTimeout) {
        return new BufferAccumulator<>(buffer, recordsToAccumulate, bufferTimeout);
    }

    /**
     * Adds a record to the current batch and flushes the batch once it reaches the configured
     * size.
     *
     * @param record the record to accumulate
     * @throws Exception if the triggered flush fails; see {@link #flush()}
     */
    public void add(T record) throws Exception {
        this.recordsAccumulated.add(record);
        if (this.recordsAccumulated.size() >= this.numberOfRecordsToAccumulate) {
            this.flush();
        }
    }

    /**
     * Writes all currently accumulated records to the buffer. Does nothing if no records are
     * accumulated.
     *
     * <p>If the write times out, it is retried after a fixed delay until it succeeds.
     *
     * @throws Exception if the initial write fails with anything other than a
     *     {@link TimeoutException}; an {@link ExecutionException} if a retry attempt fails with
     *     anything other than a {@link TimeoutException}; an {@link InterruptedException} if the
     *     calling thread is interrupted while waiting for a retry attempt
     */
    public void flush() throws Exception {
        try {
            LOG.debug("Flushing buffer accumulator");
            this.flushAccumulatedToBuffer();
        }
        catch (TimeoutException timeoutException) {
            this.flushWithBackoff();
        }
    }

    /**
     * Retries the flush on a dedicated single-thread scheduler, waiting a fixed delay before each
     * attempt, until an attempt succeeds or the retry limit is reached.
     *
     * @return {@code true} if a retry succeeded, {@code false} if every attempt timed out
     * @throws ExecutionException if a retry attempt fails with anything other than a
     *     {@link TimeoutException}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private boolean flushWithBackoff() throws Exception {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final long nextDelay = INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION.toMillis();
        try {
            for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; ++retryCount) {
                LOG.debug("Retrying buffer flush on retry count {}", retryCount);
                final ScheduledFuture<Boolean> flushBufferFuture = scheduledExecutorService.schedule(() -> {
                    try {
                        this.flushAccumulatedToBuffer();
                        return true;
                    }
                    catch (TimeoutException e) {
                        LOG.debug("Timed out retrying buffer accumulator");
                        return false;
                    }
                }, nextDelay, TimeUnit.MILLISECONDS);
                try {
                    // Blocking here keeps the accumulator state confined to one thread at a time.
                    final boolean flushedSuccessfully = flushBufferFuture.get();
                    if (flushedSuccessfully) {
                        LOG.info("Successfully flushed the buffer accumulator on retry attempt {}", retryCount + 1);
                        return true;
                    }
                }
                catch (ExecutionException e) {
                    // Pass the exception itself so the underlying cause and stack trace are logged.
                    LOG.warn("Retrying of flushing the buffer accumulator hit an exception: {}", e.getMessage(), e);
                    throw e;
                }
                catch (InterruptedException e) {
                    LOG.warn("Retrying of flushing the buffer accumulator was interrupted: {}", e.getMessage(), e);
                    throw e;
                }
            }
            LOG.warn("Flushing the bufferAccumulator failed after {} attempts", MAX_FLUSH_RETRIES_ON_IO_EXCEPTION);
            return false;
        }
        finally {
            // Always release the scheduler thread, including on unexpected unchecked exceptions.
            scheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Writes the accumulated records to the buffer in one call. The accumulated records are
     * cleared and the written count updated only after the write returns normally, so a failed
     * write leaves the batch intact for a later attempt.
     *
     * @throws Exception if the buffer write fails
     */
    private void flushAccumulatedToBuffer() throws Exception {
        final int currentRecordCountAccumulated = this.recordsAccumulated.size();
        if (currentRecordCountAccumulated > 0) {
            this.buffer.writeAll(this.recordsAccumulated, this.bufferTimeoutMillis);
            this.recordsAccumulated.clear();
            this.totalWritten += currentRecordCountAccumulated;
        }
    }

    /**
     * Returns the total number of records successfully written to the buffer by this accumulator.
     *
     * @return the running count of written records
     */
    public int getTotalWritten() {
        return this.totalWritten;
    }
}

