/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extension.spring.messaging;

import jakarta.annotation.Nonnull;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiFunction;
import org.axonframework.common.Registration;
import org.axonframework.extension.spring.messaging.DefaultEventMessageConverter;
import org.axonframework.extension.spring.messaging.EventMessageConverter;
import org.axonframework.messaging.core.SubscribableEventSource;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.EventBus;
import org.axonframework.messaging.eventhandling.EventMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;

public class InboundEventMessageChannelAdapter
implements MessageHandler,
SubscribableEventSource {
    private final CopyOnWriteArrayList<BiFunction<List<? extends EventMessage>, ProcessingContext, CompletableFuture<?>>> messageProcessors = new CopyOnWriteArrayList();
    private final EventMessageConverter eventMessageConverter;

    public InboundEventMessageChannelAdapter() {
        this(Collections.emptyList(), new DefaultEventMessageConverter());
    }

    public InboundEventMessageChannelAdapter(EventBus eventBus) {
        this(Collections.singletonList((events, context) -> {
            eventBus.publish(context, events.stream().map(it -> it).toList());
            return CompletableFuture.completedFuture(null);
        }), new DefaultEventMessageConverter());
    }

    public InboundEventMessageChannelAdapter(List<BiFunction<List<? extends EventMessage>, ProcessingContext, CompletableFuture<?>>> processors, EventMessageConverter eventMessageConverter) {
        this.messageProcessors.addAll(processors);
        this.eventMessageConverter = eventMessageConverter;
    }

    @Nonnull
    public Registration subscribe(@Nonnull BiFunction<List<? extends EventMessage>, ProcessingContext, CompletableFuture<?>> eventsBatchConsumer) {
        this.messageProcessors.add(eventsBatchConsumer);
        return () -> this.messageProcessors.remove(eventsBatchConsumer);
    }

    public void handleMessage(@Nonnull Message message) {
        List<EventMessage> messages = Collections.singletonList(this.transformMessage(message));
        for (BiFunction<List<EventMessage>, ProcessingContext, CompletableFuture<?>> messageProcessor : this.messageProcessors) {
            messageProcessor.apply(messages, null);
        }
    }

    protected EventMessage transformMessage(Message message) {
        return this.eventMessageConverter.convertFromInboundMessage(message);
    }
}

