package org.kie.kogito.addon.cloudevents.spring;

import jakarta.annotation.PostConstruct;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.kie.kogito.config.ConfigBean;
import org.kie.kogito.event.CloudEventUnmarshallerFactory;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.EventUnmarshaller;
import org.kie.kogito.event.Subscription;
import org.kie.kogito.event.impl.CloudEventConverter;
import org.kie.kogito.event.impl.DataEventConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Spring component that listens on the Kafka topics resolved by the
 * {@code #{springTopics.getIncomingTopics}} expression and hands every received record to the
 * subscriptions registered through {@link #subscribe(Function, Class)}.
 *
 * <p>Subscriptions are kept in a {@link CopyOnWriteArrayList}, so a subscription added while a
 * record is being dispatched does not disturb the iteration already in progress.
 */
@Component
public class SpringKafkaCloudEventReceiver implements EventReceiver {
    private static final Logger log = LoggerFactory.getLogger(SpringKafkaCloudEventReceiver.class);

    /**
     * Registered subscriptions. Initialised eagerly so the collection is never {@code null},
     * even when the instance is used before (or without) container lifecycle callbacks.
     */
    private final Collection<Subscription<Object, String>> consumers = new CopyOnWriteArrayList<>();

    @Autowired
    EventUnmarshaller<Object> eventDataUnmarshaller;

    @Autowired
    CloudEventUnmarshallerFactory<Object> cloudEventUnmarshaller;

    @Autowired
    ConfigBean configBean;

    /**
     * Registers a consumer for incoming events.
     *
     * <p>The converter paired with the consumer is chosen from {@code configBean.useCloudEvents()}:
     * a {@link CloudEventConverter} when it returns {@code true}, a {@link DataEventConverter}
     * otherwise.
     *
     * @param function callback invoked with each successfully converted event; the stage it
     *                 returns is chained into the acknowledgment decision made in {@code receive}
     * @param cls      payload class handed to the converter
     * @param <T>      payload type of the events the consumer accepts
     */
    public <T> void subscribe(Function<DataEvent<T>, CompletionStage<?>> function, Class<T> cls) {
        // NOTE(review): the converters are instantiated as raw types because their generic
        // declarations are not visible from this file - tighten once confirmed.
        this.consumers.add(new Subscription<>(
                function,
                this.configBean.useCloudEvents()
                        ? new CloudEventConverter(cls, this.cloudEventUnmarshaller)
                        : new DataEventConverter(cls, this.eventDataUnmarshaller)));
    }

    /**
     * Kafka listener entry point: converts the record value once per subscription and invokes the
     * subscription consumers one after another, each starting when the previous stage completes.
     *
     * <p>A subscription whose converter throws {@link IOException} is skipped (logged at debug
     * level) and does not prevent the remaining subscriptions from running. The record is
     * acknowledged only when the whole chain completes without a failure.
     *
     * @param consumerRecord the received Kafka record
     * @param acknowledgment handle used to acknowledge the record after successful processing
     * @throws InterruptedException declared for interface compatibility; nothing in this method
     *                              body throws it directly
     */
    @KafkaListener(topics = {"#{springTopics.getIncomingTopics}"})
    public void receive(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws InterruptedException {
        log.debug("Receive message with key {} for topic {}", consumerRecord.key(), consumerRecord.topic());
        CompletionStage<Object> pipeline = CompletableFuture.completedFuture(null);
        for (Subscription<Object, String> subscription : this.consumers) {
            try {
                Object event = subscription.getConverter().convert(consumerRecord.value());
                pipeline = pipeline.thenCompose(ignored -> invokeConsumer(subscription, event));
            } catch (IOException e) {
                // Best effort: a payload one subscription cannot convert must not block the others.
                // The exception is passed as the trailing argument so the stack trace is kept.
                log.debug("Error converting event. Exception message is {}", e.getMessage(), e);
            }
        }
        pipeline.whenComplete((result, failure) -> acknowledge(failure, acknowledgment));
    }

    /**
     * Applies the subscription's consumer to an already converted event.
     *
     * @param subscription subscription whose consumer is invoked
     * @param event        converted event passed to the consumer
     * @return the stage returned by the consumer
     */
    @SuppressWarnings("unchecked")
    private static CompletionStage<Object> invokeConsumer(Subscription<Object, String> subscription, Object event) {
        // Only completion/failure of the stage is observed by the caller, never its value,
        // so narrowing the result type parameter to Object here is harmless.
        return (CompletionStage<Object>) subscription.getConsumer().apply(event);
    }

    /**
     * Acknowledges the record when processing succeeded, otherwise logs the failure and leaves
     * the record unacknowledged.
     *
     * @param th             failure raised by the processing chain, or {@code null} on success
     * @param acknowledgment handle used to acknowledge the record
     */
    private void acknowledge(Throwable th, Acknowledgment acknowledgment) {
        if (th != null) {
            log.error("Event publishing failed", th);
        } else {
            log.debug("Acknoledge message");
            acknowledgment.acknowledge();
        }
    }
}
