/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Notification consumer that reads messages from a {@link KafkaConsumer} and
 * converts each record into an {@link AtlasKafkaMessage} using the
 * deserializer supplied at construction time.
 *
 * <p>Offsets are committed explicitly through {@link #commit(TopicPartition, long)}
 * only when auto-commit is disabled; when auto-commit is enabled that method
 * is a no-op.
 *
 * @param <T> type of the deserialized notification payload
 */
public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);

    /** Underlying Kafka client; may be {@code null}, in which case every operation is a no-op. */
    private final KafkaConsumer<?, ?> kafkaConsumer;
    /** When {@code true}, {@link #commit(TopicPartition, long)} does nothing. */
    private final boolean autoCommitEnabled;
    /** Poll timeout used by {@link #receive()} and {@link #receiveWithCheckedCommit(Map)}. */
    private final long pollTimeoutMilliSeconds;

    /**
     * Creates a consumer that uses the deserializer associated with the given notification type.
     *
     * @param notificationType        notification type whose deserializer is used
     * @param kafkaConsumer           Kafka client to poll and commit on
     * @param autoCommitEnabled       whether explicit commits should be skipped
     * @param pollTimeoutMilliSeconds default poll timeout in milliseconds
     */
    public AtlasKafkaConsumer(NotificationInterface.NotificationType notificationType, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
        this(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
    }

    /**
     * Creates a consumer with an explicit deserializer.
     *
     * @param deserializer            converts the record value's string form into a message
     * @param kafkaConsumer           Kafka client to poll and commit on
     * @param autoCommitEnabled       whether explicit commits should be skipped
     * @param pollTimeoutMilliSeconds default poll timeout in milliseconds
     */
    public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
        super(deserializer);
        this.autoCommitEnabled = autoCommitEnabled;
        this.kafkaConsumer = kafkaConsumer;
        this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
    }

    /**
     * Polls once using the timeout configured at construction time.
     *
     * @return the successfully deserialized messages; never {@code null}, possibly empty
     */
    @Override
    public List<AtlasKafkaMessage<T>> receive() {
        return this.receive(this.pollTimeoutMilliSeconds);
    }

    /**
     * Polls once using the supplied timeout.
     *
     * @param timeoutMilliSeconds poll timeout in milliseconds
     * @return the successfully deserialized messages; never {@code null}, possibly empty
     */
    @Override
    public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
        // Honor the caller's timeout; previously the argument was ignored in
        // favor of the configured default.
        return this.receive(timeoutMilliSeconds, null);
    }

    /**
     * Polls once using the configured timeout, skipping (and committing) records whose
     * offset is lower than the offset recorded for their partition in the given map.
     *
     * @param lastCommittedPartitionOffset per-partition offsets below which records are
     *                                     skipped; may be {@code null} or empty
     * @return the successfully deserialized, non-skipped messages; never {@code null}
     */
    @Override
    public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
        return this.receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
    }

    /**
     * Synchronously commits the given offset for the given partition.
     * Does nothing when auto-commit is enabled or when there is no Kafka client.
     *
     * @param partition partition to commit on
     * @param offset    offset value passed to Kafka as the committed offset
     */
    @Override
    public void commit(TopicPartition partition, long offset) {
        if (this.autoCommitEnabled || this.kafkaConsumer == null) {
            return;
        }
        // Logged at the same level the guard checks (was INFO behind a DEBUG guard).
        LOG.debug(" commiting the offset ==>> {}", offset);
        this.kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
    }

    /** Closes the underlying Kafka client, if any. */
    @Override
    public void close() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }

    /** Wakes up the underlying Kafka client, if any. */
    @Override
    public void wakeup() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.wakeup();
        }
    }

    /**
     * Polls Kafka once and converts the returned records into messages.
     *
     * <p>A record is left out of the result when it is below the last committed offset for
     * its partition (it is committed and logged), when its value is {@code null}, when
     * deserialization runs out of memory, or when the deserializer returns {@code null}.
     *
     * @param timeoutMilliSeconds          poll timeout in milliseconds
     * @param lastCommittedPartitionOffset per-partition skip thresholds; may be {@code null}
     * @return a new list of messages; never {@code null}
     */
    private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
        List<AtlasKafkaMessage<T>> messages = new ArrayList<AtlasKafkaMessage<T>>();
        if (this.kafkaConsumer == null) {
            return messages;
        }

        ConsumerRecords<?, ?> records = this.kafkaConsumer.poll(timeoutMilliSeconds);
        if (records == null) {
            return messages;
        }

        for (ConsumerRecord<?, ?> record : records) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }

            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            // A single lookup covers the null/empty map, missing key and null value cases
            // without risking an unboxing NullPointerException.
            Long lastCommittedOffset = lastCommittedPartitionOffset != null ? lastCommittedPartitionOffset.get(topicPartition) : null;
            if (lastCommittedOffset != null && record.offset() < lastCommittedOffset) {
                this.commit(topicPartition, record.offset());
                LOG.info("Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}", record.topic(), record.partition(), record.offset(), lastCommittedOffset);
                continue;
            }

            Object rawValue = record.value();
            if (rawValue == null) {
                // Nothing to deserialize; skip instead of failing the whole batch.
                LOG.warn("Ignoring message with null value: topic={}, partition={}, offset={}, key={}", record.topic(), record.partition(), record.offset(), record.key());
                continue;
            }

            T message = null;
            try {
                message = this.deserializer.deserialize(rawValue.toString());
            }
            catch (OutOfMemoryError excp) {
                // Only OutOfMemoryError is absorbed here; any other failure thrown by the
                // deserializer propagates to the caller.
                LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp);
            }
            if (message == null) {
                continue;
            }

            messages.add(new AtlasKafkaMessage<T>(message, record.offset(), record.topic(), record.partition(), this.deserializer.getMsgCreated(), this.deserializer.getSpooled()));
        }
        return messages;
    }
}

