/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.lambda.sink;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferSynchronized;
import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions;
import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck;
import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

/**
 * Sink that batches pipeline events into buffers and sends each full buffer to an AWS Lambda
 * function through the asynchronous Lambda client.
 *
 * <p>Events are accumulated in a single "current" buffer. The buffer is rolled over (queued for
 * flushing and replaced by a fresh one) whenever one of the configured thresholds - event count,
 * payload size, or collect timeout - is reported as exceeded by {@link ThresholdCheck}. Records of
 * a failed invocation are forwarded to the DLQ when one is configured; otherwise their event
 * handles are released as failed.
 *
 * <p>{@link #doOutput(Collection)} mutates the current buffer only while holding an internal
 * {@link ReentrantLock}.
 */
@DataPrepperPlugin(name="aws_lambda", pluginType=Sink.class, pluginConfigurationType=LambdaSinkConfig.class)
public class LambdaSink extends AbstractSink<Record<Event>> {
    public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "recordsSuccessfullySentToLambda";
    public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "recordsFailedToSentLambda";
    public static final String NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA = "numberOfRequestsSucceeded";
    public static final String NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA = "numberOfRequestsFailed";
    public static final String LAMBDA_LATENCY_METRIC = "lambdaFunctionLatency";
    public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize";
    public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize";
    private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class);
    private static final String BUCKET = "bucket";
    private static final String KEY_PATH = "key_path_prefix";
    private final Counter numberOfRecordsSuccessCounter;
    private final Counter numberOfRecordsFailedCounter;
    private final Counter numberOfRequestsSuccessCounter;
    private final Counter numberOfRequestsFailedCounter;
    private final LambdaSinkConfig lambdaSinkConfig;
    private final ExpressionEvaluator expressionEvaluator;
    private final LambdaAsyncClient lambdaAsyncClient;
    private final DistributionSummary responsePayloadMetric;
    private final Timer lambdaLatencyMetric;
    private final DistributionSummary requestPayloadMetric;
    private final PluginSetting pluginSetting;
    private final OutputCodecContext outputCodecContext;
    private volatile boolean sinkInitialized;
    /** DLQ handler; stays {@code null} when no DLQ plugin setting is configured. */
    private DlqPushHandler dlqPushHandler = null;
    final int maxEvents;
    final ByteCount maxBytes;
    final Duration maxCollectTime;
    /** Buffer currently accepting records; {@code null} until initialization has created it. */
    private Buffer statefulBuffer;
    /** Guards buffer accumulation and roll-over in {@link #doOutput(Collection)}. */
    private final ReentrantLock reentrantLock;

    /**
     * Creates the sink, registers its metrics, and builds the async Lambda client.
     *
     * @param pluginSetting plugin settings, also used to label DLQ objects
     * @param lambdaSinkConfig sink configuration (batch thresholds, client options, DLQ, AWS auth)
     * @param pluginFactory factory handed to the DLQ handler
     * @param sinkContext sink context from which the output codec context is derived
     * @param awsCredentialsSupplier credentials supplier for the Lambda client
     * @param expressionEvaluator expression evaluator retained by the sink
     */
    @DataPrepperPluginConstructor
    public LambdaSink(PluginSetting pluginSetting, LambdaSinkConfig lambdaSinkConfig, PluginFactory pluginFactory, SinkContext sinkContext, AwsCredentialsSupplier awsCredentialsSupplier, ExpressionEvaluator expressionEvaluator) {
        super(pluginSetting);
        this.pluginSetting = pluginSetting;
        this.sinkInitialized = false;
        this.lambdaSinkConfig = lambdaSinkConfig;
        this.expressionEvaluator = expressionEvaluator;
        this.outputCodecContext = OutputCodecContext.fromSinkContext(sinkContext);
        this.maxEvents = lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCount();
        this.maxBytes = lambdaSinkConfig.getBatchOptions().getThresholdOptions().getMaximumSize();
        this.maxCollectTime = lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCollectTimeOut();
        this.numberOfRecordsSuccessCounter = this.pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
        this.numberOfRecordsFailedCounter = this.pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
        this.numberOfRequestsSuccessCounter = this.pluginMetrics.counter(NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA);
        this.numberOfRequestsFailedCounter = this.pluginMetrics.counter(NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA);
        this.lambdaLatencyMetric = this.pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
        this.requestPayloadMetric = this.pluginMetrics.summary(REQUEST_PAYLOAD_SIZE);
        this.responsePayloadMetric = this.pluginMetrics.summary(RESPONSE_PAYLOAD_SIZE);
        // Fall back to default client options when none are configured.
        ClientOptions clientOptions = lambdaSinkConfig.getClientOptions();
        if (clientOptions == null) {
            clientOptions = new ClientOptions();
        }
        this.lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient(lambdaSinkConfig.getAwsAuthenticationOptions(), awsCredentialsSupplier, clientOptions);
        if (lambdaSinkConfig.getDlqPluginSetting() != null) {
            this.dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, lambdaSinkConfig.getDlq(), lambdaSinkConfig.getAwsAuthenticationOptions());
        }
        this.reentrantLock = new ReentrantLock();
    }

    /**
     * @return {@code true} once initialization has completed successfully
     */
    public boolean isReady() {
        return this.sinkInitialized;
    }

    /**
     * Initializes the sink. On any failure the sink is shut down and the exception is rethrown.
     */
    public void doInitialize() {
        try {
            this.doInitializeInternal();
        }
        catch (InvalidPluginConfigurationException e) {
            LOG.error("Invalid plugin configuration, Hence failed to initialize lambda sink plugin.", e);
            this.shutdown();
            throw e;
        }
        catch (Exception e) {
            LOG.error("Failed to initialize lambda plugin.", e);
            this.shutdown();
            throw e;
        }
    }

    /** Creates the first buffer and marks the sink as ready. */
    private void doInitializeInternal() {
        this.statefulBuffer = this.createBuffer();
        this.sinkInitialized = true;
    }

    /**
     * Builds an empty buffer configured with the batch key name and this sink's codec context.
     * Every buffer the sink creates goes through here so they are all configured identically.
     */
    private Buffer createBuffer() {
        return new InMemoryBufferSynchronized(this.lambdaSinkConfig.getBatchOptions().getKeyName(), this.outputCodecContext);
    }

    /**
     * Flushes any events still held in the current buffer.
     *
     * <p>The buffer may not exist yet: {@link #doInitialize()} calls this method from its failure
     * paths, so a missing buffer is treated as "nothing to flush" instead of raising a
     * {@link NullPointerException} that would hide the original initialization error.
     */
    public void shutdown() {
        final Buffer pendingBuffer = this.statefulBuffer;
        if (pendingBuffer != null && pendingBuffer.getEventCount() > 0) {
            this.flushBuffers(Collections.singletonList(pendingBuffer));
        }
    }

    /**
     * Adds the given records to the current buffer and flushes every buffer that has crossed a
     * threshold.
     *
     * <p>Before the new records are processed, a non-empty buffer whose collect timeout has been
     * exceeded is flushed on its own. The whole operation runs under the sink's lock. Calls made
     * before initialization are ignored with a warning.
     *
     * @param records records to send to Lambda
     */
    public void doOutput(Collection<Record<Event>> records) {
        if (!this.sinkInitialized) {
            LOG.warn("LambdaSink doOutput called before initialization");
            return;
        }
        this.reentrantLock.lock();
        try {
            if (this.statefulBuffer.getEventCount() > 0 && ThresholdCheck.checkTimeoutExceeded(this.statefulBuffer, this.maxCollectTime)) {
                LOG.debug("Flushing partial buffer due to timeout of {}", this.maxCollectTime);
                final Buffer bufferToFlush = this.statefulBuffer;
                this.statefulBuffer = this.createBuffer();
                this.flushBuffers(Collections.singletonList(bufferToFlush));
            }
            final List<Buffer> fullBuffers = new ArrayList<>();
            for (Record<Event> record : records) {
                // Roll over BEFORE adding when this record would overflow the size limit or the
                // buffer has been collecting for too long.
                if (ThresholdCheck.checkSizeThresholdExceed(this.statefulBuffer, this.maxBytes, record) || ThresholdCheck.checkTimeoutExceeded(this.statefulBuffer, this.maxCollectTime)) {
                    fullBuffers.add(this.statefulBuffer);
                    this.statefulBuffer = this.createBuffer();
                }
                this.statefulBuffer.addRecord(record);
                // Roll over AFTER adding when the event-count limit has been reached.
                if (ThresholdCheck.checkEventCountThresholdExceeded(this.statefulBuffer, this.maxEvents)) {
                    fullBuffers.add(this.statefulBuffer);
                    this.statefulBuffer = this.createBuffer();
                }
            }
            if (!fullBuffers.isEmpty()) {
                this.flushBuffers(fullBuffers);
            }
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    /**
     * Wraps a failed event and its failure details into a DLQ object.
     *
     * @param event the event that could not be delivered
     * @param functionName name of the Lambda function that was targeted
     * @param status status code recorded for the failure
     * @param message failure message
     * @return the DLQ object describing the failed event
     */
    private DlqObject createDlqObjectFromEvent(Event event, String functionName, int status, String message) {
        final LambdaSinkFailedDlqData failedData = LambdaSinkFailedDlqData.builder()
                .withData(event.toJsonString())
                .withStatus(status)
                .withFunctionName(functionName)
                .withMessage(message)
                .build();
        return DlqObject.builder()
                .withEventHandle(event.getEventHandle())
                .withFailedData(failedData)
                .withPluginName(this.pluginSetting.getName())
                .withPipelineName(this.pluginSetting.getPipelineName())
                .withPluginId(this.pluginSetting.getName())
                .build();
    }

    /**
     * Records a failed request and disposes of its records.
     *
     * <p>Without a DLQ the event handles are released as failed and nothing else happens. With a
     * DLQ, every record that carries data is pushed to it and the handles are released as
     * successful; if the DLQ push itself throws, the handles are released as failed.
     *
     * @param failedRecords records belonging to the failed request; an empty collection is a no-op
     * @param throwable cause of the failure; its message is stored in the DLQ objects
     * @param statusCode status code stored in the DLQ objects
     */
    void handleFailure(Collection<Record<Event>> failedRecords, Throwable throwable, int statusCode) {
        if (failedRecords.isEmpty()) {
            return;
        }
        this.numberOfRecordsFailedCounter.increment(failedRecords.size());
        this.numberOfRequestsFailedCounter.increment();
        if (this.dlqPushHandler == null) {
            this.releaseEventHandles(failedRecords, false);
            // Stop here: there is no DLQ to push to, and continuing would dereference the null
            // handler and release every event handle a second time.
            return;
        }
        try {
            final List<DlqObject> dlqObjects = new ArrayList<>();
            for (Record<Event> record : failedRecords) {
                if (record.getData() == null) {
                    continue;
                }
                dlqObjects.add(this.createDlqObjectFromEvent(record.getData(), this.lambdaSinkConfig.getFunctionName(), statusCode, throwable.getMessage()));
            }
            this.dlqPushHandler.perform(dlqObjects);
            this.releaseEventHandles(failedRecords, true);
        }
        catch (Exception ex) {
            LOG.error("Exception occured during error handling", ex);
            this.releaseEventHandles(failedRecords, false);
        }
    }

    /**
     * Releases the event handle of every record that has both data and a handle.
     *
     * @param records records whose handles should be released
     * @param success result passed to each handle's {@code release} call
     */
    private void releaseEventHandles(Collection<Record<Event>> records, boolean success) {
        for (Record<Event> record : records) {
            final Event event = record.getData();
            if (event == null) {
                continue;
            }
            final EventHandle eventHandle = event.getEventHandle();
            if (eventHandle != null) {
                eventHandle.release(success);
            }
        }
    }

    /**
     * Invokes Lambda for each buffer, waits for every response, and updates metrics and event
     * handles according to the outcome.
     *
     * <p>If the invocations cannot be started at all, the records of all buffers are handled as a
     * single failure. Otherwise each buffer is processed independently, so one failed invocation
     * does not affect the others.
     *
     * @param buffersToFlush buffers whose contents should be sent to Lambda
     */
    void flushBuffers(List<Buffer> buffersToFlush) {
        final Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap;
        try {
            bufferToFutureMap = LambdaCommonHandler.invokeLambdaAndGetFutureMap(this.lambdaSinkConfig, this.lambdaAsyncClient, buffersToFlush);
        }
        catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Error sending buffers to Lambda", e);
            final List<Record<Event>> combinedRecords = new ArrayList<>();
            for (Buffer buffer : buffersToFlush) {
                combinedRecords.addAll(buffer.getRecords());
            }
            this.handleFailure(combinedRecords, e, 500);
            return;
        }
        for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
            final Buffer inputBuffer = entry.getKey();
            final CompletableFuture<InvokeResponse> future = entry.getValue();
            try {
                // Blocks until this invocation completes; a failed future throws here and is
                // routed to the catch block below.
                final InvokeResponse response = future.join();
                final Duration latency = inputBuffer.stopLatencyWatch();
                this.lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
                this.requestPayloadMetric.record(inputBuffer.getPayloadRequestSize().longValue());
                if (!LambdaCommonHandler.isSuccess(response)) {
                    String errorMsg = String.format("Lambda invoke failed with code %d, error: %s", response.statusCode(), response.payload() != null ? response.payload().asUtf8String() : "No payload");
                    throw new RuntimeException(errorMsg);
                }
                this.releaseEventHandles(inputBuffer.getRecords(), true);
                this.numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
                this.numberOfRequestsSuccessCounter.increment();
                if (response.payload() != null) {
                    this.responsePayloadMetric.record(response.payload().asByteArray().length);
                }
            }
            catch (Exception ex) {
                LOG.error(DataPrepperMarkers.NOISY, "Error handling future response from Lambda", ex);
                this.handleFailure(inputBuffer.getRecords(), ex, 500);
            }
        }
    }

    /**
     * Reports whether the buffer has exceeded the event-count threshold, or would exceed the size
     * threshold with the given record.
     *
     * @param buffer buffer to check
     * @param record record about to be added
     * @return {@code true} if either threshold check reports an overflow
     */
    private boolean isThresholdExceeded(Buffer buffer, Record<Event> record) {
        return ThresholdCheck.checkEventCountThresholdExceeded(buffer, this.maxEvents) || ThresholdCheck.checkSizeThresholdExceed(buffer, this.maxBytes, record);
    }
}

