/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.lambda.processor;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.plugins.codec.json.JsonOutputCodecConfig;
import org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler;
import org.opensearch.dataprepper.plugins.lambda.common.ResponseEventHandlingStrategy;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions;
import org.opensearch.dataprepper.plugins.lambda.processor.AggregateResponseEventHandlingStrategy;
import org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessorConfig;
import org.opensearch.dataprepper.plugins.lambda.processor.StrictResponseEventHandlingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.LambdaException;

/**
 * Processor plugin ({@code aws_lambda}) that batches incoming events, invokes an AWS Lambda
 * function asynchronously for each batch and turns the Lambda responses back into records.
 *
 * <p>Records that do not satisfy the configured {@code when} condition bypass Lambda and are
 * returned unchanged. Records whose batch fails (either while sending or while handling the
 * response) are returned with the configured failure tags applied.
 */
@DataPrepperPlugin(name = "aws_lambda", pluginType = Processor.class, pluginConfigurationType = LambdaProcessorConfig.class)
public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
    public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "recordsSuccessfullySentToLambda";
    public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "recordsFailedToSentLambda";
    public static final String NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA = "numberOfRequestsSucceeded";
    public static final String NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA = "numberOfRequestsFailed";
    public static final String LAMBDA_LATENCY_METRIC = "lambdaFunctionLatency";
    public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize";
    public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize";
    public static final String LAMBDA_RESPONSE_RECORDS_COUNTER = "lambdaResponseRecordsCounter";
    public static final String RECORDS_EXCEEDING_THRESHOLD = "recordsExceedingThreshold";
    public static final String CIRCUIT_BREAKER_TRIPS = "circuitBreakerTrips";

    /** Response body that is treated as "the function returned nothing" and is therefore not parsed. */
    private static final byte[] NO_RETURN_RESPONSE = "null".getBytes(StandardCharsets.UTF_8);
    /** Substring of a {@link LambdaException} message that identifies a payload-too-large failure. */
    private static final String EXCEEDING_PAYLOAD_LIMIT_EXCEPTION = "Status Code: 413";
    /** Upper bound for walking an exception's cause chain; protects against cyclic chains. */
    private static final int MAX_CAUSE_DEPTH = 16;
    private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class);

    final PluginSetting codecPluginSetting;
    final PluginFactory pluginFactory;
    final LambdaProcessorConfig lambdaProcessorConfig;
    private final String whenCondition;
    private final ExpressionEvaluator expressionEvaluator;
    private final Counter numberOfRecordsSuccessCounter;
    private final Counter numberOfRecordsFailedCounter;
    private final Counter numberOfRequestsSuccessCounter;
    private final Counter numberOfRequestsFailedCounter;
    private final Counter lambdaResponseRecordsCounter;
    private final Counter batchExceedingThresholdCounter;
    private final Timer lambdaLatencyMetric;
    private final List<String> tagsOnFailure;
    private final LambdaAsyncClient lambdaAsyncClient;
    private final DistributionSummary requestPayloadMetric;
    private final DistributionSummary responsePayloadMetric;
    private final ResponseEventHandlingStrategy responseStrategy;
    private final JsonOutputCodecConfig jsonOutputCodecConfig;
    private final CircuitBreaker circuitBreaker;

    /**
     * Creates the processor, registers its metrics and builds the asynchronous Lambda client.
     *
     * @param pluginFactory          factory used to load the response codec plugin
     * @param pluginSetting          plugin setting; its name (suffixed with {@code _processor}) scopes the metrics
     * @param lambdaProcessorConfig  processor configuration
     * @param awsCredentialsSupplier credentials supplier handed to the Lambda client factory
     * @param expressionEvaluator    evaluator for the optional {@code when} condition
     * @param circuitBreaker         circuit breaker consulted before sending; may be {@code null}
     */
    @DataPrepperPluginConstructor
    public LambdaProcessor(PluginFactory pluginFactory, PluginSetting pluginSetting, LambdaProcessorConfig lambdaProcessorConfig, AwsCredentialsSupplier awsCredentialsSupplier, ExpressionEvaluator expressionEvaluator, CircuitBreaker circuitBreaker) {
        super(PluginMetrics.fromPluginSetting(pluginSetting, pluginSetting.getName() + "_processor"));
        this.expressionEvaluator = expressionEvaluator;
        this.pluginFactory = pluginFactory;
        this.lambdaProcessorConfig = lambdaProcessorConfig;
        this.circuitBreaker = circuitBreaker;

        this.numberOfRecordsSuccessCounter = this.pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
        this.numberOfRecordsFailedCounter = this.pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
        this.numberOfRequestsSuccessCounter = this.pluginMetrics.counter(NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA);
        this.numberOfRequestsFailedCounter = this.pluginMetrics.counter(NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA);
        this.lambdaLatencyMetric = this.pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
        this.requestPayloadMetric = this.pluginMetrics.summary(REQUEST_PAYLOAD_SIZE);
        this.responsePayloadMetric = this.pluginMetrics.summary(RESPONSE_PAYLOAD_SIZE);
        this.lambdaResponseRecordsCounter = this.pluginMetrics.counter(LAMBDA_RESPONSE_RECORDS_COUNTER);
        this.batchExceedingThresholdCounter = this.pluginMetrics.counter(RECORDS_EXCEEDING_THRESHOLD);

        this.whenCondition = lambdaProcessorConfig.getWhenCondition();
        this.tagsOnFailure = lambdaProcessorConfig.getTagsOnFailure();

        // Fall back to the "json" codec with no settings when no response codec is configured.
        PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig();
        this.codecPluginSetting = responseCodecConfig == null
                ? new PluginSetting("json", Collections.emptyMap())
                : new PluginSetting(responseCodecConfig.getPluginName(), responseCodecConfig.getPluginSettings());

        this.jsonOutputCodecConfig = new JsonOutputCodecConfig();
        this.jsonOutputCodecConfig.setKeyName(lambdaProcessorConfig.getBatchOptions().getKeyName());

        ClientOptions clientOptions = lambdaProcessorConfig.getClientOptions();
        if (clientOptions == null) {
            clientOptions = new ClientOptions();
        }
        this.lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient(lambdaProcessorConfig.getAwsAuthenticationOptions(), awsCredentialsSupplier, clientOptions);

        this.responseStrategy = lambdaProcessorConfig.getResponseEventsMatch()
                ? new StrictResponseEventHandlingStrategy()
                : new AggregateResponseEventHandlingStrategy();
    }

    /**
     * Sends the eligible records to Lambda and collects the resulting records.
     *
     * @param records incoming records; returned as-is when empty
     * @return records that skipped Lambda, records produced from successful responses, and the
     *         original records (with failure tags, if configured) of every failed batch
     */
    public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
        if (records.isEmpty()) {
            return records;
        }

        List<Record<Event>> resultRecords = new ArrayList<>();
        List<Record<Event>> recordsToLambda = new ArrayList<>();
        for (Record<Event> record : records) {
            Event event = record.getData();
            // Records failing the `when` condition are passed through untouched.
            if (this.whenCondition != null && !this.expressionEvaluator.evaluateConditional(this.whenCondition, event)) {
                resultRecords.add(record);
                continue;
            }
            recordsToLambda.add(record);
        }

        Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap = new HashMap<>();
        try {
            this.checkCircuitBreaker();
            bufferToFutureMap = LambdaCommonHandler.sendRecords(recordsToLambda, this.lambdaProcessorConfig, this.lambdaAsyncClient, new OutputCodecContext());
        } catch (Exception e) {
            // Sending failed as a whole: every record destined for Lambda is reported as failed.
            LOG.error(DataPrepperMarkers.NOISY, "Error while batching and sending records to Lambda", e);
            this.numberOfRecordsFailedCounter.increment(recordsToLambda.size());
            this.numberOfRequestsFailedCounter.increment();
            resultRecords.addAll(this.addFailureTags(recordsToLambda));
        }

        for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
            Buffer inputBuffer = entry.getKey();
            CompletableFuture<InvokeResponse> future = entry.getValue();
            try {
                InvokeResponse response = future.join();
                Duration latency = inputBuffer.stopLatencyWatch();
                this.lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
                this.requestPayloadMetric.record(inputBuffer.getPayloadRequestSize().longValue());

                if (!LambdaCommonHandler.isSuccess(response)) {
                    // Null-safe so that a missing payload does not mask the real failure with an NPE.
                    SdkBytes errorPayload = response.payload();
                    String errorMessage = String.format("Lambda invoke failed with status code %s error %s ", response.statusCode(), errorPayload == null ? null : errorPayload.asUtf8String());
                    throw new RuntimeException(errorMessage);
                }

                resultRecords.addAll(this.convertLambdaResponseToEvent(inputBuffer, response));
                this.numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
                this.numberOfRequestsSuccessCounter.increment();
                if (response.payload() != null) {
                    this.responsePayloadMetric.record(response.payload().asByteArray().length);
                }
            } catch (Exception e) {
                LOG.error(DataPrepperMarkers.NOISY, e.getMessage(), e);
                // join() wraps the real failure in a CompletionException, so inspect the cause chain
                // rather than only the caught exception.
                if (isPayloadLimitExceeded(e)) {
                    this.batchExceedingThresholdCounter.increment();
                }
                this.numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
                this.numberOfRequestsFailedCounter.increment();
                resultRecords.addAll(this.addFailureTags(inputBuffer.getRecords()));
            }
        }
        return resultRecords;
    }

    /**
     * Tells whether the failure, or any exception in its cause chain, is a {@link LambdaException}
     * whose message contains the payload-limit marker.
     */
    private static boolean isPayloadLimitExceeded(Throwable failure) {
        Throwable current = failure;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof LambdaException) {
                String message = current.getMessage();
                if (message != null && message.contains(EXCEEDING_PAYLOAD_LIMIT_EXCEPTION)) {
                    return true;
                }
            }
            current = current.getCause();
        }
        return false;
    }

    /**
     * Waits for an open circuit breaker to close, sleeping the configured interval between checks
     * for at most the configured number of retries. The method always returns normally: if the
     * breaker is still open afterwards (or the wait is interrupted) the caller proceeds anyway.
     */
    private void checkCircuitBreaker() {
        if (this.circuitBreaker == null || !this.circuitBreaker.isOpen()) {
            return;
        }

        LOG.warn("Circuit breaker is open. Will wait up to {} retries with {}ms interval before proceeding.", this.lambdaProcessorConfig.getCircuitBreakerRetries(), this.lambdaProcessorConfig.getCircuitBreakerWaitInterval());

        int retries = 0;
        while (this.circuitBreaker.isOpen() && retries < this.lambdaProcessorConfig.getCircuitBreakerRetries()) {
            try {
                LOG.warn(DataPrepperMarkers.NOISY, "Circuit breaker is open,Retry count: {}/{}", retries + 1, this.lambdaProcessorConfig.getCircuitBreakerRetries());
                Thread.sleep(this.lambdaProcessorConfig.getCircuitBreakerWaitInterval());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting; the interrupted attempt is not counted.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for circuit breaker to close", e);
                break;
            }
            ++retries;
        }

        if (this.circuitBreaker.isOpen()) {
            LOG.warn("Proceeding with Lambda invocation after {} retries, even though circuit breaker is still open. This may lead to increased memory pressure.", retries);
        } else {
            LOG.info("Circuit breaker closed after {} retries. Resuming Lambda invocation.", retries);
        }
    }

    /**
     * Parses a Lambda response with the configured response codec and hands the parsed events,
     * together with the records of the originating batch, to the response handling strategy.
     *
     * <p>A payload equal to the literal {@code null} is not parsed and yields zero parsed events.
     * The payload itself is dereferenced unconditionally, so a {@code null} payload object raises
     * a {@link NullPointerException}, which {@link #doExecute} handles as a batch failure.
     *
     * @param flushedBuffer  buffer holding the records that were sent in this invocation
     * @param lambdaResponse response returned by Lambda
     * @return the records produced by the response handling strategy
     * @throws IOException if the codec fails to parse the payload
     */
    List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer, InvokeResponse lambdaResponse) throws IOException {
        InputCodec responseCodec = this.pluginFactory.loadPlugin(InputCodec.class, this.codecPluginSetting);
        List<Record<Event>> originalRecords = flushedBuffer.getRecords();
        List<Event> parsedEvents = new ArrayList<>();

        SdkBytes payload = lambdaResponse.payload();
        if (!Arrays.equals(NO_RETURN_RESPONSE, payload.asByteArrayUnsafe())) {
            InputStream inputStream = payload.asInputStream();
            responseCodec.parse(inputStream, record -> parsedEvents.add((Event) record.getData()));
        }

        LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize());
        this.lambdaResponseRecordsCounter.increment(parsedEvents.size());
        return this.responseStrategy.handleEvents(parsedEvents, originalRecords);
    }

    /**
     * Adds the configured failure tags to the metadata of every given record.
     *
     * @param records records of a failed batch
     * @return the same list instance; unchanged when no failure tags are configured
     */
    private List<Record<Event>> addFailureTags(List<Record<Event>> records) {
        if (this.tagsOnFailure == null || this.tagsOnFailure.isEmpty()) {
            return records;
        }
        for (Record<Event> record : records) {
            EventMetadata metadata = record.getData().getMetadata();
            if (metadata == null) {
                LOG.warn("Event metadata is null, cannot add failure tags.");
            } else {
                metadata.addTags(this.tagsOnFailure);
            }
        }
        return records;
    }

    /** No preparation is performed before shutdown. */
    public void prepareForShutdown() {
    }

    /** @return always {@code true} */
    public boolean isReadyForShutdown() {
        return true;
    }

    /** No-op. */
    public void shutdown() {
    }
}

