/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.s3;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.source.s3.S3InputFile;
import org.opensearch.dataprepper.plugins.source.s3.S3ObjectHandler;
import org.opensearch.dataprepper.plugins.source.s3.S3ObjectPluginMetrics;
import org.opensearch.dataprepper.plugins.source.s3.S3ObjectReference;
import org.opensearch.dataprepper.plugins.source.s3.S3ObjectRequest;
import org.opensearch.dataprepper.plugins.source.s3.S3SourceProgressState;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection;
import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;

class S3ObjectWorker
implements S3ObjectHandler {
    private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class);
    private static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 300000L;
    private static final int MAX_RETRIES_DELETE_OBJECT = 3;
    private static final long DELETE_OBJECT_RETRY_DELAY_MS = 1000L;
    private static final String BUCKET_KEY = "bucket";
    private static final String KEY_KEY = "key";
    private static final String TIME_KEY = "time";
    private static final String LENGTH_KEY = "length";
    private final S3Client s3Client;
    private final Buffer<Record<Event>> buffer;
    private final CompressionOption compressionOption;
    private final InputCodec codec;
    private final BucketOwnerProvider bucketOwnerProvider;
    private final Duration bufferTimeout;
    private final int numberOfRecordsToAccumulate;
    private final BiConsumer<Event, S3ObjectReference> eventConsumer;
    private final S3ObjectPluginMetrics s3ObjectPluginMetrics;
    private Instant lastModified;

    public S3ObjectWorker(S3ObjectRequest s3ObjectRequest) {
        this.buffer = s3ObjectRequest.getBuffer();
        this.compressionOption = s3ObjectRequest.getCompressionOption();
        this.codec = s3ObjectRequest.getCodec();
        this.bucketOwnerProvider = s3ObjectRequest.getBucketOwnerProvider();
        this.bufferTimeout = s3ObjectRequest.getBufferTimeout();
        this.numberOfRecordsToAccumulate = s3ObjectRequest.getNumberOfRecordsToAccumulate();
        this.eventConsumer = s3ObjectRequest.getEventConsumer();
        this.s3Client = s3ObjectRequest.getS3Client();
        this.lastModified = Instant.now();
        this.s3ObjectPluginMetrics = s3ObjectRequest.getS3ObjectPluginMetrics();
    }

    @Override
    public void processS3Object(S3ObjectReference s3ObjectReference, S3DataSelection dataSelection, AcknowledgementSet acknowledgementSet, SourceCoordinator<S3SourceProgressState> sourceCoordinator, String partitionKey) throws IOException {
        BufferAccumulator bufferAccumulator = BufferAccumulator.create(this.buffer, (int)this.numberOfRecordsToAccumulate, (Duration)this.bufferTimeout);
        try {
            this.s3ObjectPluginMetrics.getS3ObjectReadTimer().recordCallable(() -> {
                this.doProcessObject(acknowledgementSet, s3ObjectReference, (BufferAccumulator<Record<Event>>)bufferAccumulator, sourceCoordinator, partitionKey, dataSelection);
                return null;
            });
        }
        catch (IllegalArgumentException e) {
            throw new IOException(e.getMessage());
        }
        catch (IOException | RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.s3ObjectPluginMetrics.getS3ObjectsSucceededCounter().increment();
    }

    @Override
    public void deleteS3Object(S3ObjectReference s3ObjectReference) {
        DeleteObjectRequest.Builder deleteRequestBuilder = DeleteObjectRequest.builder().bucket(s3ObjectReference.getBucketName()).key(s3ObjectReference.getKey());
        Optional<String> bucketOwner = this.bucketOwnerProvider.getBucketOwner(s3ObjectReference.getBucketName());
        bucketOwner.ifPresent(arg_0 -> ((DeleteObjectRequest.Builder)deleteRequestBuilder).expectedBucketOwner(arg_0));
        DeleteObjectRequest deleteObjectRequest = (DeleteObjectRequest)deleteRequestBuilder.build();
        boolean deleteSuccessFul = false;
        int retryCount = 0;
        while (!deleteSuccessFul && retryCount < 3) {
            try {
                this.s3Client.deleteObject(deleteObjectRequest);
                deleteSuccessFul = true;
                LOG.debug("Successfully deleted object {} from bucket {} on attempt {}", new Object[]{s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), retryCount});
            }
            catch (Exception e) {
                if (++retryCount == 3) {
                    LOG.error("Failed to delete object {} from bucket {} after {} attempts: {}", new Object[]{s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), 3, e.getMessage()});
                    this.s3ObjectPluginMetrics.getS3ObjectsDeleteFailed().increment();
                    continue;
                }
                LOG.warn("Failed to delete object {} from bucket {} on attempt {}, will retry: {}", new Object[]{s3ObjectReference.getKey(), s3ObjectReference.getBucketName(), retryCount, e.getMessage()});
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    public long consumeS3Object(S3InputFile inputFile, S3DataSelection dataSelection, BiConsumer<Record<Event>, S3DataSelection> consumer) throws Exception {
        S3ObjectReference s3ObjectReference = inputFile.getObjectReference();
        if (dataSelection == S3DataSelection.METADATA_ONLY) {
            HashMap<String, Object> data = new HashMap<String, Object>();
            data.put(BUCKET_KEY, s3ObjectReference.getBucketName());
            data.put(KEY_KEY, s3ObjectReference.getKey());
            data.put(TIME_KEY, inputFile.getLastModified());
            data.put(LENGTH_KEY, inputFile.getLength());
            JacksonEvent event = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(data).build();
            consumer.accept((Record<Event>)new Record((Object)event), S3DataSelection.METADATA_ONLY);
            return event.toJsonString().length();
        }
        CompressionOption fileCompressionOption = this.compressionOption != CompressionOption.AUTOMATIC ? this.compressionOption : CompressionOption.fromFileName((String)s3ObjectReference.getKey());
        this.codec.parse((InputFile)inputFile, fileCompressionOption.getDecompressionEngine(), record -> consumer.accept((Record<Event>)record, dataSelection));
        return inputFile.getLength();
    }

    private void doProcessObject(AcknowledgementSet acknowledgementSet, S3ObjectReference s3ObjectReference, BufferAccumulator<Record<Event>> bufferAccumulator, SourceCoordinator<S3SourceProgressState> sourceCoordinator, String partitionKey, S3DataSelection dataSelection) throws Exception {
        long s3ObjectSize;
        LOG.info("Read S3 object: {}", (Object)s3ObjectReference);
        AtomicLong lastCheckpointTime = new AtomicLong(System.currentTimeMillis());
        S3InputFile inputFile = new S3InputFile(this.s3Client, s3ObjectReference, this.bucketOwnerProvider, this.s3ObjectPluginMetrics);
        AtomicInteger saveStateCounter = new AtomicInteger();
        try {
            Instant lastModifiedTime = inputFile.getLastModified();
            Instant now = Instant.now();
            Instant originationTime = lastModifiedTime == null || lastModifiedTime.isAfter(now) ? now : lastModifiedTime;
            s3ObjectSize = this.consumeS3Object(inputFile, dataSelection, (record, objectDataSelection) -> {
                try {
                    Event event = (Event)record.getData();
                    if (this.eventConsumer != null && objectDataSelection == S3DataSelection.DATA_AND_METADATA) {
                        this.eventConsumer.accept(event, s3ObjectReference);
                    }
                    event.getMetadata().setExternalOriginationTime(originationTime);
                    event.getEventHandle().setExternalOriginationTime(originationTime);
                    if (acknowledgementSet != null) {
                        acknowledgementSet.add(event);
                    }
                    bufferAccumulator.add(record);
                    if (acknowledgementSet != null && sourceCoordinator != null && partitionKey != null && System.currentTimeMillis() - lastCheckpointTime.get() > 300000L) {
                        LOG.debug("Renew partition ownership for the object {}", (Object)partitionKey);
                        sourceCoordinator.saveProgressStateForPartition(partitionKey, null);
                        lastCheckpointTime.set(System.currentTimeMillis());
                        saveStateCounter.getAndIncrement();
                    }
                }
                catch (Exception e) {
                    LOG.error("Failed writing S3 objects to buffer due to: {}", (Object)e.getMessage());
                }
            });
        }
        catch (Exception ex) {
            this.s3ObjectPluginMetrics.getS3ObjectsFailedCounter().increment();
            LOG.error("Error reading from S3 object: s3ObjectReference={}. {}", (Object)s3ObjectReference, (Object)ex.getMessage());
            throw ex;
        }
        try {
            bufferAccumulator.flush();
        }
        catch (Exception e) {
            LOG.error("Failed writing S3 objects to buffer.", (Throwable)e);
        }
        int recordsWritten = bufferAccumulator.getTotalWritten();
        if (recordsWritten == 0) {
            LOG.warn("Failed to find any records in S3 object: s3ObjectReference={}.", (Object)s3ObjectReference);
            this.s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment();
        }
        this.s3ObjectPluginMetrics.getS3ObjectSizeSummary().record((double)s3ObjectSize);
        this.s3ObjectPluginMetrics.getS3ObjectEventsSummary().record((double)recordsWritten);
    }
}

