/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.BinderTypeRegistry;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingBeansRegistrar;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.BindableFunctionProxyFactory;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.core.type.MethodMetadata;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.function.Tuples;

@Configuration
@EnableConfigurationProperties(value={StreamFunctionProperties.class})
@Import(value={BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class})
@AutoConfigureBefore(value={BindingServiceConfiguration.class})
@AutoConfigureAfter(value={ContextFunctionCatalogAutoConfiguration.class})
@ConditionalOnBean(value={FunctionRegistry.class})
public class FunctionConfiguration {
    /**
     * Creates the registrar that registers a {@code BindableFunctionProxyFactory}
     * bean definition for each configured function definition.
     *
     * @param environment injected but not used by this factory method
     * @param functionCatalog catalog used to look up the functions to bind
     * @param streamFunctionProperties holder of the function definition
     * @param binderTypeRegistry registry handed to the registrar
     * @return the registrar, exposed as an {@link InitializingBean}
     */
    @Bean
    public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog,
            StreamFunctionProperties streamFunctionProperties, BinderTypeRegistry binderTypeRegistry) {
        FunctionBindingRegistrar registrar =
                new FunctionBindingRegistrar(binderTypeRegistry, functionCatalog, streamFunctionProperties);
        return registrar;
    }

    /**
     * Creates the initializer that binds non-supplier functions to their channels.
     *
     * @param bindableProxyFactories the available proxy factories; may be {@code null}
     * @return the initializer, or {@code null} when no proxy factories were injected
     */
    @Bean
    public InitializingBean functionInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties functionProperties, @Nullable BindableProxyFactory[] bindableProxyFactories, BindingServiceProperties serviceProperties, ConfigurableApplicationContext applicationContext, FunctionBindingRegistrar bindingHolder, BinderAwareChannelResolver dynamicDestinationResolver) {
        // Without proxy factories there is nothing to bind.
        return bindableProxyFactories != null
                ? new FunctionChannelBindingInitializer(functionCatalog, functionProperties, bindableProxyFactories, serviceProperties, dynamicDestinationResolver)
                : null;
    }

    /**
     * Creates the initializer that turns every supplier-type function into an
     * integration flow registered under {@code <functionDefinition>_integrationflow}.
     *
     * <p>For each proxy factory whose function is a supplier, the function is looked up
     * again with the content type of its single output binding ({@code null} when the
     * producer uses native encoding). The resulting flow routes each message to the
     * destination named by the {@code spring.cloud.stream.sendto.destination} header
     * when present, otherwise to the supplier's output binding.
     *
     * @return the initializer, or {@code null} when any {@code EnableBinding} bean
     *         exists or no function proxy factories were injected
     */
    @Bean
    InitializingBean supplierInitializer(final FunctionCatalog functionCatalog, final StreamFunctionProperties functionProperties, final GenericApplicationContext context, final BindingServiceProperties serviceProperties, final @Nullable BindableFunctionProxyFactory[] proxyFactories, final BinderAwareChannelResolver dynamicDestinationResolver) {
        if (!ObjectUtils.isEmpty((Object[])context.getBeanNamesForAnnotation(EnableBinding.class)) || proxyFactories == null) {
            return null;
        }
        return new InitializingBean(){

            @Override
            public void afterPropertiesSet() throws Exception {
                for (BindableFunctionProxyFactory proxyFactory : proxyFactories) {
                    BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper functionWrapper = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition());
                    if (functionWrapper == null || !functionWrapper.isSupplier()) {
                        continue;
                    }
                    // Gather the content type of each output binding.
                    ArrayList<String> contentTypes = new ArrayList<String>();
                    Assert.isTrue(proxyFactory.getOutputs().size() == 1, "Supplier with multiple outputs is not supported at the moment.");
                    for (String boundOutput : proxyFactory.getOutputs()) {
                        BindingProperties bindingProperties = serviceProperties.getBindingProperties(boundOutput);
                        // Native encoding leaves conversion to the producer, so no content type is requested.
                        String contentType = bindingProperties.getProducer() != null && bindingProperties.getProducer().isUseNativeEncoding() ? null : bindingProperties.getContentType();
                        contentTypes.add(contentType);
                    }
                    // Look the function up again, now with the output content types.
                    functionWrapper = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
                    Publisher<Object> beginPublishingTrigger = FunctionConfiguration.this.setupBindingTrigger(context);
                    // Declared at the point of assignment so the routing lambda can capture it.
                    final String outputName = proxyFactory.getOutputs().iterator().next();
                    if (functionProperties.isComposeFrom() || functionProperties.isComposeTo()) {
                        continue;
                    }
                    String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow";
                    PollableBean pollable = FunctionConfiguration.this.extractPollableAnnotation(functionProperties, context, proxyFactory);
                    StandardIntegrationFlow integrationFlow = ((IntegrationFlowBuilder)FunctionConfiguration.this.integrationFlowFromProvidedSupplier((Supplier)functionWrapper, beginPublishingTrigger, pollable).route(Message.class, message -> {
                        if (message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                            String destinationName = (String)message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                            return dynamicDestinationResolver.resolveDestination(destinationName);
                        }
                        return outputName;
                    })).get();
                    IntegrationFlow postProcessedFlow = (IntegrationFlow)context.getAutowireCapableBeanFactory().applyBeanPostProcessorsBeforeInitialization((Object)integrationFlow, integrationFlowName);
                    context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow, new BeanDefinitionCustomizer[0]);
                }
            }
        };
    }

    /**
     * Builds a publisher that completes once a {@code BindingCreatedEvent} is observed.
     *
     * <p>The sink is only captured when the returned {@code Mono} is subscribed to;
     * events seen before that point find no sink and are ignored.
     *
     * @param context application context the event listener is added to
     * @return the trigger publisher
     */
    private Publisher<Object> setupBindingTrigger(GenericApplicationContext context) {
        AtomicReference<MonoSink<Object>> sinkHolder = new AtomicReference<>();
        Mono<Object> trigger = Mono.create(sinkHolder::set);
        context.addApplicationListener(event -> {
            if (!(event instanceof BindingCreatedEvent)) {
                return;
            }
            MonoSink<Object> sink = sinkHolder.get();
            if (sink != null) {
                sink.success();
            }
        });
        return trigger;
    }

    /**
     * Starts an integration flow from the given supplier.
     *
     * <p>With no {@code PollableBean} annotation and a reactive function type, the
     * supplier is invoked once and its publisher becomes the flow source, with
     * subscription delayed until {@code beginPublishingTrigger} fires. Otherwise the
     * flow starts from the supplier itself and is split when the annotation's
     * {@code splittable} attribute is true.
     *
     * @param supplier the supplier; must be a {@code FunctionInvocationWrapper}
     * @param beginPublishingTrigger publisher that gates subscription to a reactive supplier
     * @param pollable the annotation found on the supplier's factory method, or {@code null}
     * @return the flow builder
     */
    private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier, Publisher<Object> beginPublishingTrigger, PollableBean pollable) {
        Type functionType = ((BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)supplier).getFunctionType();
        boolean splittable = pollable != null && (Boolean)AnnotationUtils.getAnnotationAttributes((Annotation)pollable).get("splittable");
        boolean reactiveSupplier = pollable == null && FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getInputType((Type)functionType, (int)0));

        if (!reactiveSupplier) {
            IntegrationFlowBuilder polledFlow = IntegrationFlows.from(supplier);
            return splittable ? (IntegrationFlowBuilder)polledFlow.split() : polledFlow;
        }

        Publisher publisher = (Publisher)supplier.get();
        if (publisher instanceof Mono) {
            publisher = ((Mono)publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
        } else {
            publisher = ((Flux)publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
        }
        return IntegrationFlows.from((Publisher)publisher);
    }

    /**
     * Finds the {@code PollableBean} annotation on the factory method of the first
     * function named in the proxy factory's function definition.
     *
     * <p>If the bean definition has no resolved factory method, the method is located
     * reflectively from the definition's {@code MethodMetadata} source.
     *
     * @return the annotation, or {@code null} when the bean definition is not a
     *         {@code RootBeanDefinition}, the return-type check fails, or no
     *         annotation is present
     * @throws IllegalArgumentException if no factory method can be determined
     */
    private PollableBean extractPollableAnnotation(StreamFunctionProperties functionProperties, GenericApplicationContext context, BindableFunctionProxyFactory proxyFactory) {
        // Composed definitions use ',' or '|' as separators; only the first function is inspected.
        String normalizedDefinition = proxyFactory.getFunctionDefinition().replace(',', '|').trim();
        String supplierFunctionName = StringUtils.delimitedListToStringArray(normalizedDefinition, "|")[0];

        BeanDefinition bd = context.getBeanDefinition(supplierFunctionName);
        if (!(bd instanceof RootBeanDefinition)) {
            return null;
        }
        RootBeanDefinition rootDefinition = (RootBeanDefinition)bd;

        Method factoryMethod = rootDefinition.getResolvedFactoryMethod();
        if (factoryMethod == null) {
            Object source = bd.getSource();
            if (source instanceof MethodMetadata) {
                MethodMetadata metadata = (MethodMetadata)source;
                Class<?> factory = ClassUtils.resolveClassName(metadata.getDeclaringClassName(), null);
                Class<?>[] params = FunctionContextUtils.getParamTypesFromBeanDefinitionFactory(factory, (AbstractBeanDefinition)rootDefinition);
                factoryMethod = ReflectionUtils.findMethod(factory, metadata.getMethodName(), params);
            }
        }
        Assert.notNull((Object)factoryMethod, "Failed to introspect factory method since it was not discovered for function '" + functionProperties.getDefinition() + "'");

        // NOTE(review): this matches only when the declared return type is Supplier or one of
        // its supertypes, not a Supplier subtype - confirm that is intended.
        if (factoryMethod.getReturnType().isAssignableFrom(Supplier.class)) {
            return (PollableBean)AnnotationUtils.findAnnotation(factoryMethod, PollableBean.class);
        }
        return null;
    }

    /**
     * Returns the value unchanged if it is already a {@code Message}; otherwise wraps
     * it as the payload of a new message.
     */
    @SuppressWarnings("unchecked")
    private <T> Message<T> wrapToMessageIfNecessary(T value) {
        if (value instanceof Message) {
            return (Message<T>)value;
        }
        return MessageBuilder.withPayload(value).build();
    }

    private static class FunctionBindingRegistrar
    implements InitializingBean,
    ApplicationContextAware,
    EnvironmentAware {
        private final BinderTypeRegistry binderTypeRegistry;
        private final FunctionCatalog functionCatalog;
        private final StreamFunctionProperties streamFunctionProperties;
        private ConfigurableApplicationContext applicationContext;
        private Environment environment;
        private int inputCount;
        private int outputCount;

        FunctionBindingRegistrar(BinderTypeRegistry binderTypeRegistry, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
            this.binderTypeRegistry = binderTypeRegistry;
            this.functionCatalog = functionCatalog;
            this.streamFunctionProperties = streamFunctionProperties;
        }

        public void afterPropertiesSet() throws Exception {
            if (ObjectUtils.isEmpty((Object[])this.applicationContext.getBeanNamesForAnnotation(EnableBinding.class)) && this.determineFunctionName(this.functionCatalog, this.environment)) {
                String[] functionDefinitions;
                BeanDefinitionRegistry registry = (BeanDefinitionRegistry)this.applicationContext.getBeanFactory();
                for (String functionDefinition : functionDefinitions = this.streamFunctionProperties.getDefinition().split(";")) {
                    RootBeanDefinition functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);
                    BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);
                    if (function == null) continue;
                    Type functionType = function.getFunctionType();
                    if (function.isSupplier()) {
                        this.inputCount = 0;
                        this.outputCount = FunctionTypeUtils.getOutputCount((Type)functionType);
                    } else if (function.isConsumer()) {
                        this.inputCount = FunctionTypeUtils.getInputCount((Type)functionType);
                        this.outputCount = 0;
                    } else {
                        this.inputCount = FunctionTypeUtils.getInputCount((Type)functionType);
                        this.outputCount = FunctionTypeUtils.getOutputCount((Type)functionType);
                    }
                    functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)functionDefinition);
                    functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.inputCount);
                    functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.outputCount);
                    functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue((Object)this.streamFunctionProperties);
                    registry.registerBeanDefinition(functionDefinition + "_binding", (BeanDefinition)functionBindableProxyDefinition);
                }
            }
        }

        private boolean determineFunctionName(FunctionCatalog catalog, Environment environment) {
            String definition = this.streamFunctionProperties.getDefinition();
            if (!StringUtils.hasText((String)definition)) {
                definition = environment.getProperty("spring.cloud.function.definition");
            }
            if (StringUtils.hasText((String)definition)) {
                this.streamFunctionProperties.setDefinition(definition);
            } else if (Boolean.parseBoolean(environment.getProperty("spring.cloud.stream.function.routing.enabled", "false")) || environment.containsProperty("spring.cloud.function.routing-expression")) {
                this.streamFunctionProperties.setDefinition("functionRouter");
            } else {
                this.streamFunctionProperties.setDefinition(((FunctionInspector)this.functionCatalog).getName(this.functionCatalog.lookup("")));
            }
            return StringUtils.hasText((String)this.streamFunctionProperties.getDefinition());
        }

        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext)applicationContext;
        }

        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    }

    /**
     * Adapts a function for use in a message handler: it writes the
     * {@code skip-type-conversion} header onto the incoming message before invoking
     * the delegate, and rejects delegates that return a {@code Publisher}.
     */
    private static class FunctionWrapper
    implements Function<Message<byte[]>, Object> {
        private final Function function;
        private final ConsumerProperties consumerProperties;
        private final ProducerProperties producerProperties;
        // Reflective handle on MessageHeaders' internal map, used to add a header in place.
        private final Field headersField;

        FunctionWrapper(Function function, ConsumerProperties consumerProperties, ProducerProperties producerProperties) {
            this.function = function;
            this.consumerProperties = consumerProperties;
            this.producerProperties = producerProperties;
            this.headersField = ReflectionUtils.findField(MessageHeaders.class, (String)"headers");
            this.headersField.setAccessible(true);
        }

        /**
         * Invokes the delegate with the message.
         *
         * @return the delegate's result cast to {@code Message}
         * @throws IllegalStateException if the delegate returns a {@code Publisher}
         */
        @Override
        public Message<byte[]> apply(Message<byte[]> message) {
            boolean tagHeaders = message != null && this.consumerProperties != null;
            if (tagHeaders) {
                Map rawHeaders = (Map)ReflectionUtils.getField((Field)this.headersField, (Object)message.getHeaders());
                rawHeaders.put("skip-type-conversion", this.consumerProperties.isUseNativeDecoding());
            }
            Object outcome = this.function.apply(message);
            if (!(outcome instanceof Publisher)) {
                return (Message)outcome;
            }
            throw new IllegalStateException("Routing to functions that return Publisher is not supported in the context of Spring Cloud Stream.");
        }
    }

    private static class FunctionChannelBindingInitializer
    implements InitializingBean,
    ApplicationContextAware {
        private static Log logger = LogFactory.getLog(FunctionChannelBindingInitializer.class);
        private final FunctionCatalog functionCatalog;
        private final StreamFunctionProperties functionProperties;
        private final BindableProxyFactory[] bindableProxyFactories;
        private final BindingServiceProperties serviceProperties;
        private final BinderAwareChannelResolver dynamicDestinationResolver;
        private GenericApplicationContext context;

        /**
         * Stores the collaborators used to bind functions to channels.
         *
         * @param functionCatalog catalog used to look up functions by definition
         * @param functionProperties holds the function definition and compose flags
         * @param bindableProxyFactories proxy factories to process in {@code afterPropertiesSet()}
         * @param serviceProperties source of per-binding properties
         * @param dynamicDestinationResolver resolves destinations named in message headers
         */
        FunctionChannelBindingInitializer(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties, BindableProxyFactory[] bindableProxyFactories, BindingServiceProperties serviceProperties, BinderAwareChannelResolver dynamicDestinationResolver) {
            this.functionCatalog = functionCatalog;
            this.functionProperties = functionProperties;
            this.bindableProxyFactories = bindableProxyFactories;
            this.serviceProperties = serviceProperties;
            this.dynamicDestinationResolver = dynamicDestinationResolver;
        }

        /**
         * Binds every non-supplier function behind each proxy factory. Multi-input/output
         * functions go through the reactive multi-argument path; all others are bound or
         * composed onto the single channel chosen for them.
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            for (BindableProxyFactory proxyFactory : this.bindableProxyFactories) {
                String definition = this.getFunctionDefinition(proxyFactory);
                BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(definition);
                // Suppliers are not bound here.
                if (function == null || function.isSupplier()) {
                    continue;
                }
                if (this.isMultipleInputOutput(proxyFactory)) {
                    this.bindMultipleArgumentsFunction(proxyFactory, definition);
                    continue;
                }
                SubscribableChannel channel = this.determineChannelToSubscribeTo(proxyFactory);
                if (channel != null) {
                    this.bindOrComposeSimpleFunctions(channel, proxyFactory, definition);
                }
            }
        }

        /**
         * Stores the application context.
         *
         * @throws ClassCastException if the context is not a {@code GenericApplicationContext}
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            GenericApplicationContext genericContext = (GenericApplicationContext)applicationContext;
            this.context = genericContext;
        }

        /**
         * Returns the function definition carried by a {@code BindableFunctionProxyFactory},
         * or the globally configured definition for any other proxy factory.
         */
        private String getFunctionDefinition(BindableProxyFactory bindableProxyFactory) {
            if (bindableProxyFactory instanceof BindableFunctionProxyFactory) {
                return ((BindableFunctionProxyFactory)bindableProxyFactory).getFunctionDefinition();
            }
            return this.functionProperties.getDefinition();
        }

        /**
         * Picks the channel a simple function subscribes to: the first input of a
         * {@code BindableFunctionProxyFactory}, otherwise the {@code input} bean if one
         * exists, else the {@code output} bean.
         */
        private SubscribableChannel determineChannelToSubscribeTo(BindableProxyFactory bindableProxyFactory) {
            if (bindableProxyFactory instanceof BindableFunctionProxyFactory) {
                String firstInputName = ((BindableFunctionProxyFactory)bindableProxyFactory).getInputName(0);
                return (SubscribableChannel)this.context.getBean(firstInputName, SubscribableChannel.class);
            }
            if (this.context.containsBean("input")) {
                return (SubscribableChannel)this.context.getBean("input", SubscribableChannel.class);
            }
            return (SubscribableChannel)this.context.getBean("output", SubscribableChannel.class);
        }

        /**
         * Binds a function with multiple inputs and/or outputs. Each input channel is
         * turned into a publisher; a single publisher is passed as-is and several are
         * passed as a tuple. If the function's result is iterable, its elements are
         * paired with the output bindings in iteration order; otherwise the single
         * result is subscribed once for every output binding.
         *
         * @throws IllegalArgumentException if composition is enabled or the function
         *         signature is not supported
         */
        private void bindMultipleArgumentsFunction(BindableProxyFactory bindableProxyFactory, String functionDefinition) {
            Assert.isTrue(!this.functionProperties.isComposeTo() && !this.functionProperties.isComposeFrom(), "Composing to/from existing Sinks and Sources are not supported for functions with multiple arguments.");
            BindableFunctionProxyFactory functionProxyFactory = (BindableFunctionProxyFactory)bindableProxyFactory;
            Set<String> inputBindingNames = functionProxyFactory.getInputs();
            Set<String> outputBindingNames = functionProxyFactory.getOutputs();

            String[] outputContentTypes = outputBindingNames.stream()
                    .map(bindingName -> this.serviceProperties.getBindings().get(bindingName).getContentType())
                    .toArray(String[]::new);
            BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition, outputContentTypes);
            if (this.isMultipleInputOutput(bindableProxyFactory)) {
                this.assertSupportedSignatures(function.getFunctionType());
            }

            // One publisher per input binding, in the binding set's iteration order.
            Publisher[] inputPublishers = new Publisher[inputBindingNames.size()];
            int slot = 0;
            for (String inputBindingName : inputBindingNames) {
                SubscribableChannel inputChannel = (SubscribableChannel)this.context.getBean(inputBindingName, SubscribableChannel.class);
                inputPublishers[slot++] = MessageChannelReactiveUtils.toPublisher((MessageChannel)inputChannel);
            }
            Object functionInput = inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray((Object[])inputPublishers);
            final Object resultPublishers = function.apply(functionInput);

            if (resultPublishers instanceof Iterable) {
                Iterator<String> outputBindingIter = outputBindingNames.iterator();
                for (Object publisher : (Iterable)resultPublishers) {
                    MessageChannel outputChannel = (MessageChannel)this.context.getBean((String)outputBindingIter.next(), MessageChannel.class);
                    Flux.from((Publisher)((Publisher)publisher)).doOnNext(message -> outputChannel.send((Message)message)).subscribe();
                }
                return;
            }
            for (String outputBindingName : outputBindingNames) {
                MessageChannel outputChannel = (MessageChannel)this.context.getBean(outputBindingName, MessageChannel.class);
                Flux.from((Publisher)((Publisher)resultPublishers)).doOnNext(message -> outputChannel.send((Message)message)).subscribe();
            }
        }

        /**
         * Looks the function up with the content type of the given channel's binding,
         * then either composes it onto the existing flow (when compose-from is enabled)
         * or binds it directly to the channel.
         */
        private void bindOrComposeSimpleFunctions(SubscribableChannel messageChannel, BindableProxyFactory bindableProxyFactory, String functionDefinition) {
            String channelBeanName = ((AbstractMessageChannel)messageChannel).getBeanName();
            BindingProperties properties = this.serviceProperties.getBindingProperties(channelBeanName);
            String[] contentTypes = new String[]{properties.getContentType()};
            BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition, contentTypes);
            if (!this.functionProperties.isComposeFrom()) {
                this.bindSimpleFunctions(function, messageChannel, bindableProxyFactory);
                return;
            }
            logger.info((Object)"Composing at the head of 'output' channel");
            this.composeSimpleFunctionToExistingFlow(function, messageChannel, bindableProxyFactory);
        }

        /**
         * Places the function at the head of an existing output flow. A new
         * {@code output.extended} channel is registered and swapped in as the proxy
         * factory's output; the function handler subscribes to the original channel
         * and sends its results to the new one.
         */
        private void composeSimpleFunctionToExistingFlow(BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function, SubscribableChannel outputChannel, BindableProxyFactory bindableProxyFactory) {
            final String extendedChannelName = "output.extended";
            String originalChannelName = ((AbstractMessageChannel)outputChannel).getBeanName();
            ServiceActivatingHandler handler = this.createFunctionHandler(function, null, originalChannelName);

            DirectWithAttributesChannel extendedChannel = new DirectWithAttributesChannel();
            extendedChannel.setAttribute("type", "output");
            extendedChannel.setComponentName(extendedChannelName);
            this.context.registerBean(extendedChannelName, MessageChannel.class, () -> extendedChannel, new BeanDefinitionCustomizer[0]);

            bindableProxyFactory.replaceOutputChannel(originalChannelName, extendedChannelName, (MessageChannel)extendedChannel);
            handler.setOutputChannelName(extendedChannelName);
            outputChannel.subscribe((MessageHandler)handler);
        }

        /**
         * Binds a single-input function to its input channel.
         *
         * <p>The output channel name is the proxy factory's first output for a
         * {@code BindableFunctionProxyFactory}; otherwise it is {@code "output"}, or
         * {@code null} for a consumer. A reactive function with an output channel is
         * wired as a reactive stream from the input channel to the output channel. Any
         * other function is wrapped in a handler subscribed to the input channel.
         *
         * @param function the function to bind
         * @param inputChannel the channel messages arrive on; must be an {@code AbstractMessageChannel}
         * @param bindableProxyFactory the proxy factory that owns the binding
         */
        private void bindSimpleFunctions(BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function, SubscribableChannel inputChannel, BindableProxyFactory bindableProxyFactory) {
            Type functionType = function.getFunctionType();
            // Assign in both branches so the name is always definitely set.
            String outputChannelName;
            if (bindableProxyFactory instanceof BindableFunctionProxyFactory) {
                outputChannelName = ((BindableFunctionProxyFactory)bindableProxyFactory).getOutputName(0);
            } else {
                outputChannelName = FunctionTypeUtils.isConsumer((Type)functionType) ? null : "output";
            }
            if (FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getInputType((Type)functionType, (int)0)) && StringUtils.hasText((String)outputChannelName)) {
                MessageChannel outputChannel = (MessageChannel)this.context.getBean(outputChannelName, MessageChannel.class);
                Publisher publisher = MessageChannelReactiveUtils.toPublisher((MessageChannel)inputChannel);
                // Same cast as the imperative branch; only getBeanName() is needed here.
                String bindingName = ((AbstractMessageChannel)inputChannel).getBeanName();
                this.subscribeToInput((Function)function, bindingName, publisher, message -> outputChannel.send(message));
            } else {
                String inputChannelName = ((AbstractMessageChannel)inputChannel).getBeanName();
                ServiceActivatingHandler handler = this.createFunctionHandler(function, inputChannelName, outputChannelName);
                if (!FunctionTypeUtils.isConsumer((Type)functionType)) {
                    handler.setOutputChannelName(outputChannelName);
                }
                inputChannel.subscribe((MessageHandler)handler);
            }
        }

        /**
         * Creates an initialized handler that invokes the function through a
         * {@code FunctionWrapper}. A result message carrying the
         * {@code spring.cloud.stream.sendto.destination} header is sent to the
         * destination resolved from that header; any other result takes the handler's
         * normal output path.
         *
         * @param inputChannelName binding used for consumer properties; may be empty or {@code null}
         * @param outputChannelName binding used for producer properties; may be empty or {@code null}
         */
        private ServiceActivatingHandler createFunctionHandler(BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function, String inputChannelName, String outputChannelName) {
            ConsumerProperties consumerProperties = null;
            if (StringUtils.hasText((String)inputChannelName)) {
                consumerProperties = this.serviceProperties.getBindingProperties(inputChannelName).getConsumer();
            }
            ProducerProperties producerProperties = null;
            if (StringUtils.hasText((String)outputChannelName)) {
                producerProperties = this.serviceProperties.getBindingProperties(outputChannelName).getProducer();
            }
            FunctionWrapper wrappedFunction = new FunctionWrapper((Function)function, consumerProperties, producerProperties);
            ServiceActivatingHandler handler = new ServiceActivatingHandler(wrappedFunction){

                protected void sendOutputs(Object result, Message<?> requestMessage) {
                    Object destinationHeader = result instanceof Message
                            ? ((Message)result).getHeaders().get((Object)"spring.cloud.stream.sendto.destination")
                            : null;
                    if (destinationHeader == null) {
                        super.sendOutputs(result, requestMessage);
                        return;
                    }
                    String destinationName = (String)destinationHeader;
                    MessageChannel outputChannel = dynamicDestinationResolver.resolveDestination(destinationName);
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info((Object)("Output message is sent to '" + destinationName + "' destination"));
                    }
                    outputChannel.send((Message)result);
                }
            };
            handler.setBeanFactory((BeanFactory)this.context);
            handler.afterPropertiesSet();
            return handler;
        }

        /**
         * Wires a reactive function between an input publisher and an output processor
         * and subscribes to the resulting stream.
         *
         * <p>Each incoming message is processed in its own inner stream: the function
         * is applied via {@code transform}, failures are retried with backoff using the
         * binding's consumer properties, and a failure that survives the retries is
         * sent as an {@code ErrorMessage} to the binding's error channel while the
         * inner stream completes empty.
         *
         * @param function the function applied to each message's stream
         * @param bindingName the input binding whose properties drive retry and error routing
         * @param publisher the source of input messages
         * @param outputProcessor receives each result; passed on to {@code subscribeToOutput}
         */
        private void subscribeToInput(Function function, String bindingName, Publisher publisher, Consumer<Message> outputProcessor) {
            Flux inputPublisher = Flux.from((Publisher)publisher);
            // Holds the message currently being processed so the error handler can reference it.
            AtomicReference originalMessageRef = new AtomicReference();
            AtomicReference consumerPropertiesRef = new AtomicReference();
            // Errors go to the "nullChannel" bean unless a binding-specific error channel is found below.
            AtomicReference<Object> bindingErrorChannelRef = new AtomicReference<Object>(this.context.getBean("nullChannel", MessageChannel.class));
            // switchOnFirst: on the first signal, resolve the consumer properties and look for an
            // error channel bean named "<destination>.<group>.errors"; the flux passes through unchanged.
            Flux result = inputPublisher.switchOnFirst((x, message) -> {
                consumerPropertiesRef.set(this.serviceProperties.getBindings().get(bindingName).getConsumer());
                String destination = this.serviceProperties.getBindings().get(bindingName).getDestination();
                String group = this.serviceProperties.getBindings().get(bindingName).getGroup();
                String bindingErrorChannelName = destination + "." + group + ".errors";
                if (this.context.containsBean(bindingErrorChannelName)) {
                    bindingErrorChannelRef.set(this.context.getBean(bindingErrorChannelName, MessageChannel.class));
                }
                return message;
            }).concatMap(message -> Flux.just((Object)message).doOnNext(originalMessageRef::set).transform(function).retryBackoff((long)((ConsumerProperties)consumerPropertiesRef.get()).getMaxAttempts(), Duration.ofMillis(((ConsumerProperties)consumerPropertiesRef.get()).getBackOffInitialInterval()), Duration.ofMillis(((ConsumerProperties)consumerPropertiesRef.get()).getBackOffMaxInterval())).onErrorResume(e -> {
                ((MessageChannel)bindingErrorChannelRef.get()).send((Message)new ErrorMessage((Throwable)e, ((Message)originalMessageRef.get()).getHeaders(), (Message)originalMessageRef.get()));
                return Mono.empty();
            }));
            this.subscribeToOutput(outputProcessor, (Flux<Message>)result).subscribe();
        }

        /**
         * Attaches the output processor, if there is one, to the result stream.
         *
         * @return a {@code Mono} that completes when the result stream completes
         */
        private Mono<Void> subscribeToOutput(Consumer<Message> outputProcessor, Flux<Message> resultPublisher) {
            if (outputProcessor == null) {
                return resultPublisher.then();
            }
            return resultPublisher.doOnNext(outputProcessor).then();
        }

        /**
         * Verifies that a multi-input/output function has a supported signature: it
         * must be a Function (not a Consumer or Supplier), must not use array
         * input/output, and every input and every output type must be reactive.
         *
         * @param functionType the function's generic type
         * @throws IllegalArgumentException if any check fails
         */
        private void assertSupportedSignatures(Type functionType) {
            Assert.isTrue(!FunctionTypeUtils.isConsumer((Type)functionType), "Function '" + this.functionProperties.getDefinition() + "' is a Consumer which is not supported for multi-in/out reactive streams. Only Functions are supported");
            Assert.isTrue(!FunctionTypeUtils.isSupplier((Type)functionType), "Function '" + this.functionProperties.getDefinition() + "' is a Supplier which is not supported for multi-in/out reactive streams. Only Functions are supported");
            Assert.isTrue(!FunctionTypeUtils.isInputArray((Type)functionType) && !FunctionTypeUtils.isOutputArray((Type)functionType), "Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionType + "]. Your input and/or outout lacks arity and therefore we can not determine how many input/output destinations are required in the context of function input/output binding.");
            int inputCount = FunctionTypeUtils.getInputCount((Type)functionType);
            for (int i = 0; i < inputCount; ++i) {
                Assert.isTrue(FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getInputType((Type)functionType, (int)i)), "Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionType + "]. Non-reactive functions with multiple inputs/outputs are not supported in the context of Spring Cloud Stream.");
            }
            int outputCount = FunctionTypeUtils.getOutputCount((Type)functionType);
            for (int i = 0; i < outputCount; ++i) {
                // Check the OUTPUT type at this index; the input types were checked above.
                Assert.isTrue(FunctionTypeUtils.isReactive((Type)FunctionTypeUtils.getOutputType((Type)functionType, (int)i)), "Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + functionType + "]. Non-reactive functions with multiple inputs/outputs are not supported in the context of Spring Cloud Stream.");
            }
        }

        /**
         * Returns {@code true} only for a {@code BindableFunctionProxyFactory} that
         * reports itself as multiple.
         */
        private boolean isMultipleInputOutput(BindableProxyFactory bindableProxyFactory) {
            if (!(bindableProxyFactory instanceof BindableFunctionProxyFactory)) {
                return false;
            }
            return ((BindableFunctionProxyFactory)bindableProxyFactory).isMultiple();
        }
    }
}

