/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.source;

import com.google.common.base.Predicate;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.meta.model.ExtensionModel;
import org.mule.runtime.api.meta.model.ModelProperty;
import org.mule.runtime.api.meta.model.source.SourceModel;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.api.tx.TransactionType;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.streaming.CursorProviderFactory;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.api.util.MessagingExceptionResolver;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.exception.IllegalModelDefinitionException;
import org.mule.runtime.extension.api.runtime.config.ConfigurationInstance;
import org.mule.runtime.extension.api.runtime.connectivity.Reconnectable;
import org.mule.runtime.extension.api.runtime.connectivity.ReconnectionCallback;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.mule.runtime.extension.internal.property.TransactionalActionModelProperty;
import org.mule.runtime.extension.internal.property.TransactionalTypeModelProperty;
import org.mule.runtime.module.extension.api.util.MuleExtensionUtils;
import org.mule.runtime.module.extension.internal.loader.java.property.DeclaringMemberModelProperty;
import org.mule.runtime.module.extension.internal.loader.java.property.SourceCallbackModelProperty;
import org.mule.runtime.module.extension.internal.runtime.connectivity.ReactiveReconnectionCallback;
import org.mule.runtime.module.extension.internal.runtime.resolver.ResolverSet;
import org.mule.runtime.module.extension.internal.runtime.resolver.ResolverSetResult;
import org.mule.runtime.module.extension.internal.runtime.resolver.ValueResolver;
import org.mule.runtime.module.extension.internal.runtime.resolver.ValueResolvingContext;
import org.mule.runtime.module.extension.internal.runtime.source.NullSourceCallbackExecutor;
import org.mule.runtime.module.extension.internal.runtime.source.NullSourceCompletionHandler;
import org.mule.runtime.module.extension.internal.runtime.source.ReflectiveSourceCallbackExecutor;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackContextAdapter;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackExecutor;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCompletionHandler;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCompletionHandlerFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceConnectionManager;
import org.mule.runtime.module.extension.internal.runtime.source.SourceConnectionProvider;
import org.mule.runtime.module.extension.internal.util.FieldSetter;
import org.mule.runtime.module.extension.internal.util.IntrospectionUtils;
import org.reactivestreams.Publisher;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * Wraps a {@link Source} instance and drives it through the initialise/start/stop lifecycle.
 *
 * <p>On start, the adapter injects the {@link ComponentLocation}, the configuration value and a
 * {@link ConnectionProvider} into the matching fields of the source (when such fields exist), and
 * then hands the source a {@link SourceCallback} whose completion handlers are built from the
 * {@link SourceCallbackModelProperty} of the {@link SourceModel}.
 */
