/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.buffer.blockingbuffer;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A bounded, in-memory buffer backed by a {@link LinkedBlockingQueue}.
 *
 * <p>Capacity is tracked with a {@link Semaphore} that starts with
 * {@code bufferCapacity} permits. A permit is acquired for every record written
 * and is only released when the record is checkpointed (see
 * {@link #doCheckpoint(CheckpointState)}), not when it is read. As a result the
 * "capacity used" gauges count records that have been written but not yet
 * checkpointed, which includes records that were already handed to a reader.
 *
 * @param <T> the type of record held by the buffer
 */
@DataPrepperPlugin(name="bounded_blocking", pluginType=Buffer.class, pluginConfigurationType=BlockingBufferConfig.class)
public class BlockingBuffer<T extends Record<?>>
extends AbstractBuffer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BlockingBuffer.class);
    private static final String PLUGIN_NAME = "bounded_blocking";
    private static final String ATTRIBUTE_BUFFER_CAPACITY = "buffer_size";
    private static final String ATTRIBUTE_BATCH_SIZE = "batch_size";
    private static final String BLOCKING_BUFFER = "BlockingBuffer";
    private static final String BUFFER_USAGE_METRIC = "bufferUsage";
    public static final String CAPACITY_USED_METRIC = "capacityUsed";
    /** Buffer capacity used by {@link #BlockingBuffer(String)} and {@link #getDefaultPluginSettings()}. */
    private static final int DEFAULT_BUFFER_CAPACITY = 12800;
    /** Batch size used by {@link #BlockingBuffer(String)} and {@link #getDefaultPluginSettings()}. */
    private static final int DEFAULT_BATCH_SIZE = 200;
    /** How long a read with a timeout of {@code 0} still waits for a first record. */
    private static final int ZERO_TIMEOUT_POLL_MILLIS = 5;
    private final int bufferCapacity;
    private final int batchSize;
    private final BlockingQueue<T> blockingQueue;
    private final String pipelineName;
    private final Semaphore capacitySemaphore;

    /**
     * Creates a buffer and registers its two usage gauges.
     *
     * @param bufferCapacity maximum number of un-checkpointed records the buffer accepts
     * @param batchSize maximum number of records returned by a single read
     * @param pipelineName name of the owning pipeline, used in metrics, logs and error messages
     */
    public BlockingBuffer(int bufferCapacity, int batchSize, String pipelineName) {
        super(BLOCKING_BUFFER, pipelineName);
        this.bufferCapacity = bufferCapacity;
        this.batchSize = batchSize;
        this.blockingQueue = new LinkedBlockingQueue<>(bufferCapacity);
        this.capacitySemaphore = new Semaphore(bufferCapacity);
        this.pipelineName = pipelineName;
        PluginMetrics pluginMetrics = PluginMetrics.fromNames(BLOCKING_BUFFER, pipelineName);
        // The semaphore is passed with its static type so the gauge lambdas can call
        // availablePermits() on it directly.
        // Absolute number of permits currently held (records written but not checkpointed).
        pluginMetrics.gauge(CAPACITY_USED_METRIC, this.capacitySemaphore,
                capacity -> bufferCapacity - capacity.availablePermits());
        // Same quantity expressed as a percentage of the total capacity.
        pluginMetrics.gauge(BUFFER_USAGE_METRIC, this.capacitySemaphore,
                capacity -> ((double) bufferCapacity - capacity.availablePermits()) / bufferCapacity * 100.0);
    }

    /**
     * Creates a buffer from plugin configuration.
     *
     * @param blockingBufferConfig supplies the buffer size and batch size; must not be {@code null}
     * @param pipelineDescription supplies the pipeline name
     * @throws NullPointerException if {@code blockingBufferConfig} is {@code null}
     */
    @DataPrepperPluginConstructor
    public BlockingBuffer(BlockingBufferConfig blockingBufferConfig, PipelineDescription pipelineDescription) {
        this(Preconditions.checkNotNull(blockingBufferConfig, "BlockingBufferConfig cannot be null").getBufferSize(),
                blockingBufferConfig.getBatchSize(),
                pipelineDescription.getPipelineName());
    }

    /**
     * Creates a buffer with the default capacity (12800) and batch size (200).
     *
     * @param pipelineName name of the owning pipeline
     */
    public BlockingBuffer(String pipelineName) {
        this(DEFAULT_BUFFER_CAPACITY, DEFAULT_BATCH_SIZE, pipelineName);
    }

    /**
     * Writes a single record, waiting up to {@code timeoutInMillis} for a free slot.
     *
     * @param record the record to enqueue
     * @param timeoutInMillis maximum time to wait for capacity, in milliseconds
     * @throws TimeoutException if no slot became available in time, or if the calling thread was
     *         interrupted while waiting (the interrupt flag is restored and the
     *         {@link InterruptedException} is attached as the cause)
     */
    public void doWrite(T record, int timeoutInMillis) throws TimeoutException {
        try {
            boolean permitAcquired = this.capacitySemaphore.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS);
            if (!permitAcquired) {
                throw new TimeoutException(String.format("Pipeline [%s] - Buffer is full, timed out waiting for a slot", this.pipelineName));
            }
            this.blockingQueue.offer(record);
        }
        catch (InterruptedException ex) {
            // Keep the interruption visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.error("Pipeline [{}] - Buffer is full, interrupted while waiting to write the record", this.pipelineName, ex);
            TimeoutException timeout = new TimeoutException("Buffer is full, timed out waiting for a slot");
            timeout.initCause(ex);
            throw timeout;
        }
    }

    /**
     * Writes a collection of records as a unit: either capacity for all of them is acquired
     * within the timeout, or none are written.
     *
     * @param records the records to enqueue
     * @param timeoutInMillis maximum time to wait for capacity, in milliseconds
     * @throws SizeOverflowException if {@code records} holds more elements than the buffer capacity
     * @throws TimeoutException if capacity for all records did not become available in time, or if
     *         the calling thread was interrupted while waiting (the interrupt flag is restored and
     *         the {@link InterruptedException} is attached as the cause)
     */
    public void doWriteAll(Collection<T> records, int timeoutInMillis) throws Exception {
        int size = records.size();
        if (size > this.bufferCapacity) {
            throw new SizeOverflowException(String.format("Buffer capacity too small for the number of records: %d", size));
        }
        try {
            boolean permitAcquired = this.capacitySemaphore.tryAcquire(size, timeoutInMillis, TimeUnit.MILLISECONDS);
            if (!permitAcquired) {
                throw new TimeoutException(String.format("Pipeline [%s] - Buffer does not have enough capacity left for the number of records: %d, timed out waiting for slots.", this.pipelineName, size));
            }
            this.blockingQueue.addAll(records);
        }
        catch (InterruptedException ex) {
            // Keep the interruption visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.error("Pipeline [{}] - Buffer does not have enough capacity left for the number of records: {}, interrupted while waiting to write the records", this.pipelineName, size, ex);
            TimeoutException timeout = new TimeoutException(String.format("Pipeline [%s] - Buffer does not have enough capacity left for the number of records: %d, timed out waiting for slots.", this.pipelineName, size));
            timeout.initCause(ex);
            throw timeout;
        }
    }

    /**
     * Reads up to {@code batchSize} records.
     *
     * <p>With a timeout of {@code 0} the method waits briefly (5 ms) for a first record and then
     * drains whatever else is immediately available. With any other timeout it keeps polling and
     * draining until the batch is full or the timeout has elapsed; each poll waits only for the
     * time that is still left, so the call as a whole does not wait for more than roughly
     * {@code timeoutInMillis}.
     *
     * <p>Reading does not free capacity; the returned {@link CheckpointState} carries the number of
     * records read so it can be released later through {@link #doCheckpoint(CheckpointState)}.
     *
     * @param timeoutInMillis maximum time to wait for records, in milliseconds
     * @return the records read (possibly empty) paired with a checkpoint state for that many records
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    public Map.Entry<Collection<T>, CheckpointState> doRead(int timeoutInMillis) {
        ArrayList<T> records = new ArrayList<T>(this.batchSize);
        if (timeoutInMillis == 0) {
            T first = this.pollForBufferEntry(ZERO_TIMEOUT_POLL_MILLIS, TimeUnit.MILLISECONDS);
            if (first != null) {
                // poll returns null on timeout; never hand a null record to the caller.
                records.add(first);
            }
            this.blockingQueue.drainTo(records, this.batchSize - 1);
        } else {
            Stopwatch stopwatch = Stopwatch.createStarted();
            long remainingMillis = timeoutInMillis;
            while (remainingMillis > 0 && records.size() < this.batchSize) {
                // Block only for the time that is left, not for the full timeout again.
                T record = this.pollForBufferEntry(remainingMillis, TimeUnit.MILLISECONDS);
                if (record != null) {
                    records.add(record);
                }
                if (records.size() < this.batchSize) {
                    this.blockingQueue.drainTo(records, this.batchSize - records.size());
                }
                remainingMillis = (long) timeoutInMillis - stopwatch.elapsed(TimeUnit.MILLISECONDS);
            }
        }
        this.updateLatency(records);
        CheckpointState checkpointState = new CheckpointState(records.size());
        return new AbstractMap.SimpleEntry<Collection<T>, CheckpointState>(records, checkpointState);
    }

    /**
     * Polls the queue for one record, waiting up to the given timeout.
     *
     * @return the next record, or {@code null} if none arrived in time
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted while waiting; the interrupt flag is restored first
     */
    private T pollForBufferEntry(long timeoutValue, TimeUnit timeoutUnit) {
        try {
            return this.blockingQueue.poll(timeoutValue, timeoutUnit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.info("Pipeline [{}] - Interrupt received while reading from buffer", this.pipelineName);
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the default plugin settings for this buffer.
     *
     * @return a {@link PluginSetting} named {@code bounded_blocking} with {@code buffer_size} 12800
     *         and {@code batch_size} 200
     */
    public static PluginSetting getDefaultPluginSettings() {
        // NOTE(review): typed as Map<String, Object> on the assumption that PluginSetting
        // accepts that type - confirm against the PluginSetting constructor.
        Map<String, Object> settings = new HashMap<>();
        settings.put(ATTRIBUTE_BUFFER_CAPACITY, DEFAULT_BUFFER_CAPACITY);
        settings.put(ATTRIBUTE_BATCH_SIZE, DEFAULT_BATCH_SIZE);
        return new PluginSetting(PLUGIN_NAME, settings);
    }

    /**
     * Releases buffer capacity for records that have been fully processed.
     *
     * @param checkpointState carries the number of records whose permits are returned
     */
    public void doCheckpoint(CheckpointState checkpointState) {
        int numCheckedRecords = checkpointState.getNumRecordsToBeChecked();
        this.capacitySemaphore.release(numCheckedRecords);
    }

    /**
     * Reports whether the buffer is empty.
     *
     * @return {@code true} only when the queue holds no records and
     *         {@code getRecordsInFlight()} reports zero
     */
    public boolean isEmpty() {
        return this.blockingQueue.isEmpty() && this.getRecordsInFlight() == 0;
    }
}

