/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kafka.producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.jsonschema.core.exceptions.ProcessingException;
import com.github.fge.jsonschema.core.report.ProcessingReport;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaCustomProducer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomProducer.class);
    private final KafkaProducer<String, T> producer;
    private final KafkaProducerConfig kafkaProducerConfig;
    private final DLQSink dlqSink;
    private final Collection<EventHandle> bufferedEventHandles;
    private final ExpressionEvaluator expressionEvaluator;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final String tagTargetKey;
    private final String topicName;
    private final String serdeFormat;
    private final SchemaService schemaService;
    private final KafkaTopicProducerMetrics topicMetrics;
    private final CompressionOption compressionConfig;

    public KafkaCustomProducer(KafkaProducer producer, KafkaProducerConfig kafkaProducerConfig, DLQSink dlqSink, ExpressionEvaluator expressionEvaluator, String tagTargetKey, KafkaTopicProducerMetrics topicMetrics, SchemaService schemaService, CompressionOption compressionConfig) {
        this.producer = producer;
        this.kafkaProducerConfig = kafkaProducerConfig;
        this.dlqSink = dlqSink;
        this.bufferedEventHandles = new LinkedList<EventHandle>();
        this.expressionEvaluator = expressionEvaluator;
        this.tagTargetKey = tagTargetKey;
        this.topicName = ObjectUtils.isEmpty((Object)kafkaProducerConfig.getTopic()) ? null : kafkaProducerConfig.getTopic().getName();
        this.serdeFormat = ObjectUtils.isEmpty((Object)kafkaProducerConfig.getSerdeFormat()) ? null : kafkaProducerConfig.getSerdeFormat();
        this.schemaService = schemaService;
        this.topicMetrics = topicMetrics;
        this.topicMetrics.register(this.producer);
        this.compressionConfig = compressionConfig == null ? CompressionOption.NONE : compressionConfig;
    }

    public KafkaCustomProducer(KafkaProducer producer, KafkaProducerConfig kafkaProducerConfig, DLQSink dlqSink, ExpressionEvaluator expressionEvaluator, String tagTargetKey, KafkaTopicProducerMetrics topicMetrics, SchemaService schemaService) {
        this(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, tagTargetKey, topicMetrics, schemaService, null);
    }

    KafkaTopicProducerMetrics getTopicMetrics() {
        return this.topicMetrics;
    }

    public void produceRawData(byte[] bytes, String key) throws Exception {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            OutputStream compressedOutputStream = this.compressionConfig.getCompressionEngine().createOutputStream((OutputStream)byteArrayOutputStream);
            compressedOutputStream.write(bytes);
            compressedOutputStream.close();
            this.send(this.topicName, key, byteArrayOutputStream.toByteArray()).get();
            this.topicMetrics.update(this.producer);
        }
        catch (Exception e) {
            this.topicMetrics.getNumberOfRawDataSendErrors().increment();
            LOG.error("Error occurred while publishing raw data", (Throwable)e);
            throw e;
        }
    }

    public Integer getMaxRequestSize() {
        KafkaProducerProperties producerProperties = this.kafkaProducerConfig.getKafkaProducerProperties();
        if (producerProperties != null) {
            return producerProperties.getMaxRequestSize();
        }
        return 0x100000;
    }

    public void produceRecords(Record<Event> record) throws Exception {
        this.bufferedEventHandles.add(((Event)record.getData()).getEventHandle());
        Event event = this.getEvent(record);
        String key = event.formatString(this.kafkaProducerConfig.getPartitionKey(), this.expressionEvaluator);
        try {
            if (Objects.equals(this.serdeFormat, MessageFormat.JSON.toString())) {
                this.publishJsonMessage(record, key);
            } else if (Objects.equals(this.serdeFormat, MessageFormat.AVRO.toString())) {
                this.publishAvroMessage(record, key);
            } else if (Objects.equals(this.serdeFormat, MessageFormat.BYTES.toString())) {
                this.publishJsonMessageAsBytes(record, key);
            } else {
                this.publishPlaintextMessage(record, key);
            }
            this.topicMetrics.update(this.producer);
        }
        catch (Exception e) {
            LOG.error("Error occurred while publishing record {}", (Object)e.getMessage());
            this.topicMetrics.getNumberOfRecordSendErrors().increment();
            if (this.dlqSink != null) {
                JsonNode dataNode = ((Event)record.getData()).getJsonNode();
                this.dlqSink.perform(dataNode, e);
            }
            this.releaseEventHandles(false);
            throw e;
        }
    }

    private void publishJsonMessageAsBytes(Record<Event> record, String key) throws Exception {
        JsonNode dataNode = ((Event)record.getData()).getJsonNode();
        byte[] bytes = this.objectMapper.writeValueAsBytes((Object)dataNode);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        OutputStream compressedOutputStream = this.compressionConfig.getCompressionEngine().createOutputStream((OutputStream)byteArrayOutputStream);
        compressedOutputStream.write(bytes);
        compressedOutputStream.close();
        this.send(this.topicName, key, byteArrayOutputStream.toByteArray());
    }

    private Event getEvent(Record<Event> record) {
        Event event = (Event)record.getData();
        try {
            event = this.addTagsToEvent(event, this.tagTargetKey);
        }
        catch (JsonProcessingException e) {
            LOG.error("error occurred while processing tag target key");
        }
        return event;
    }

    private void publishPlaintextMessage(Record<Event> record, String key) throws Exception {
        this.send(this.topicName, key, ((Event)record.getData()).toJsonString());
    }

    private void publishAvroMessage(Record<Event> record, String key) throws Exception {
        Schema avroSchema = this.schemaService.getSchema(this.topicName);
        if (avroSchema == null) {
            throw new RuntimeException("Schema definition is mandatory in case of type avro");
        }
        GenericRecord genericRecord = this.getGenericRecord((Event)record.getData(), avroSchema);
        this.send(this.topicName, key, genericRecord);
    }

    Future send(String topicName, String key, Object record) throws Exception {
        ProducerRecord producerRecord = Objects.isNull(key) ? new ProducerRecord(topicName, record) : new ProducerRecord(topicName, (Object)key, record);
        return this.producer.send(producerRecord, this.callBack(record));
    }

    private void publishJsonMessage(Record<Event> record, String key) throws IOException, ProcessingException, Exception {
        JsonNode dataNode = ((Event)record.getData()).getJsonNode();
        this.send(this.topicName, key, dataNode);
    }

    public boolean validateSchema(String jsonData, String schemaJson) throws IOException, ProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode schemaNode = objectMapper.readTree(schemaJson);
        JsonNode dataNode = objectMapper.readTree(jsonData);
        JsonSchemaFactory schemaFactory = JsonSchemaFactory.byDefault();
        JsonSchema schema = schemaFactory.getJsonSchema(schemaNode);
        ProcessingReport report = schema.validate(dataNode);
        return report != null ? report.isSuccess() : false;
    }

    private Callback callBack(Object dataForDlq) {
        return (metadata, exception) -> {
            if (null != exception) {
                LOG.error("Error occurred while publishing {}", (Object)exception.getMessage());
                this.topicMetrics.getNumberOfRecordProcessingErrors().increment();
            } else {
                this.releaseEventHandles(true);
            }
        };
    }

    private GenericRecord getGenericRecord(Event event, Schema schema) {
        GenericData.Record record = new GenericData.Record(schema);
        for (String key : event.toMap().keySet()) {
            record.put(key, event.toMap().get(key));
        }
        return record;
    }

    private void releaseEventHandles(boolean result) {
        for (EventHandle eventHandle : this.bufferedEventHandles) {
            eventHandle.release(result);
        }
        this.bufferedEventHandles.clear();
    }

    private Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
        String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
        Map eventData = (Map)this.objectMapper.readValue(eventJsonString, (TypeReference)new TypeReference<Map<String, Object>>(){});
        return JacksonLog.builder().withData((Object)eventData).build();
    }
}

