/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.eventhandling.processing.subscribing;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.core.Message;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.core.SubscribableEventSource;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.core.unitofwork.UnitOfWork;
import org.axonframework.messaging.eventhandling.EventHandlingComponent;
import org.axonframework.messaging.eventhandling.EventMessage;
import org.axonframework.messaging.eventhandling.processing.EventProcessingException;
import org.axonframework.messaging.eventhandling.processing.EventProcessor;
import org.axonframework.messaging.eventhandling.processing.ProcessorEventHandlingComponents;
import org.axonframework.messaging.eventhandling.processing.errorhandling.ErrorContext;
import org.axonframework.messaging.eventhandling.processing.errorhandling.ErrorHandler;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorConfiguration;

public class SubscribingEventProcessor
implements EventProcessor {
    private final String name;
    private final SubscribingEventProcessorConfiguration configuration;
    private final SubscribableEventSource eventSource;
    private final ProcessorEventHandlingComponents eventHandlingComponents;
    private final ErrorHandler errorHandler;
    private volatile Registration eventBusRegistration;

    public SubscribingEventProcessor(@Nonnull String name, @Nonnull List<EventHandlingComponent> eventHandlingComponents, @Nonnull SubscribingEventProcessorConfiguration configuration) {
        this.name = Objects.requireNonNull(name, "Name may not be null");
        BuilderUtils.assertThat(name, n -> Objects.nonNull(n) && !n.isEmpty(), "Event Processor name may not be null or empty");
        Objects.requireNonNull(configuration, "SubscribingEventProcessorConfiguration may not be null");
        configuration.validate();
        this.configuration = configuration;
        this.eventSource = this.configuration.eventSource();
        this.eventHandlingComponents = new ProcessorEventHandlingComponents(eventHandlingComponents);
        this.errorHandler = this.configuration.errorHandler();
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public CompletableFuture<Void> start() {
        if (this.eventBusRegistration != null) {
            return FutureUtils.emptyCompletedFuture();
        }
        this.eventBusRegistration = this.eventSource.subscribe((events, context) -> {
            this.process(events.stream().map(it -> it).toList(), (ProcessingContext)context);
            return CompletableFuture.completedFuture(null);
        });
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    public boolean isRunning() {
        return this.eventBusRegistration != null;
    }

    @Override
    public boolean isError() {
        return false;
    }

    protected void process(@Nonnull List<EventMessage> eventMessages, @Nullable ProcessingContext context) {
        try {
            if (context != null) {
                FutureUtils.joinAndUnwrap(this.processWithErrorHandling(eventMessages, context).asCompletableFuture());
            } else {
                UnitOfWork unitOfWork = this.configuration.unitOfWorkFactory().create();
                unitOfWork.onInvocation(processingContext -> this.processWithErrorHandling(eventMessages, (ProcessingContext)processingContext).asCompletableFuture());
                FutureUtils.joinAndUnwrap(unitOfWork.execute());
            }
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new EventProcessingException("Exception occurred while processing events", e);
        }
    }

    private MessageStream.Empty<Message> processWithErrorHandling(List<EventMessage> events, ProcessingContext context) {
        return this.eventHandlingComponents.handle(events, context).onErrorContinue(ex -> {
            try {
                this.errorHandler.handleError(new ErrorContext(this.name, (Throwable)ex, (List<? extends EventMessage>)events, context));
            }
            catch (RuntimeException re) {
                return MessageStream.failed(re);
            }
            catch (Exception e) {
                return MessageStream.failed(new EventProcessingException("Exception occurred while processing events", e));
            }
            return MessageStream.empty().cast();
        }).ignoreEntries().cast();
    }

    @Override
    public CompletableFuture<Void> shutdown() {
        if (this.eventBusRegistration != null) {
            this.eventBusRegistration.cancel();
        }
        this.eventBusRegistration = null;
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("name", this.name);
        descriptor.describeProperty("mode", "subscribing");
        descriptor.describeProperty("eventHandlingComponents", this.eventHandlingComponents);
        descriptor.describeProperty("configuration", this.configuration);
    }
}

