/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.dlq.s3;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.util.StringUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.dlq.s3.KeyPathGenerator;
import org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class S3DlqWriter
implements DlqWriter {
    static final String S3_DLQ_RECORDS_SUCCESS = "dlqS3RecordsSuccess";
    static final String S3_DLQ_RECORDS_FAILED = "dlqS3RecordsFailed";
    static final String S3_DLQ_REQUEST_SUCCESS = "dlqS3RequestSuccess";
    static final String S3_DLQ_REQUEST_FAILED = "dlqS3RequestFailed";
    static final String S3_DLQ_REQUEST_LATENCY = "dlqS3RequestLatency";
    static final String S3_DLQ_REQUEST_SIZE_BYTES = "dlqS3RequestSizeBytes";
    static final String DLQ_OBJECTS = "dlqObjects";
    private static final String KEY_NAME_FORMAT = "dlq-v%s-%s-%s-%s-%s.json";
    private static final String FULL_KEY_FORMAT = "%s%s";
    private static final Logger LOG = LoggerFactory.getLogger(S3DlqWriter.class);
    private final S3Client s3Client;
    private final String bucket;
    private final String keyPathPrefix;
    private final String bucketOwner;
    private final ObjectMapper objectMapper;
    private final Counter dlqS3RecordsSuccessCounter;
    private final Counter dlqS3RecordsFailedCounter;
    private final Counter dlqS3RequestSuccessCounter;
    private final Counter dlqS3RequestFailedCounter;
    private final Timer dlqS3RequestTimer;
    private final DistributionSummary dlqS3RequestSizeBytesSummary;
    private final KeyPathGenerator keyPathGenerator;

    S3DlqWriter(S3DlqWriterConfig s3DlqWriterConfig, ObjectMapper objectMapper, PluginMetrics pluginMetrics) {
        this.dlqS3RecordsSuccessCounter = pluginMetrics.counter(S3_DLQ_RECORDS_SUCCESS);
        this.dlqS3RecordsFailedCounter = pluginMetrics.counter(S3_DLQ_RECORDS_FAILED);
        this.dlqS3RequestSuccessCounter = pluginMetrics.counter(S3_DLQ_REQUEST_SUCCESS);
        this.dlqS3RequestFailedCounter = pluginMetrics.counter(S3_DLQ_REQUEST_FAILED);
        this.dlqS3RequestTimer = pluginMetrics.timer(S3_DLQ_REQUEST_LATENCY);
        this.dlqS3RequestSizeBytesSummary = pluginMetrics.summary(S3_DLQ_REQUEST_SIZE_BYTES);
        this.s3Client = s3DlqWriterConfig.getS3Client();
        Objects.requireNonNull(s3DlqWriterConfig.getBucket());
        this.bucket = s3DlqWriterConfig.getBucket();
        this.keyPathPrefix = StringUtils.isEmpty((String)s3DlqWriterConfig.getKeyPathPrefix()) ? s3DlqWriterConfig.getKeyPathPrefix() : this.enforceDefaultDelimiterOnKeyPathPrefix(s3DlqWriterConfig.getKeyPathPrefix());
        this.objectMapper = objectMapper;
        this.keyPathGenerator = new KeyPathGenerator(this.keyPathPrefix);
        this.bucketOwner = s3DlqWriterConfig.getBucketOwner();
    }

    @Override
    public void write(List<DlqObject> dlqObjects, String pipelineName, String pluginId) throws IOException {
        if (dlqObjects.isEmpty()) {
            return;
        }
        try {
            this.doWrite(dlqObjects, pipelineName, pluginId);
            this.dlqS3RequestSuccessCounter.increment();
            this.dlqS3RecordsSuccessCounter.increment((double)dlqObjects.size());
        }
        catch (Exception e) {
            this.dlqS3RequestFailedCounter.increment();
            this.dlqS3RecordsFailedCounter.increment((double)dlqObjects.size());
            throw e;
        }
    }

    private void doWrite(List<DlqObject> dlqObjects, String pipelineName, String pluginId) throws IOException {
        String content;
        PutObjectRequest putObjectRequest = (PutObjectRequest)PutObjectRequest.builder().bucket(this.bucket).expectedBucketOwner(this.bucketOwner).key(this.buildKey(pipelineName, pluginId)).build();
        PutObjectResponse response = this.timedPutObject(putObjectRequest, content = this.deserialize(dlqObjects));
        if (!response.sdkHttpResponse().isSuccessful()) {
            LOG.error(DataPrepperMarkers.SENSITIVE, "Failed to write content [{}] to S3 dlq", (Object)content);
            LOG.error("Failed to write to S3 dlq due to status code: [{}]", (Object)response.sdkHttpResponse().statusCode());
            throw new IOException(String.format("Failed to write to S3 dlq due to status code: %d", response.sdkHttpResponse().statusCode()));
        }
    }

    private PutObjectResponse timedPutObject(PutObjectRequest putObjectRequest, String content) throws IOException {
        try {
            return (PutObjectResponse)this.dlqS3RequestTimer.recordCallable(() -> this.putObject(putObjectRequest, content));
        }
        catch (IOException ioException) {
            throw ioException;
        }
        catch (Exception ex) {
            LOG.error(DataPrepperMarkers.SENSITIVE, "Failed timed write to S3 dlq with content: [{}]", (Object)content, (Object)ex);
            throw new IOException("Failed timed write to S3 dlq.", ex);
        }
    }

    private PutObjectResponse putObject(PutObjectRequest request, String content) throws IOException {
        try {
            return this.s3Client.putObject(request, RequestBody.fromString((String)content));
        }
        catch (Exception ex) {
            LOG.error(DataPrepperMarkers.SENSITIVE, "Failed to write content [{}] to S3 dlq", (Object)content, (Object)ex);
            throw new IOException("Failed to write to S3 dlq.", ex);
        }
    }

    private String deserialize(List<DlqObject> dlqObjects) throws IOException {
        try {
            Map<String, List<DlqObject>> output = Map.of(DLQ_OBJECTS, dlqObjects);
            String content = this.objectMapper.writeValueAsString(output);
            this.dlqS3RequestSizeBytesSummary.record((double)content.getBytes(StandardCharsets.UTF_8).length);
            return content;
        }
        catch (JsonProcessingException e) {
            LOG.error(DataPrepperMarkers.SENSITIVE, "Failed to build valid S3 request body with dlqObjects: [{}]", dlqObjects, (Object)e);
            throw new IOException("Failed to build valid S3 request body", e);
        }
    }

    private String buildKey(String pipelineName, String pluginId) {
        String key = String.format(KEY_NAME_FORMAT, DataPrepperVersion.getCurrentVersion().getMajorVersion(), pipelineName, pluginId, Instant.now(), UUID.randomUUID());
        return this.keyPathPrefix == null ? key : String.format(FULL_KEY_FORMAT, this.keyPathGenerator.generate(), key);
    }

    private String enforceDefaultDelimiterOnKeyPathPrefix(String keyPathPrefix) {
        return keyPathPrefix.charAt(keyPathPrefix.length() - 1) == '/' ? keyPathPrefix : keyPathPrefix.concat("/");
    }

    @Override
    public void close() throws IOException {
        this.s3Client.close();
    }
}

