/*
 * Decompiled with CFR 0.152.
 */
package com.chutneytesting.task.kafka;

import com.chutneytesting.task.amqp.utils.JsonPathEvaluator;
import com.chutneytesting.task.function.XPathFunction;
import com.chutneytesting.task.kafka.KafkaConsumerFactoryFactory;
import com.chutneytesting.task.spi.Task;
import com.chutneytesting.task.spi.TaskExecutionResult;
import com.chutneytesting.task.spi.injectable.Input;
import com.chutneytesting.task.spi.injectable.Logger;
import com.chutneytesting.task.spi.injectable.Target;
import com.chutneytesting.task.spi.time.Duration;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

public class KafkaBasicConsumeTask
implements Task {
    static final String OUTPUT_BODY = "body";
    static final String OUTPUT_BODY_HEADERS_KEY = "headers";
    static final String OUTPUT_BODY_PAYLOAD_KEY = "payload";
    static final String OUTPUT_HEADERS = "headers";
    static final String OUTPUT_PAYLOADS = "payloads";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final String topic;
    private final Logger logger;
    private final Integer nbMessages;
    private MimeType contentType;
    private final String timeout;
    private final String selector;
    private final String headerSelector;
    private final ConsumerFactory<String, String> consumerFactory;
    private final CountDownLatch countDownLatch;
    private final List<Map<String, Object>> consumedMessages = new ArrayList<Map<String, Object>>();
    private final ConcurrentMessageListenerContainer<String, String> messageListenerContainer;

    public KafkaBasicConsumeTask(Target target, @Input(value="topic") String topic, @Input(value="group") String group, @Input(value="properties") Map<String, String> properties, @Input(value="nb-messages") Integer nbMessages, @Input(value="selector") String selector, @Input(value="header-selector") String headerSelector, @Input(value="content-type") String contentType, @Input(value="timeout") String timeout, Logger logger) {
        this.topic = topic;
        this.nbMessages = (Integer)ObjectUtils.defaultIfNull((Object)nbMessages, (Object)1);
        this.selector = selector;
        this.headerSelector = headerSelector;
        this.contentType = Optional.ofNullable(contentType).map(ct -> (String)StringUtils.defaultIfEmpty((CharSequence)ct, (CharSequence)"application/json")).map(MimeTypeUtils::parseMimeType).orElse(MimeTypeUtils.APPLICATION_JSON);
        this.timeout = (String)StringUtils.defaultIfEmpty((CharSequence)timeout, (CharSequence)"60 sec");
        this.consumerFactory = new KafkaConsumerFactoryFactory().create(target, group, (Map)ObjectUtils.defaultIfNull(properties, Collections.emptyMap()));
        this.countDownLatch = new CountDownLatch(this.nbMessages);
        this.messageListenerContainer = this.createMessageListenerContainer(this.createMessageListener());
        this.logger = logger;
    }

    public TaskExecutionResult execute() {
        try {
            this.logger.info("Consuming message from topic " + this.topic);
            this.messageListenerContainer.start();
            this.countDownLatch.await(Duration.parse((String)this.timeout).toMilliseconds(), TimeUnit.MILLISECONDS);
            if (this.consumedMessages.size() != this.nbMessages.intValue()) {
                this.logger.error("Unable to get the expected number of messages [" + this.nbMessages + "] during " + this.timeout + " from topic " + this.topic + ".");
                TaskExecutionResult taskExecutionResult = TaskExecutionResult.ko();
                return taskExecutionResult;
            }
            this.logger.info("Consumed [" + this.nbMessages + "] Kafka Messages from topic " + this.topic);
            TaskExecutionResult taskExecutionResult = TaskExecutionResult.ok(this.toOutputs());
            return taskExecutionResult;
        }
        catch (Exception e) {
            this.logger.error("An exception occurs when consuming a message to Kafka server: " + e.getMessage());
            TaskExecutionResult taskExecutionResult = TaskExecutionResult.ko();
            return taskExecutionResult;
        }
        finally {
            this.messageListenerContainer.stop();
        }
    }

    private MessageListener<String, String> createMessageListener() {
        return record -> {
            if (this.countDownLatch.getCount() <= 0L) {
                return;
            }
            Map<String, Object> message = this.extractMessageFromRecord((ConsumerRecord<String, String>)record);
            if (this.applySelector(message) && this.applyHeaderSelector(message)) {
                this.addMessageToResultAndCountDown(message);
            }
        };
    }

    private boolean applySelector(Map<String, Object> message) {
        if (StringUtils.isBlank((CharSequence)this.selector)) {
            return true;
        }
        if (this.contentType.getSubtype().contains(MimeTypeUtils.APPLICATION_JSON.getSubtype())) {
            try {
                String messageAsString = OBJECT_MAPPER.writeValueAsString(message);
                return JsonPathEvaluator.evaluate(messageAsString, this.selector);
            }
            catch (Exception e) {
                this.logger.info("Received a message, however cannot read process it as json, ignoring payload selection : " + e.getMessage());
                return true;
            }
        }
        if (this.contentType.getSubtype().contains(MimeTypeUtils.APPLICATION_XML.getSubtype())) {
            try {
                Object result = XPathFunction.xpath((String)message.get(OUTPUT_BODY_PAYLOAD_KEY), this.selector);
                return Optional.ofNullable(result).isPresent();
            }
            catch (Exception e) {
                this.logger.info("Received a message, however cannot read process it as xml, ignoring payload selection : " + e.getMessage());
                return true;
            }
        }
        this.logger.info("Applying selector as text");
        return ((String)message.get(OUTPUT_BODY_PAYLOAD_KEY)).contains(this.selector);
    }

    private boolean applyHeaderSelector(Map<String, Object> message) {
        if (StringUtils.isBlank((CharSequence)this.headerSelector)) {
            return true;
        }
        try {
            String messageAsString = OBJECT_MAPPER.writeValueAsString(message.get("headers"));
            return JsonPathEvaluator.evaluate(messageAsString, this.headerSelector);
        }
        catch (Exception e) {
            this.logger.error("\"Received a message, however cannot process headers selection, Ignoring header selection");
            return true;
        }
    }

    private void addMessageToResultAndCountDown(Map<String, Object> message) {
        this.consumedMessages.add(message);
        this.countDownLatch.countDown();
    }

    private Object extractPayload(ConsumerRecord<String, String> record) {
        if (this.contentType.getSubtype().contains(MimeTypeUtils.APPLICATION_JSON.getSubtype())) {
            try {
                return OBJECT_MAPPER.readValue((String)record.value(), Map.class);
            }
            catch (IOException e) {
                this.logger.info("Received a message, however cannot read it as Json fallback as String.");
            }
        }
        return record.value();
    }

    private Map<String, Object> extractMessageFromRecord(ConsumerRecord<String, String> record) {
        HashMap<String, Object> message = new HashMap<String, Object>();
        Map<String, Object> headers = this.extractHeaders(record);
        this.checkContentTypeHeader(headers);
        Object payload = this.extractPayload(record);
        message.put("headers", headers);
        message.put(OUTPUT_BODY_PAYLOAD_KEY, payload);
        return message;
    }

    private Map<String, Object> extractHeaders(ConsumerRecord<String, String> record) {
        return Stream.of(record.headers().toArray()).collect(Collectors.toMap(Header::key, header -> new String(header.value(), StandardCharsets.UTF_8)));
    }

    private ConcurrentMessageListenerContainer<String, String> createMessageListenerContainer(MessageListener<String, String> messageListener) {
        ContainerProperties containerProperties = new ContainerProperties(new String[]{this.topic});
        containerProperties.setMessageListener(messageListener);
        return new ConcurrentMessageListenerContainer(this.consumerFactory, containerProperties);
    }

    private Map<String, Object> toOutputs() {
        HashMap<String, Object> results = new HashMap<String, Object>();
        results.put(OUTPUT_BODY, this.consumedMessages);
        results.put(OUTPUT_PAYLOADS, this.consumedMessages.stream().map(e -> e.get(OUTPUT_BODY_PAYLOAD_KEY)).collect(Collectors.toList()));
        results.put("headers", this.consumedMessages.stream().map(e -> e.get("headers")).collect(Collectors.toList()));
        return results;
    }

    private void checkContentTypeHeader(Map<String, Object> headers) {
        try {
            Optional<MimeType> contentType = headers.entrySet().stream().filter(e -> ((String)e.getKey()).replaceAll("[- ]", "").equalsIgnoreCase("contenttype")).findAny().map(Map.Entry::getValue).map(Object::toString).map(s -> s.replace("\"", "")).map(MimeTypeUtils::parseMimeType);
            contentType.ifPresent(ct -> {
                this.logger.info("Found content type header " + ct);
                this.contentType = ct;
            });
        }
        catch (Exception e2) {
            this.logger.error("Cannot retrieve content type from message received:  " + e2.getMessage());
        }
    }
}

