/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.connector.ReplyToHandler;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.execution.ErrorHandlingExecutionTemplate;
import org.mule.runtime.core.interceptor.ProcessingTimeInterceptor;
import org.mule.runtime.core.internal.construct.AbstractPipeline;
import org.mule.runtime.core.internal.construct.processor.FlowConstructStatisticsMessageProcessor;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.processor.strategy.DefaultFlowProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacySynchronousProcessingStrategyFactory;
import org.mule.runtime.core.routing.requestreply.AsyncReplyToPropertyRequestReplyReplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Builder that assembles {@link Flow} instances implemented by {@link DefaultFlow}.
 * <p>
 * A builder instance is single-use: once {@link #build()} has created a flow, every
 * configuration method (and {@code build()} itself) fails with an
 * {@link IllegalStateException}, see {@link #checkImmutable()}.
 */
public class DefaultFlowBuilder
implements Flow.Builder {
    private final String name;
    private final MuleContext muleContext;
    // Optional settings: each one is applied to the flow in build() only when it is non-null.
    private MessageSource messageSource;
    private List<Processor> messageProcessors;
    private MessagingExceptionHandler exceptionListener;
    private ProcessingStrategyFactory processingStrategyFactory;
    // Non-null once build() has run; its presence is what marks this builder as immutable.
    private DefaultFlow flow;

    /**
     * Creates a builder for a flow with the given name.
     * Both arguments are validated through {@code Preconditions.checkArgument}.
     *
     * @param name name of the flow to build; must satisfy {@code StringUtils.isNotEmpty}
     * @param muleContext context handed to the created flow; must not be {@code null}
     */
    public DefaultFlowBuilder(String name, MuleContext muleContext) {
        Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name cannot be empty");
        Preconditions.checkArgument(muleContext != null, "muleContext cannot be null");
        this.name = name;
        this.muleContext = muleContext;
    }

    /**
     * Sets the message source of the flow being built.
     *
     * @param messageSource source to set on the flow; must not be {@code null}
     * @return this builder
     * @throws IllegalStateException if the flow was already built
     */
    @Override
    public Flow.Builder messageSource(MessageSource messageSource) {
        checkImmutable();
        Preconditions.checkArgument(messageSource != null, "messageSource cannot be null");
        this.messageSource = messageSource;
        return this;
    }

    /**
     * Sets the processors of the flow being built. The list reference is stored as given
     * (no copy is made here).
     *
     * @param messageProcessors processors to set on the flow; must not be {@code null}
     * @return this builder
     * @throws IllegalStateException if the flow was already built
     */
    @Override
    public Flow.Builder messageProcessors(List<Processor> messageProcessors) {
        checkImmutable();
        Preconditions.checkArgument(messageProcessors != null, "messageProcessors cannot be null");
        this.messageProcessors = messageProcessors;
        return this;
    }

    /**
     * Sets the exception listener of the flow being built. Unlike the other setters,
     * {@code null} is accepted; a {@code null} value is simply not applied in {@link #build()}.
     *
     * @param exceptionListener handler to set on the flow, may be {@code null}
     * @return this builder
     * @throws IllegalStateException if the flow was already built
     */
    @Override
    public Flow.Builder messagingExceptionHandler(MessagingExceptionHandler exceptionListener) {
        checkImmutable();
        this.exceptionListener = exceptionListener;
        return this;
    }

    /**
     * Sets the processing strategy factory of the flow being built.
     *
     * @param processingStrategyFactory factory to set on the flow; must not be {@code null}
     * @return this builder
     * @throws IllegalStateException if the flow was already built
     */
    @Override
    public Flow.Builder processingStrategyFactory(ProcessingStrategyFactory processingStrategyFactory) {
        checkImmutable();
        Preconditions.checkArgument(processingStrategyFactory != null, "processingStrategyFactory cannot be null");
        this.processingStrategyFactory = processingStrategyFactory;
        return this;
    }

    /**
     * Creates the flow and applies every setting that was configured on this builder.
     *
     * @return the newly created flow
     * @throws IllegalStateException if this builder already built a flow
     */
    @Override
    public Flow build() {
        checkImmutable();
        // The field is assigned before the settings are applied, so the builder is already
        // considered "built" even if one of the setters below throws.
        this.flow = new DefaultFlow(this.name, this.muleContext);
        if (this.messageSource != null) {
            this.flow.setMessageSource(this.messageSource);
        }
        if (this.messageProcessors != null) {
            this.flow.setMessageProcessors(this.messageProcessors);
        }
        if (this.exceptionListener != null) {
            this.flow.setExceptionListener(this.exceptionListener);
        }
        if (this.processingStrategyFactory != null) {
            this.flow.setProcessingStrategyFactory(this.processingStrategyFactory);
        }
        return this.flow;
    }

    /**
     * Guards against reconfiguration after {@link #build()}.
     *
     * @throws IllegalStateException if a flow was already built by this builder
     */
    protected final void checkImmutable() {
        if (this.flow != null) {
            throw new IllegalStateException("Cannot change attributes once the flow was built");
        }
    }

    /**
     * {@link Flow} implementation produced by {@link DefaultFlowBuilder}, built on top of
     * {@link AbstractPipeline}.
     */
    public static class DefaultFlow
    extends AbstractPipeline
    implements Flow {
        /**
         * @param name name of the flow
         * @param muleContext context the flow belongs to
         */
        protected DefaultFlow(String name, MuleContext muleContext) {
            super(name, muleContext);
        }

        /**
         * Processes an event through this flow and waits for the outcome.
         * <p>
         * When {@code useBlockingCodePath()} is true the event goes through
         * {@link #processBlockingSynchronous(Event)}. Otherwise the event is run through
         * {@link #apply(Publisher)} (this flow is passed to {@code Mono.transform}) and the
         * calling thread blocks on the result.
         *
         * @param event event to process
         * @return the resulting event; on the reactive path a dropped event is turned into an
         *         empty {@code Mono}, so the blocking call yields no event in that case
         *         (NOTE(review): {@code Mono.block()} returning {@code null} on empty is
         *         Reactor's contract, confirm against the Reactor version in use)
         * @throws MuleException for failures on either path; exceptions from the reactive path
         *         are converted with {@code Exceptions.rxExceptionToMuleException}
         */
        @Override
        public Event process(Event event) throws MuleException {
            if (this.useBlockingCodePath()) {
                return this.processBlockingSynchronous(event);
            }
            try {
                // Casts are kept exactly as decompiled: they drive generic inference against
                // project types whose signatures are not visible from this file.
                return (Event)Mono.just((Object)event).transform((Function)this).otherwise(Exceptions.EventDroppedException.class, ede -> Mono.empty()).block();
            }
            catch (Exception e) {
                throw Exceptions.rxExceptionToMuleException(e);
            }
        }

        /**
         * Runs the event through the pipeline on the calling thread inside an
         * {@link ErrorHandlingExecutionTemplate}.
         * <p>
         * A child event is created for this flow; its context and the streaming manager are
         * notified of success or error, and the result is converted back into an event for the
         * parent flow construct.
         *
         * @param event event coming from the caller
         * @return the result re-bound to the caller's context and flow construct
         * @throws MessagingException rethrown after its processed event has been re-bound to the caller
         * @throws DefaultMuleException wrapping any other exception raised during execution
         */
        private Event processBlockingSynchronous(Event event) throws MessagingException, DefaultMuleException {
            final Event newEvent = createMuleEventForCurrentFlow(event, event.getReplyToDestination(), event.getReplyToHandler());
            try {
                ErrorHandlingExecutionTemplate executionTemplate = ErrorHandlingExecutionTemplate.createErrorHandlingExecutionTemplate(this.muleContext, this, this.getExceptionListener());
                Event result = executionTemplate.execute(() -> this.pipeline.process(newEvent));
                newEvent.getContext().success(result);
                this.streamingManager.success(result);
                return createReturnEventForParentFlowConstruct(result, event);
            }
            catch (MessagingException e) {
                // Re-bind the failed event to the caller before propagating the error.
                e.setProcessedEvent(createReturnEventForParentFlowConstruct(e.getEvent(), event));
                newEvent.getContext().error((Throwable) e);
                this.streamingManager.error(newEvent);
                throw e;
            }
            catch (Exception e) {
                newEvent.getContext().error(e);
                this.streamingManager.error(newEvent);
                // Restore the caller's event as the current one before wrapping the failure.
                resetRequestContextEvent(event);
                throw new DefaultMuleException(CoreMessages.createStaticMessage("Flow execution exception"), (Throwable) e);
            }
        }

        /**
         * Reactive entry point: maps each incoming event to the publisher of its result.
         * <p>
         * With the legacy synchronous processing strategy instance each event is handled by
         * {@link #processBlockingSynchronous(Event)}. Otherwise a child event is created, handed
         * to {@code sink}, and the result is read from a {@code Mono} built from the child
         * event's context.
         *
         * @param publisher stream of events to process
         * @return stream of resulting events, each re-bound to its originating event's context
         */
        @Override
        public Publisher<Event> apply(Publisher<Event> publisher) {
            // The chain below is kept token-for-token as decompiled (see note in process()).
            return Flux.from(publisher).doOnNext(this.assertStarted()).flatMap(event -> {
                if (this.processingStrategy == LegacySynchronousProcessingStrategyFactory.LEGACY_SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE) {
                    return Mono.just((Object)event).handle(Operators.nullSafeMap(Exceptions.checkedFunction(request -> this.processBlockingSynchronous((Event)request))));
                }
                Event request2 = this.createMuleEventForCurrentFlow((Event)event, event.getReplyToDestination(), event.getReplyToHandler());
                // Dispatch through the sink; the outcome is observed via the child event's context.
                this.sink.accept(request2);
                return Mono.from((Publisher)request2.getContext()).map(r -> {
                    Event result = this.createReturnEventForParentFlowConstruct((Event)r, (Event)event);
                    this.streamingManager.success(result);
                    return result;
                }).mapError(MessagingException.class, me -> {
                    me.setProcessedEvent(this.createReturnEventForParentFlowConstruct(me.getEvent(), (Event)event));
                    this.streamingManager.error((Event)event);
                    return me;
                }).otherwiseIfEmpty(Mono.fromCallable(() -> {
                    // No result was published: signal a dropped event for the incoming event.
                    throw Exceptions.newEventDroppedException(event);
                }));
            });
        }

        /**
         * Builds the event used inside this flow: a copy of {@code event} with a child context,
         * this flow as its flow construct and the given reply-to destination. The new event is
         * also installed as the current event.
         *
         * @param event event received from the caller
         * @param replyToDestination reply-to destination to carry over
         * @param replyToHandler ignored: the child event is always built with a {@code null}
         *        reply-to handler (NOTE(review): presumably so that only the originating flow
         *        replies, confirm with the upstream source)
         * @return the event to run through this flow
         */
        private Event createMuleEventForCurrentFlow(Event event, Object replyToDestination, ReplyToHandler replyToHandler) {
            // Typed null rather than a bare literal so the same builder overload is selected.
            final ReplyToHandler noReplyToHandler = null;
            final Event flowEvent = Event.builder(DefaultEventContext.child(event.getContext()), event)
                    .flow(this)
                    .replyToHandler(noReplyToHandler)
                    .replyToDestination(replyToDestination)
                    .build();
            resetRequestContextEvent(flowEvent);
            return flowEvent;
        }

        /**
         * Converts a result produced inside this flow back into an event for the caller: the
         * context, flow construct, reply-to handler and reply-to destination are taken from
         * {@code original}, while the error (if any) is taken from {@code result}. The returned
         * value, including {@code null}, is installed as the current event.
         *
         * @param result event produced by this flow, may be {@code null}
         * @param original event the caller passed in
         * @return the re-bound event, or {@code null} when {@code result} is {@code null}
         */
        private Event createReturnEventForParentFlowConstruct(Event result, Event original) {
            Event returnEvent = result;
            if (result != null) {
                Optional<Error> errorOptional = result.getError();
                returnEvent = Event.builder(original.getContext(), result)
                        .flow(original.getFlowConstruct())
                        .replyToHandler(original.getReplyToHandler())
                        .replyToDestination(original.getReplyToDestination())
                        .error(errorOptional.orElse(null))
                        .build();
            }
            resetRequestContextEvent(returnEvent);
            return returnEvent;
        }

        /**
         * Installs {@code event} as the current event via {@code Event.setCurrentEvent}.
         *
         * @param event event to make current, may be {@code null}
         */
        private void resetRequestContextEvent(Event event) {
            Event.setCurrentEvent(event);
        }

        /**
         * Adds, after the inherited pre-processors, a {@link ProcessingTimeInterceptor} and a
         * {@link FlowConstructStatisticsMessageProcessor} to the chain.
         */
        @Override
        protected void configurePreProcessors(MessageProcessorChainBuilder builder) throws MuleException {
            super.configurePreProcessors(builder);
            builder.chain(new ProcessingTimeInterceptor());
            builder.chain(new FlowConstructStatisticsMessageProcessor());
        }

        /**
         * Adds an {@link AsyncReplyToPropertyRequestReplyReplier} to the chain before the
         * inherited post-processors.
         */
        @Override
        protected void configurePostProcessors(MessageProcessorChainBuilder builder) throws MuleException {
            builder.chain(new AsyncReplyToPropertyRequestReplyReplier());
            super.configurePostProcessors(builder);
        }

        /**
         * @return a new {@link DefaultFlowProcessingStrategyFactory}
         */
        @Override
        protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
            return new DefaultFlowProcessingStrategyFactory();
        }

        /**
         * @return the construct type label, {@code "Flow"}
         */
        @Override
        public String getConstructType() {
            return "Flow";
        }

        /**
         * Creates the statistics object for this flow, aligns its enabled flag with the
         * context-wide statistics, and registers it with them.
         */
        @Override
        protected void configureStatistics() {
            this.statistics = new FlowConstructStatistics(this.getConstructType(), this.name);
            this.statistics.setEnabled(this.muleContext.getStatistics().isEnabled());
            this.muleContext.getStatistics().add(this.statistics);
        }

        /**
         * @return whether the processing strategy is synchronous; {@code true} when no
         *         processing strategy is available
         */
        @Override
        public boolean isSynchronous() {
            return this.getProcessingStrategy() == null || this.getProcessingStrategy().isSynchronous();
        }
    }
}

