/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.topic.config;

import com.icthh.xm.commons.logging.util.MdcUtils;
import com.icthh.xm.commons.topic.domain.TopicConfig;
import com.icthh.xm.commons.topic.util.MessageRetryDetailsUtils;
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;

public class ConsumerRecoveryCallback
implements RecoveryCallback<Object> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerRecoveryCallback.class);
    private final String tenantKey;
    private final TopicConfig topicConfig;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public ConsumerRecoveryCallback(String tenantKey, TopicConfig topicConfig, KafkaTemplate<String, String> kafkaTemplate) {
        this.tenantKey = tenantKey;
        this.topicConfig = topicConfig;
        this.kafkaTemplate = kafkaTemplate;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Object recover(RetryContext context) {
        ConsumerRecord record = (ConsumerRecord)context.getAttribute("record");
        if (record == null) {
            log.warn("Message skipped. Message record is null for context: {}", (Object)context);
            return null;
        }
        String rawBody = String.valueOf(record.value());
        String deadLetterQueue = this.topicConfig.getDeadLetterQueue();
        try {
            this.putRid(record);
            if (StringUtils.isEmpty((CharSequence)deadLetterQueue)) {
                log.info("Message skipped. Processing failed for tenant: [{}], body = {}", (Object)this.tenantKey, (Object)rawBody);
                this.acknowledge(rawBody, context);
                Object var5_5 = null;
                return var5_5;
            }
            Headers headers = ConsumerRecoveryCallback.addExceptionHeaders(context, record);
            ProducerRecord dlqRecord = new ProducerRecord(deadLetterQueue, Integer.valueOf(record.partition()), (Object)(record.key() != null ? record.key().toString() : null), (Object)rawBody, (Iterable)headers);
            this.kafkaTemplate.send(dlqRecord);
            this.acknowledge(rawBody, context);
            log.warn("send message to dead-letter [{}] due to retry count exceeded [{}], total processing time = {} ms, body = [{}]", new Object[]{deadLetterQueue, MessageRetryDetailsUtils.getRetryCounter(record), MessageRetryDetailsUtils.getTotalProcessingTime(record), rawBody});
        }
        finally {
            MdcUtils.clear();
        }
        return null;
    }

    private static Headers addExceptionHeaders(RetryContext context, ConsumerRecord<?, ?> record) {
        RecordHeaders headers = new RecordHeaders((Iterable)record.headers());
        Throwable e = context.getLastThrowable();
        if (e instanceof ListenerExecutionFailedException && e.getCause() != null) {
            e = e.getCause();
        }
        headers.add((Header)new RecordHeader("xm_exceptionMessage", e.toString().getBytes(StandardCharsets.UTF_8)));
        headers.add((Header)new RecordHeader("xm_exceptionStackTrace", ExceptionUtils.getStackTrace((Throwable)e).getBytes(StandardCharsets.UTF_8)));
        return headers;
    }

    private void acknowledge(String rawBody, RetryContext context) {
        Acknowledgment acknowledgment = (Acknowledgment)context.getAttribute("acknowledgment");
        if (acknowledgment == null) {
            log.warn("Acknowledge failed for message: [{}], tenant: [{}]", (Object)rawBody, (Object)this.tenantKey);
            return;
        }
        acknowledgment.acknowledge();
    }

    private void putRid(ConsumerRecord<?, ?> record) {
        MdcUtils.putRid((String)new StringJoiner(":").add(this.tenantKey).add(this.topicConfig.getTopicName()).add(MessageRetryDetailsUtils.getRid(record)).toString());
    }
}