public final class SourceAdapter
implements Startable,
Stoppable,
Initialisable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SourceAdapter.class);

    private final ExtensionModel extensionModel;
    private final SourceModel sourceModel;
    private final Source source;
    private final Optional<ConfigurationInstance> configurationInstance;
    /** Setter for the single {@code @Config} annotated field of the source, if it declares one. */
    private final Optional<FieldSetter<Object, Object>> configurationSetter;
    /** Setter for the single {@code @Connection} annotated field of the source, if it declares one. */
    private final Optional<FieldSetter<Object, ConnectionProvider>> connectionSetter;
    private final SourceCallbackFactory sourceCallbackFactory;
    private final CursorProviderFactory cursorProviderFactory;
    private final ResolverSet nonCallbackParameters;
    private final ResolverSet successCallbackParameters;
    private final ResolverSet errorCallbackParameters;
    private final ComponentLocation componentLocation;
    private final SourceConnectionManager connectionManager;
    private final MessagingExceptionResolver exceptionResolver;
    @Inject
    private StreamingManager streamingManager;
    @Inject
    private MuleContext muleContext;

    /**
     * Creates a new adapter and eagerly looks up the {@code @Config} and {@code @Connection}
     * annotated fields of the given {@code source}.
     *
     * @throws IllegalModelDefinitionException if the source declares more than one field with either
     *         annotation, or if the {@code @Connection} field is not of type {@link ConnectionProvider}
     */
    public SourceAdapter(ExtensionModel extensionModel, SourceModel sourceModel, Source source, Optional<ConfigurationInstance> configurationInstance, CursorProviderFactory cursorProviderFactory, SourceCallbackFactory sourceCallbackFactory, ComponentLocation componentLocation, SourceConnectionManager connectionManager, ResolverSet nonCallbackParameters, ResolverSet successCallbackParameters, ResolverSet errorCallbackParameters, MessagingExceptionResolver exceptionResolver) {
        this.extensionModel = extensionModel;
        this.sourceModel = sourceModel;
        this.source = source;
        this.cursorProviderFactory = cursorProviderFactory;
        this.configurationInstance = configurationInstance;
        this.sourceCallbackFactory = sourceCallbackFactory;
        this.componentLocation = componentLocation;
        this.connectionManager = connectionManager;
        this.nonCallbackParameters = nonCallbackParameters;
        this.successCallbackParameters = successCallbackParameters;
        this.errorCallbackParameters = errorCallbackParameters;
        this.exceptionResolver = exceptionResolver;
        // Both lookups read this.source, so they must run after it has been assigned.
        this.configurationSetter = this.fetchConfigurationField();
        this.connectionSetter = this.fetchConnectionProviderField();
    }

    /** Builds the callback that is handed to the source in {@link #start()}. */
    private SourceCallback createSourceCallback() {
        return this.sourceCallbackFactory.createSourceCallback(this.createCompletionHandlerFactory());
    }

    /**
     * Returns a factory of completion handlers. When the source model carries no
     * {@link SourceCallbackModelProperty}, the factory produces {@link NullSourceCompletionHandler}s.
     */
    private SourceCompletionHandlerFactory createCompletionHandlerFactory() {
        return this.sourceModel.getModelProperty(SourceCallbackModelProperty.class)
            .map(this::doCreateCompletionHandler)
            .orElse(context -> new NullSourceCompletionHandler());
    }

    /**
     * Creates one executor per callback method (success, error, terminate) and returns a factory
     * which binds them to a callback context.
     */
    private SourceCompletionHandlerFactory doCreateCompletionHandler(SourceCallbackModelProperty modelProperty) {
        SourceCallbackExecutor onSuccessExecutor = this.getMethodExecutor(modelProperty.getOnSuccessMethod(), modelProperty);
        SourceCallbackExecutor onErrorExecutor = this.getMethodExecutor(modelProperty.getOnErrorMethod(), modelProperty);
        SourceCallbackExecutor onTerminateExecutor = this.getMethodExecutor(modelProperty.getOnTerminateMethod(), modelProperty);
        return context -> new DefaultSourceCompletionHandler(onSuccessExecutor, onErrorExecutor, onTerminateExecutor, context);
    }

    /**
     * Returns a reflective executor for the given callback {@code method}, or a
     * {@link NullSourceCallbackExecutor} when the method is absent.
     */
    private SourceCallbackExecutor getMethodExecutor(Optional<Method> method, SourceCallbackModelProperty sourceCallbackModel) {
        // The explicit type witness widens the mapped value to the interface so that the
        // fallback executor is type-compatible; orElseGet avoids allocating it when unused.
        return method
            .<SourceCallbackExecutor>map(m -> new ReflectiveSourceCallbackExecutor(this.extensionModel, this.configurationInstance, this.sourceModel, this.source, m, this.cursorProviderFactory, this.streamingManager, this.componentLocation, this.muleContext, sourceCallbackModel))
            .orElseGet(NullSourceCallbackExecutor::new);
    }

    /**
     * Initialises the three parameter resolver sets (non-callback, error callback, success callback).
     *
     * @throws InitialisationException if any of the resolver sets fails to initialise
     */
    public void initialise() throws InitialisationException {
        LifecycleUtils.initialiseIfNeeded(this.nonCallbackParameters, true, this.muleContext);
        LifecycleUtils.initialiseIfNeeded(this.errorCallbackParameters, true, this.muleContext);
        LifecycleUtils.initialiseIfNeeded(this.successCallbackParameters, true, this.muleContext);
    }

    /**
     * Injects location, configuration, connection provider and container dependencies into the
     * source and then starts it with a freshly created {@link SourceCallback}.
     *
     * @throws MuleException wrapping any exception raised while preparing or starting the source
     */
    public void start() throws MuleException {
        this.injectComponentLocation();
        try {
            this.setConfiguration(this.configurationInstance);
            this.setConnection();
            this.muleContext.getInjector().inject(this.source);
            this.source.onStart(this.createSourceCallback());
        } catch (Exception e) {
            throw new DefaultMuleException(e);
        }
    }

    /**
     * Sets the component location on the first field of type {@link ComponentLocation} declared by
     * the source. Does nothing when the source has no such field.
     */
    private void injectComponentLocation() {
        List<Field> fields = IntrospectionUtils.getFieldsOfType(this.source.getClass(), ComponentLocation.class);
        if (fields.isEmpty()) {
            return;
        }
        new FieldSetter<Source, ComponentLocation>(fields.get(0)).set(this.source, this.componentLocation);
    }

    /**
     * Stops the wrapped source.
     *
     * @throws MuleException wrapping any exception raised by {@link Source#onStop()}
     */
    public void stop() throws MuleException {
        try {
            this.source.onStop();
        } catch (Exception e) {
            throw new DefaultMuleException(e);
        }
    }

    /** Writes the configuration value into the source, when both a setter and a configuration exist. */
    private void setConfiguration(Optional<ConfigurationInstance> configuration) {
        if (this.configurationSetter.isPresent() && configuration.isPresent()) {
            this.configurationSetter.get().set(this.source, configuration.get().getValue());
        }
    }

    /**
     * Writes a {@link SourceConnectionProvider} into the {@code @Connection} field of the source.
     * Does nothing when the source declares no such field.
     *
     * @throws MuleException if the source requires a connection but has no configuration, or the
     *         configuration has no connection provider
     */
    private void setConnection() throws MuleException {
        if (!this.connectionSetter.isPresent()) {
            return;
        }
        FieldSetter<Object, ConnectionProvider> setter = this.connectionSetter.get();
        ConfigurationInstance config = this.configurationInstance.orElseThrow(() -> new DefaultMuleException(I18nMessageFactory.createStaticMessage("Message Source on root component '%s' requires a connection but it doesn't point to any configuration. Please review your application", this.componentLocation.getRootContainerName())));
        if (!config.getConnectionProvider().isPresent()) {
            throw new DefaultMuleException(I18nMessageFactory.createStaticMessage(String.format("Message Source on root component '%s' requires a connection, but points to config '%s' which doesn't specify any. Please review your application", this.componentLocation.getRootContainerName(), config.getName())));
        }
        SourceConnectionProvider connectionProvider = new SourceConnectionProvider(this.connectionManager, config);
        setter.set(this.source, connectionProvider);
    }

    /** @return the configuration instance this adapter was created with, if any */
    Optional<ConfigurationInstance> getConfigurationInstance() {
        return this.configurationInstance;
    }

    /** Looks up the {@code @Config} annotated field of the source and wraps it in a setter. */
    private <T> Optional<FieldSetter<Object, T>> fetchConfigurationField() {
        return this.fetchField(Config.class).map(FieldSetter::new);
    }

    /**
     * Looks up the {@code @Connection} annotated field of the source and wraps it in a setter.
     *
     * @throws IllegalModelDefinitionException if the field's declared type is not exactly
     *         {@link ConnectionProvider}
     */
    private <T> Optional<FieldSetter<Object, T>> fetchConnectionProviderField() {
        return this.fetchField(Connection.class).map(field -> {
            if (!ConnectionProvider.class.equals(field.getType())) {
                throw new IllegalModelDefinitionException(String.format("Message Source defined on class '%s' has field '%s' of type '%s' annotated with @%s. That annotation can only be used on fields of type '%s'", this.source.getClass().getName(), field.getName(), field.getType().getName(), Connection.class.getName(), ConnectionProvider.class.getName()));
            }
            return new FieldSetter<>(field);
        });
    }

    /**
     * Finds the single field of the source class (as reported by
     * {@code ReflectionUtils.getAllFields}) that carries the given annotation.
     *
     * @param annotation the annotation type to look for
     * @return the annotated field, or empty when no field carries the annotation
     * @throws IllegalModelDefinitionException if more than one field carries the annotation
     */
    private Optional<Field> fetchField(Class<? extends Annotation> annotation) {
        // Generic varargs call: the implicitly created Predicate array is only read by the callee.
        @SuppressWarnings("unchecked")
        Set<Field> fields = ReflectionUtils.getAllFields(this.source.getClass(), ReflectionUtils.withAnnotation(annotation));
        if (CollectionUtils.isEmpty(fields)) {
            return Optional.empty();
        }
        if (fields.size() > 1) {
            throw new IllegalModelDefinitionException(String.format("Message Source defined on class '%s' has more than one field annotated with '@%s'. Only one field in the class can bare such annotation", this.source.getClass().getName(), annotation.getSimpleName()));
        }
        return Optional.of(fields.iterator().next());
    }

    /** @return the source name as computed by {@code IntrospectionUtils.getSourceName} for the source class */
    public String getName() {
        return IntrospectionUtils.getSourceName(this.source.getClass());
    }

    /** @return the wrapped {@link Source} */
    public Source getDelegate() {
        return this.source;
    }

    /**
     * Returns a publisher that delegates reconnection to the source, but only when the source
     * implements {@link Reconnectable}.
     *
     * @param e the connection exception that triggered the reconnection
     * @return the reconnection action, or empty when the source is not {@link Reconnectable}
     */
    Optional<Publisher<Void>> getReconnectionAction(ConnectionException e) {
        if (this.source instanceof Reconnectable) {
            return Optional.of(Mono.create(sink -> ((Reconnectable) this.source).reconnect(e, new ReactiveReconnectionCallback(sink))));
        }
        return Optional.empty();
    }

    /** @return the resolved transactional action, or {@link SourceTransactionalAction#NONE} when no resolver exists for it */
    public SourceTransactionalAction getTransactionalAction() {
        return this.getNonCallbackParameterValue(this.getTransactionalActionFieldName(), SourceTransactionalAction.class).orElse(SourceTransactionalAction.NONE);
    }

    /** @return the resolved transaction type, or {@link TransactionType#LOCAL} when no resolver exists for it */
    TransactionType getTransactionalType() {
        return this.getNonCallbackParameterValue(this.getTransactionTypeFieldName(), TransactionType.class).orElse(TransactionType.LOCAL);
    }

    /**
     * Resolves the non-callback parameter registered under {@code fieldName} using an initialiser
     * event.
     *
     * @param fieldName key of the resolver inside the non-callback resolver set
     * @param type expected type of the resolved value
     * @return the resolved value, or empty when no resolver is registered under {@code fieldName}
     * @throws MuleRuntimeException if the resolver fails with a {@link MuleException}
     * @throws IllegalStateException if the resolved value is not an instance of {@code type}
     *         (this includes a {@code null} value)
     */
    private <T> Optional<T> getNonCallbackParameterValue(String fieldName, Class<T> type) {
        ValueResolver<?> valueResolver = this.nonCallbackParameters.getResolvers().get(fieldName);
        if (valueResolver == null) {
            return Optional.empty();
        }
        Object resolved;
        try {
            resolved = valueResolver.resolve(ValueResolvingContext.from(MuleExtensionUtils.getInitialiserEvent(this.muleContext)));
        } catch (MuleException e) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Unable to get the " + type.getSimpleName() + " value for Message Source"), e);
        }
        if (!type.isInstance(resolved)) {
            throw new IllegalStateException("The resolved value is not a " + type.getSimpleName());
        }
        // Safe: guarded by the isInstance check above.
        return Optional.of(type.cast(resolved));
    }

    private String getTransactionalActionFieldName() {
        return this.getFieldNameEnrichedWith(TransactionalActionModelProperty.class, "transactionalAction");
    }

    private String getTransactionTypeFieldName() {
        return this.getFieldNameEnrichedWith(TransactionalTypeModelProperty.class, "transactionType");
    }

    /**
     * Returns the name of the field declaring a parameter that carries both a model property of the
     * given {@code type} and a {@link DeclaringMemberModelProperty}.
     *
     * @param type the model property that marks the parameter of interest
     * @param defaultName the name to return when no such parameter exists
     */
    private String getFieldNameEnrichedWith(Class<? extends ModelProperty> type, String defaultName) {
        return this.sourceModel.getAllParameterModels().stream()
            .filter(param -> param.getModelProperty(type).isPresent())
            .filter(param -> param.getModelProperty(DeclaringMemberModelProperty.class).isPresent())
            .map(param -> param.getModelProperty(DeclaringMemberModelProperty.class).get())
            .findAny()
            .map(modelProperty -> modelProperty.getDeclaringField().getName())
            .orElse(defaultName);
    }

    /** Wraps {@code cause} in a {@link MessagingException} and passes it through the exception resolver. */
    private MessagingException createSourceException(InternalEvent event, Throwable cause) {
        MessagingException messagingException = new MessagingException(event, cause);
        return this.exceptionResolver.resolve(messagingException, this.muleContext);
    }

    /**
     * {@link SourceCompletionHandler} which runs the source's success, error and terminate callbacks
     * and finishes the transaction held by the callback context accordingly.
     */
    public class DefaultSourceCompletionHandler
    implements SourceCompletionHandler {
        private final SourceCallbackExecutor onSuccessExecutor;
        private final SourceCallbackExecutor onErrorExecutor;
        private final SourceCallbackContextAdapter context;
        private final SourceCallbackExecutor onTerminateExecutor;

        public DefaultSourceCompletionHandler(SourceCallbackExecutor onSuccessExecutor, SourceCallbackExecutor onErrorExecutor, SourceCallbackExecutor onTerminateExecutor, SourceCallbackContextAdapter context) {
            this.onSuccessExecutor = onSuccessExecutor;
            this.onErrorExecutor = onErrorExecutor;
            this.onTerminateExecutor = onTerminateExecutor;
            this.context = context;
        }

        /** Runs the success callback and commits the transaction once it completes successfully. */
        @Override
        public Publisher<Void> onCompletion(InternalEvent event, Map<String, Object> parameters) {
            return Mono.from(this.onSuccessExecutor.execute(event, parameters, this.context)).doOnSuccess(v -> this.commit());
        }

        /** Runs the error callback and rolls the transaction back after the callback terminates. */
        @Override
        public Publisher<Void> onFailure(MessagingException exception, Map<String, Object> parameters) {
            return Mono.from(this.onErrorExecutor.execute(exception.getEvent(), parameters, this.context)).doAfterTerminate((v, e) -> this.rollback());
        }

        /**
         * Runs the terminate callback with no parameters, using the event carried by either side of
         * {@code result}, and releases the connection after the callback terminates.
         */
        @Override
        public void onTerminate(Either<MessagingException, InternalEvent> result) throws Exception {
            InternalEvent event = result.isRight() ? result.getRight() : result.getLeft().getEvent();
            Mono.from(this.onTerminateExecutor.execute(event, Collections.emptyMap(), this.context))
                .doAfterTerminate((v, e) -> this.context.releaseConnection())
                .subscribe();
        }

        /** Commits the context's transaction; a failure is logged rather than propagated. */
        private void commit() {
            try {
                this.context.getTransactionHandle().commit();
            } catch (TransactionException e) {
                // The trailing throwable is logged as the exception, not as a format argument.
                LOGGER.error("Failed to commit transaction for message source at '{}': {}", SourceAdapter.this.componentLocation, e.getMessage(), e);
            }
        }

        /** Rolls back the context's transaction; a failure is logged rather than propagated. */
        private void rollback() {
            try {
                this.context.getTransactionHandle().rollback();
            } catch (TransactionException e) {
                // The trailing throwable is logged as the exception, not as a format argument.
                LOGGER.error("Failed to rollback transaction for message source at '{}': {}", SourceAdapter.this.componentLocation, e.getMessage(), e);
            }
        }

        /**
         * Resolves the success callback parameters against the given event.
         *
         * @throws MessagingException if the parameters cannot be resolved
         */
        @Override
        public Map<String, Object> createResponseParameters(InternalEvent event) throws MessagingException {
            return this.resolveCallbackParameters(SourceAdapter.this.successCallbackParameters, event);
        }

        /**
         * Resolves the error callback parameters against the given event.
         *
         * @throws MessagingException if the parameters cannot be resolved
         */
        @Override
        public Map<String, Object> createFailureResponseParameters(InternalEvent event) throws MessagingException {
            return this.resolveCallbackParameters(SourceAdapter.this.errorCallbackParameters, event);
        }

        /** Resolves {@code resolverSet} for {@code event}, translating any failure into a {@link MessagingException}. */
        private Map<String, Object> resolveCallbackParameters(ResolverSet resolverSet, InternalEvent event) throws MessagingException {
            try {
                ResolverSetResult parameters = resolverSet.resolve(ValueResolvingContext.from(event, SourceAdapter.this.configurationInstance));
                return parameters.asMap();
            } catch (Exception e) {
                throw SourceAdapter.this.createSourceException(event, e);
            }
        }
    }
}

