/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import java.util.List;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.io.IOUtils;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingException;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;

/**
 * Event processor that subscribes itself to a {@link SubscribableMessageSource} and hands every
 * published batch of events to an {@link EventProcessingStrategy}, which in turn is given
 * {@link #process(List)} as the callback that performs the actual handling.
 * <p>
 * Instances are created through the nested {@link Builder}; see {@link #builder()}.
 */
public class SubscribingEventProcessor extends AbstractEventProcessor {

    /** Source this processor subscribes to when started. */
    private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    /** Strategy deciding how a published batch is passed on to {@link #process(List)}. */
    private final EventProcessingStrategy processingStrategy;
    /** Handle of the active subscription; {@code null} while the processor is not subscribed. */
    private volatile Registration eventBusRegistration;

    /**
     * Creates a processor from the settings collected by the given {@code builder}.
     *
     * @param builder the builder carrying the message source and processing strategy, next to the
     *                settings consumed by the {@link AbstractEventProcessor} constructor
     */
    protected SubscribingEventProcessor(Builder builder) {
        super(builder);
        messageSource = builder.messageSource;
        processingStrategy = builder.processingStrategy;
    }

    /**
     * Entry point for constructing a {@link SubscribingEventProcessor}.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Subscribes this processor to its message source. Does nothing when a subscription is
     * already held.
     */
    @Override
    public void start() {
        if (eventBusRegistration != null) {
            // Already subscribed; keep the existing registration.
            return;
        }
        eventBusRegistration =
                messageSource.subscribe(batch -> processingStrategy.handle(batch, this::process));
    }

    /**
     * Handles the given batch of events inside a {@link BatchingUnitOfWork} created for that batch.
     * Runtime exceptions are propagated untouched; any other exception is wrapped in an
     * {@link EventProcessingException}.
     *
     * @param eventMessages the events to process
     */
    protected void process(List<? extends EventMessage<?>> eventMessages) {
        try {
            processInUnitOfWork(eventMessages, new BatchingUnitOfWork<>(eventMessages));
        } catch (RuntimeException unchecked) {
            throw unchecked;
        } catch (Exception checked) {
            throw new EventProcessingException("Exception occurred while processing events", checked);
        }
    }

    /**
     * Closes the current subscription (if any) via {@link IOUtils#closeQuietly} and clears the
     * stored registration, so that a later {@link #start()} subscribes again.
     */
    @Override
    public void shutDown() {
        IOUtils.closeQuietly(eventBusRegistration);
        eventBusRegistration = null;
    }

    /**
     * Builder for a {@link SubscribingEventProcessor}.
     * <p>
     * Defaults set here: the processing strategy is {@link DirectEventProcessingStrategy#INSTANCE}
     * and the rollback configuration is {@link RollbackConfigurationType#ANY_THROWABLE}. The
     * {@link SubscribableMessageSource} has no default and is checked by {@link #validate()}.
     */
    public static class Builder extends AbstractEventProcessor.Builder {

        private SubscribableMessageSource<? extends EventMessage<?>> messageSource;
        private EventProcessingStrategy processingStrategy = DirectEventProcessingStrategy.INSTANCE;

        /**
         * Creates a builder with {@link RollbackConfigurationType#ANY_THROWABLE} as the rollback
         * configuration.
         */
        public Builder() {
            // Deliberately invokes the parent's setter rather than this class's override.
            super.rollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE);
        }

        /**
         * Passes the name on to the parent builder.
         *
         * @param name the processor name
         * @return this builder, for chaining
         */
        @Override
        public Builder name(String name) {
            super.name(name);
            return this;
        }

        /**
         * Passes the invoker on to the parent builder.
         *
         * @param eventHandlerInvoker the {@link EventHandlerInvoker} to configure
         * @return this builder, for chaining
         */
        @Override
        public Builder eventHandlerInvoker(EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        /**
         * Passes the rollback configuration on to the parent builder, replacing the default set
         * by the constructor.
         *
         * @param rollbackConfiguration the {@link RollbackConfiguration} to configure
         * @return this builder, for chaining
         */
        @Override
        public Builder rollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        /**
         * Passes the error handler on to the parent builder.
         *
         * @param errorHandler the {@link ErrorHandler} to configure
         * @return this builder, for chaining
         */
        @Override
        public Builder errorHandler(ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        /**
         * Passes the message monitor on to the parent builder.
         *
         * @param messageMonitor the {@link MessageMonitor} to configure
         * @return this builder, for chaining
         */
        @Override
        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        /**
         * Sets the source the processor will subscribe to. The argument is checked with
         * {@link BuilderUtils#assertNonNull} before it is stored.
         *
         * @param messageSource the {@link SubscribableMessageSource} to subscribe to
         * @return this builder, for chaining
         */
        public Builder messageSource(SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
            BuilderUtils.assertNonNull(messageSource, "SubscribableMessageSource may not be null");
            this.messageSource = messageSource;
            return this;
        }

        /**
         * Sets the strategy used to hand published events to the processor, replacing the
         * {@link DirectEventProcessingStrategy#INSTANCE} default. The argument is checked with
         * {@link BuilderUtils#assertNonNull} before it is stored.
         *
         * @param processingStrategy the {@link EventProcessingStrategy} to use
         * @return this builder, for chaining
         */
        public Builder processingStrategy(EventProcessingStrategy processingStrategy) {
            BuilderUtils.assertNonNull(processingStrategy, "EventProcessingStrategy may not be null");
            this.processingStrategy = processingStrategy;
            return this;
        }

        /**
         * Constructs the processor from the current builder state.
         *
         * @return a new {@link SubscribingEventProcessor}
         */
        public SubscribingEventProcessor build() {
            return new SubscribingEventProcessor(this);
        }

        /**
         * Runs the parent's validation and then checks that a message source has been supplied.
         *
         * @throws AxonConfigurationException declared for configuration problems; presumably
         *                                    raised by the non-null assertion (confirm against
         *                                    {@link BuilderUtils})
         */
        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(
                    messageSource,
                    "The SubscribableMessageSource is a hard requirement and should be provided"
            );
        }
    }
}

