/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.export;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.export.S3ObjectReader;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads a single exported data file from S3, converts each record it contains and writes the
 * resulting events to the pipeline buffer.
 *
 * <p>An instance is bound to one {@link DataFilePartition}; the bucket and object key are taken
 * from that partition at construction time. Instances are created through
 * {@link #create(DataFilePartition, InputCodec, Buffer, S3ObjectReader, ExportRecordConverter,
 * PluginMetrics, EnhancedSourceCoordinator, AcknowledgementSet, Duration, DbTableMetadata)}.
 */
public class DataFileLoader implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);

    /**
     * Amount subtracted from the snapshot time to obtain the version number attached to every
     * exported event.
     */
    static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5L);

    /** Timeout handed to the {@link BufferAccumulator} used for writing to the buffer. */
    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60L);

    /** Number of records the {@link BufferAccumulator} is created with. */
    static final int DEFAULT_BUFFER_BATCH_SIZE = 1000;

    static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
    static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
    static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors";
    static final String BYTES_RECEIVED = "bytesReceived";
    static final String BYTES_PROCESSED = "bytesProcessed";

    private final DataFilePartition dataFilePartition;
    private final String bucket;
    private final String objectKey;
    private final S3ObjectReader objectReader;
    private final InputCodec codec;
    private final Buffer<Record<Event>> buffer;
    private final ExportRecordConverter recordConverter;
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final AcknowledgementSet acknowledgementSet;
    private final Duration acknowledgmentTimeout;
    private final DbTableMetadata dbTableMetadata;
    private final Counter exportRecordsTotalCounter;
    private final Counter exportRecordSuccessCounter;
    private final Counter exportRecordErrorCounter;
    private final DistributionSummary bytesReceivedSummary;
    private final DistributionSummary bytesProcessedSummary;

    private DataFileLoader(final DataFilePartition dataFilePartition,
                           final InputCodec codec,
                           final Buffer<Record<Event>> buffer,
                           final S3ObjectReader objectReader,
                           final ExportRecordConverter recordConverter,
                           final PluginMetrics pluginMetrics,
                           final EnhancedSourceCoordinator sourceCoordinator,
                           final AcknowledgementSet acknowledgementSet,
                           final Duration acknowledgmentTimeout,
                           final DbTableMetadata dbTableMetadata) {
        this.dataFilePartition = dataFilePartition;
        this.bucket = dataFilePartition.getBucket();
        this.objectKey = dataFilePartition.getKey();
        this.objectReader = objectReader;
        this.codec = codec;
        this.buffer = buffer;
        this.recordConverter = recordConverter;
        this.sourceCoordinator = sourceCoordinator;
        this.acknowledgementSet = acknowledgementSet;
        this.acknowledgmentTimeout = acknowledgmentTimeout;
        this.dbTableMetadata = dbTableMetadata;
        this.exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
        this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
        this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT);
        this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
        this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
    }

    /**
     * Creates a loader for the given data file partition.
     *
     * @param dataFilePartition     partition describing the S3 object to load and its progress state
     * @param codec                 codec used to parse the object's input stream into records
     * @param buffer                buffer the converted events are written to
     * @param objectReader          reader used to open the S3 object
     * @param recordConverter       converter applied to every record before it is buffered
     * @param pluginMetrics         metrics registry used to create the counters and summaries
     * @param sourceCoordinator     coordinator used to save the partition's progress state
     * @param acknowledgementSet    acknowledgement set the events are added to; may be {@code null},
     *                              in which case no acknowledgement handling is performed
     * @param acknowledgmentTimeout duration passed to the coordinator when saving progress state
     * @param dbTableMetadata       metadata providing the column data types per table
     * @return a new {@code DataFileLoader}
     */
    public static DataFileLoader create(final DataFilePartition dataFilePartition,
                                        final InputCodec codec,
                                        final Buffer<Record<Event>> buffer,
                                        final S3ObjectReader objectReader,
                                        final ExportRecordConverter recordConverter,
                                        final PluginMetrics pluginMetrics,
                                        final EnhancedSourceCoordinator sourceCoordinator,
                                        final AcknowledgementSet acknowledgementSet,
                                        final Duration acknowledgmentTimeout,
                                        final DbTableMetadata dbTableMetadata) {
        return new DataFileLoader(dataFilePartition, codec, buffer, objectReader, recordConverter,
                pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout, dbTableMetadata);
    }

    /**
     * Reads the S3 object, pushes every parsed record through {@code processRecord}, then flushes
     * the accumulator and, when an acknowledgement set is present, saves the partition's progress
     * state and completes the set.
     *
     * @throws RuntimeException if the object cannot be read or any record fails to be processed;
     *                          the original exception is preserved as the cause
     */
    @Override
    public void run() {
        LOG.info(DataPrepperMarkers.SENSITIVE, "Start loading s3://{}/{}", bucket, objectKey);

        final BufferAccumulator<Record<Event>> bufferAccumulator =
                BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
        final AtomicLong eventCount = new AtomicLong();

        try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) {
            codec.parse(inputStream,
                    exportedRecord -> processRecord(exportedRecord, bufferAccumulator, eventCount));
            LOG.info(DataPrepperMarkers.SENSITIVE, "Completed loading object s3://{}/{} to buffer", bucket, objectKey);
        } catch (Exception e) {
            LOG.atError()
                    .addMarker(DataPrepperMarkers.SENSITIVE)
                    .addMarker(DataPrepperMarkers.NOISY)
                    .setMessage("Failed to load object s3://{}/{} to buffer")
                    .addArgument(bucket)
                    .addArgument(objectKey)
                    .setCause(e)
                    .log();
            throw new RuntimeException(e);
        }

        try {
            bufferAccumulator.flush();
            if (acknowledgementSet != null) {
                sourceCoordinator.saveProgressStateForPartition(dataFilePartition, acknowledgmentTimeout);
                acknowledgementSet.complete();
            }
            exportRecordSuccessCounter.increment(eventCount.get());
        } catch (Exception e) {
            // Deliberately not rethrown: a failure here is logged and counted against the
            // error metric for every event read from this file.
            LOG.error(DataPrepperMarkers.NOISY, "Failed to write events to buffer", e);
            exportRecordErrorCounter.increment(eventCount.get());
        }
    }

    /**
     * Converts one parsed record and adds it to the accumulator, updating the record and byte
     * metrics along the way.
     *
     * @param exportedRecord    record produced by the codec
     * @param bufferAccumulator accumulator the converted event is added to
     * @param eventCount        running count of events successfully added for this file
     * @throws RuntimeException wrapping any exception raised while processing the record
     */
    private void processRecord(final Record<Event> exportedRecord,
                               final BufferAccumulator<Record<Event>> bufferAccumulator,
                               final AtomicLong eventCount) {
        try {
            exportRecordsTotalCounter.increment();
            final Event event = exportedRecord.getData();

            // Measure the JSON form with an explicit charset so the byte metrics do not depend
            // on the JVM's platform-default encoding.
            final long bytes = event.toJsonString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            bytesReceivedSummary.record(bytes);

            final DataFileProgressState progressState = dataFilePartition.getProgressState().get();
            final String fullTableName = progressState.getFullSourceTableName();
            final List<String> primaryKeys = progressState.getPrimaryKeyMap().getOrDefault(fullTableName, List.of());

            transformEvent(event, fullTableName, EngineType.fromString(progressState.getEngineType()));

            final long snapshotTime = progressState.getSnapshotTime();
            final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
            final Event transformedEvent = recordConverter.convert(
                    event,
                    progressState.getSourceDatabase(),
                    progressState.getSourceSchema(),
                    progressState.getSourceTable(),
                    OpenSearchBulkActions.INDEX,
                    primaryKeys,
                    snapshotTime,
                    eventVersionNumber,
                    null);

            if (acknowledgementSet != null) {
                acknowledgementSet.add(transformedEvent);
            }
            bufferAccumulator.add(new Record<>(transformedEvent));
            eventCount.getAndIncrement();
            bytesProcessedSummary.record(bytes);
        } catch (Exception e) {
            LOG.atError()
                    .addMarker(DataPrepperMarkers.SENSITIVE)
                    .addMarker(DataPrepperMarkers.NOISY)
                    .setMessage("Failed to process record from object s3://{}/{}")
                    .addArgument(bucket)
                    .addArgument(objectKey)
                    .setCause(e)
                    .log();
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces every top-level value of {@code event} with the value returned by the
     * engine-specific data type helper for that column's declared data type.
     *
     * <p>Events are left untouched when the engine is neither MySQL nor Postgres.
     *
     * @param event         event whose values are rewritten in place
     * @param fullTableName key used to look up the table's column data types
     * @param engineType    database engine the export originated from
     */
    private void transformEvent(final Event event, final String fullTableName, final EngineType engineType) {
        // NOTE(review): the column type lookup below is not null-checked; a table missing from
        // the metadata would surface as a NullPointerException on the first column - confirm
        // that the metadata always contains every exported table.
        if (engineType.isMySql()) {
            final Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
            for (final Map.Entry<String, Object> entry : event.toMap().entrySet()) {
                final String columnName = entry.getKey();
                final Object data = MySQLDataTypeHelper.getDataByColumnType(
                        MySQLDataType.byDataType(columnDataTypeMap.get(columnName)),
                        columnName,
                        entry.getValue(),
                        null);
                event.put(columnName, data);
            }
        }
        if (engineType.isPostgres()) {
            final Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
            for (final Map.Entry<String, Object> entry : event.toMap().entrySet()) {
                final String columnName = entry.getKey();
                final Object data = PostgresDataTypeHelper.getDataByColumnType(
                        PostgresDataType.byDataType(columnDataTypeMap.get(columnName)),
                        columnName,
                        entry.getValue());
                event.put(columnName, data);
            }
        }
    }
}

