/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.observation.ObservationRegistry;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.binding.SupportedBindableFeatures;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.BindableFunctionProxyFactory;
import org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.cloud.stream.function.StreamFunctionConfigurationProperties;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.type.MethodMetadata;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import reactor.util.function.Tuples;

@Lazy(value=false)
@AutoConfiguration
@EnableConfigurationProperties(value={StreamFunctionConfigurationProperties.class})
@Import(value={BinderFactoryAutoConfiguration.class})
@AutoConfigureBefore(value={BindingServiceConfiguration.class})
@AutoConfigureAfter(value={ContextFunctionCatalogAutoConfiguration.class})
@ConditionalOnBean(value={FunctionRegistry.class})
public class FunctionConfiguration {
    private static final boolean isContextPropagationPresent = ClassUtils.isPresent((String)"io.micrometer.context.ContextSnapshot", (ClassLoader)FunctionConfiguration.class.getClassLoader());

    @Bean
    public StreamBridge streamBridgeUtils(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback callback, ObjectProvider<ObservationRegistry> observationRegistries) {
        return new StreamBridge(functionCatalog, bindingServiceProperties, applicationContext, callback, observationRegistries);
    }

    @Bean
    public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
        return new FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
    }

    @Bean
    public InitializingBean functionInitializer(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties, BindingServiceProperties serviceProperties, ConfigurableApplicationContext applicationContext, StreamBridge streamBridge) {
        return new FunctionToDestinationBinder(functionCatalog, functionProperties, serviceProperties, streamBridge);
    }

    @Bean
    InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties, GenericApplicationContext context, BindingServiceProperties serviceProperties, @Nullable List<BindableFunctionProxyFactory> proxyFactories, StreamBridge streamBridge, TaskScheduler taskScheduler) {
        if (CollectionUtils.isEmpty(proxyFactories)) {
            return null;
        }
        return () -> {
            for (BindableFunctionProxyFactory proxyFactory : proxyFactories) {
                String functionDefinition;
                Object functionWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition());
                if (functionWrapper == null || !functionWrapper.isSupplier()) continue;
                ArrayList<String> contentTypes = new ArrayList<String>();
                if (proxyFactory.getOutputs().size() == 0) {
                    return;
                }
                Assert.isTrue((proxyFactory.getOutputs().size() == 1 ? 1 : 0) != 0, (String)"Supplier with multiple outputs is not supported at the moment.");
                String outputName = proxyFactory.getOutputs().iterator().next();
                BindingProperties bindingProperties = serviceProperties.getBindingProperties(outputName);
                ProducerProperties producerProperties = bindingProperties.getProducer();
                if (bindingProperties.getProducer() == null || !producerProperties.isUseNativeEncoding()) {
                    contentTypes.add(bindingProperties.getContentType());
                }
                if (!StringUtils.hasText((String)(functionDefinition = proxyFactory.getFunctionDefinition()))) continue;
                Object[] functionNames = StringUtils.delimitedListToStringArray((String)functionDefinition.replaceAll(",", "|").trim(), (String)"|");
                Function supplier = null;
                Function function = null;
                if (!ObjectUtils.isEmpty((Object[])functionNames) && functionNames.length > 1) {
                    Object supplierName = functionNames[0];
                    String remainingFunctionDefinition = StringUtils.arrayToCommaDelimitedString((Object[])Arrays.copyOfRange(functionNames, 1, functionNames.length));
                    supplier = (Function)functionCatalog.lookup((String)supplierName);
                    function = (Function)functionCatalog.lookup(remainingFunctionDefinition, contentTypes.toArray(new String[0]));
                    functionWrapper = !((SimpleFunctionRegistry.FunctionInvocationWrapper)supplier).isOutputTypePublisher() && ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).isInputTypePublisher() ? null : (SimpleFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
                } else {
                    functionWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper)functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
                }
                if (functionProperties.isComposeFrom() || functionProperties.isComposeTo()) continue;
                String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow";
                PollableBean pollable = null;
                try {
                    pollable = this.extractPollableAnnotation(functionProperties, context, proxyFactory);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                if (functionWrapper != null) {
                    final SimpleFunctionRegistry.FunctionInvocationWrapper postProcessor = functionWrapper;
                    StandardIntegrationFlow integrationFlow = ((IntegrationFlowBuilder)((IntegrationFlowBuilder)this.integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper((Function<?, ?>)functionWrapper, (ConfigurableApplicationContext)context, producerProperties), pollable, context, taskScheduler, producerProperties, outputName).intercept(new ChannelInterceptor[]{new ChannelInterceptor(){

                        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                            postProcessor.postProcess();
                        }
                    }})).route(Message.class, message -> {
                        if (message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                            String destinationName = (String)message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                            return streamBridge.resolveDestination(destinationName, producerProperties, null);
                        }
                        return outputName;
                    })).get();
                    IntegrationFlow postProcessedFlow = (IntegrationFlow)context.getAutowireCapableBeanFactory().initializeBean((Object)integrationFlow, integrationFlowName);
                    context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow, new BeanDefinitionCustomizer[0]);
                    continue;
                }
                StandardIntegrationFlow integrationFlow = ((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)this.integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, (ConfigurableApplicationContext)context, producerProperties), pollable, context, taskScheduler, producerProperties, outputName).channel(c -> c.direct())).fluxTransform(function)).route(Message.class, message -> {
                    if (message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                        String destinationName = (String)message.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                        return streamBridge.resolveDestination(destinationName, producerProperties, null);
                    }
                    return outputName;
                })).get();
                IntegrationFlow postProcessedFlow = (IntegrationFlow)context.getAutowireCapableBeanFactory().initializeBean((Object)integrationFlow, integrationFlowName);
                context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow, new BeanDefinitionCustomizer[0]);
            }
        };
    }

    private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier, PollableBean pollable, GenericApplicationContext context, TaskScheduler taskScheduler, ProducerProperties producerProperties, String bindingName) {
        IntegrationFlowBuilder integrationFlowBuilder;
        SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper;
        boolean splittable;
        boolean bl = splittable = pollable != null && (Boolean)AnnotationUtils.getAnnotationAttributes((Annotation)pollable).get("splittable") != false;
        if (supplier instanceof PartitionAwareFunctionWrapper) {
            PartitionAwareFunctionWrapper partitionAwareFunctionWrapper = (PartitionAwareFunctionWrapper)supplier;
            functionInvocationWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper)partitionAwareFunctionWrapper.function;
        } else {
            functionInvocationWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper)supplier;
        }
        SimpleFunctionRegistry.FunctionInvocationWrapper function = functionInvocationWrapper;
        boolean reactive = FunctionTypeUtils.isPublisher((Type)function.getOutputType());
        if (pollable == null && reactive) {
            Publisher publisher = (Publisher)supplier.get();
            publisher = publisher instanceof Mono ? ((Mono)publisher).map(this::wrapToMessageIfNecessary) : ((Flux)publisher).map(this::wrapToMessageIfNecessary);
            DirectWithAttributesChannel messageChannel = (DirectWithAttributesChannel)((Object)context.getBean(bindingName, DirectWithAttributesChannel.class));
            FluxMessageChannel reactiveChannel = new FluxMessageChannel();
            reactiveChannel.subscribeTo(publisher);
            messageChannel.setAttribute(DirectWithAttributesChannel.COMPANION_ATTR, reactiveChannel);
            integrationFlowBuilder = IntegrationFlow.from((MessageChannel)reactiveChannel);
            taskScheduler.schedule(() -> {}, Instant.now());
        } else {
            AtomicReference<PollerMetadata> pollerMetadata = new AtomicReference<PollerMetadata>();
            if (producerProperties != null && producerProperties.getPoller() != null) {
                ProducerProperties.PollerProperties poller = producerProperties.getPoller();
                PollerMetadata pm = new PollerMetadata();
                PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
                map.from(poller::getMaxMessagesPerPoll).to(arg_0 -> ((PollerMetadata)pm).setMaxMessagesPerPoll(arg_0));
                map.from((Object)poller).as(this::asTrigger).to(arg_0 -> ((PollerMetadata)pm).setTrigger(arg_0));
                pollerMetadata.set(pm);
            }
            boolean autoStartup = producerProperties == null || producerProperties.isAutoStartup();
            IntegrationFlowBuilder integrationFlowBuilder2 = integrationFlowBuilder = pollerMetadata == null ? IntegrationFlow.fromSupplier(supplier, spca -> ((SourcePollingChannelAdapterSpec)spca.id(bindingName + "_spca")).autoStartup(autoStartup)) : IntegrationFlow.fromSupplier(supplier, spca -> ((SourcePollingChannelAdapterSpec)spca.id(bindingName + "_spca")).poller((PollerMetadata)pollerMetadata.get()).autoStartup(autoStartup));
            if (splittable && reactive) {
                integrationFlowBuilder = (IntegrationFlowBuilder)integrationFlowBuilder.split();
            }
        }
        return integrationFlowBuilder;
    }

    private Trigger asTrigger(ProducerProperties.PollerProperties poller) {
        if (StringUtils.hasText((String)poller.getCron())) {
            return new CronTrigger(poller.getCron());
        }
        return this.createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay());
    }

    private Trigger createPeriodicTrigger(Duration period, Duration initialDelay) {
        PeriodicTrigger trigger = new PeriodicTrigger(period);
        if (initialDelay != null) {
            trigger.setInitialDelay(initialDelay);
        }
        return trigger;
    }

    private PollableBean extractPollableAnnotation(StreamFunctionProperties functionProperties, GenericApplicationContext context, BindableFunctionProxyFactory proxyFactory) {
        Object source;
        String supplierFunctionName = StringUtils.delimitedListToStringArray((String)proxyFactory.getFunctionDefinition().replaceAll(",", "|").trim(), (String)"|")[0];
        BeanDefinition bd = context.getBeanDefinition(supplierFunctionName);
        if (!(bd instanceof RootBeanDefinition)) {
            return null;
        }
        RootBeanDefinition rootBeanDefinition = (RootBeanDefinition)bd;
        Method factoryMethod = rootBeanDefinition.getResolvedFactoryMethod();
        if (factoryMethod == null && (source = bd.getSource()) instanceof MethodMetadata) {
            MethodMetadata methodMetadata = (MethodMetadata)source;
            Class factory = ClassUtils.resolveClassName((String)methodMetadata.getDeclaringClassName(), null);
            Class[] params = FunctionContextUtils.getParamTypesFromBeanDefinitionFactory((Class)factory, (AbstractBeanDefinition)((RootBeanDefinition)bd), (String)methodMetadata.getMethodName());
            factoryMethod = ReflectionUtils.findMethod((Class)factory, (String)methodMetadata.getMethodName(), (Class[])params);
        }
        Assert.notNull((Object)factoryMethod, (String)("Failed to introspect factory method since it was not discovered for function '" + functionProperties.getDefinition() + "'"));
        return factoryMethod.getReturnType().isAssignableFrom(Supplier.class) ? (PollableBean)AnnotationUtils.findAnnotation((Method)factoryMethod, PollableBean.class) : null;
    }

    private <T> Message<T> wrapToMessageIfNecessary(T value) {
        Message message;
        return value instanceof Message ? (message = (Message)value) : MessageBuilder.withPayload(value).build();
    }

    private static <P> Message<P> sanitize(Message<P> inputMessage) {
        return ((MessageBuilder)MessageBuilder.fromMessage(inputMessage).removeHeader("spring.cloud.stream.sendto.destination")).build();
    }

    private static class FunctionBindingRegistrar
    implements InitializingBean,
    ApplicationContextAware,
    EnvironmentAware {
        protected final Log logger = LogFactory.getLog(this.getClass());
        private final FunctionCatalog functionCatalog;
        private final StreamFunctionProperties streamFunctionProperties;
        private ConfigurableApplicationContext applicationContext;
        private Environment environment;
        private int inputCount;
        private int outputCount;

        FunctionBindingRegistrar(FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
            this.functionCatalog = functionCatalog;
            this.streamFunctionProperties = streamFunctionProperties;
        }

        public void afterPropertiesSet() throws Exception {
            this.determineFunctionName(this.functionCatalog, this.environment);
            if (StringUtils.hasText((String)this.streamFunctionProperties.getDefinition())) {
                String[] functionDefinitions;
                for (String functionDefinition : functionDefinitions = this.filterEligibleFunctionDefinitions()) {
                    SimpleFunctionRegistry.FunctionInvocationWrapper function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);
                    if (function != null) {
                        if (function.isSupplier()) {
                            this.inputCount = 0;
                            this.outputCount = this.getOutputCount(function, true);
                        } else if (function.isConsumer() || function.isRoutingFunction()) {
                            this.inputCount = FunctionTypeUtils.getInputCount((SimpleFunctionRegistry.FunctionInvocationWrapper)function);
                            this.outputCount = 0;
                        } else {
                            this.inputCount = FunctionTypeUtils.getInputCount((SimpleFunctionRegistry.FunctionInvocationWrapper)function);
                            this.outputCount = function.isWrappedBiConsumer() ? 0 : this.getOutputCount(function, false);
                        }
                        AtomicReference<BindableFunctionProxyFactory> proxyFactory = new AtomicReference<BindableFunctionProxyFactory>();
                        if (function.isInputTypePublisher()) {
                            SupportedBindableFeatures supportedBindableFeatures = new SupportedBindableFeatures();
                            supportedBindableFeatures.setPollable(false);
                            supportedBindableFeatures.setReactive(true);
                            proxyFactory.set(new BindableFunctionProxyFactory(functionDefinition, this.inputCount, this.outputCount, this.streamFunctionProperties, supportedBindableFeatures));
                        } else {
                            proxyFactory.set(new BindableFunctionProxyFactory(functionDefinition, this.inputCount, this.outputCount, this.streamFunctionProperties));
                        }
                        ((GenericApplicationContext)this.applicationContext).registerBean(functionDefinition + "_binding", BindableFunctionProxyFactory.class, proxyFactory::get, new BeanDefinitionCustomizer[0]);
                        continue;
                    }
                    this.logger.warn((Object)("The function definition '" + this.streamFunctionProperties.getDefinition() + "' is not valid. The referenced function bean or one of its components does not exist"));
                }
            }
            this.createStandAloneBindingsIfNecessary((BindingServiceProperties)this.applicationContext.getBean(BindingServiceProperties.class));
        }

        private void createStandAloneBindingsIfNecessary(BindingServiceProperties bindingProperties) {
            BindableFunctionProxyFactory proxyFactory;
            SimpleFunctionRegistry.FunctionInvocationWrapper sourceFunc;
            String[] inputBindings = StringUtils.hasText((String)bindingProperties.getInputBindings()) ? bindingProperties.getInputBindings().split(";") : new String[]{};
            String[] outputBindings = StringUtils.hasText((String)bindingProperties.getOutputBindings()) ? bindingProperties.getOutputBindings().split(";") : new String[]{};
            for (String inputBindingName : inputBindings) {
                sourceFunc = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(inputBindingName);
                if (sourceFunc != null && !sourceFunc.getFunctionDefinition().equals(inputBindingName)) {
                    sourceFunc = null;
                }
                if (sourceFunc != null && !sourceFunc.isSupplier() && (sourceFunc.getFunctionDefinition().equals(inputBindingName) || !this.applicationContext.containsBean(inputBindingName))) continue;
                proxyFactory = new BindableFunctionProxyFactory(inputBindingName, 1, 0, this.streamFunctionProperties, sourceFunc != null);
                ((GenericApplicationContext)this.applicationContext).registerBean(inputBindingName + "_binding_in", BindableFunctionProxyFactory.class, () -> proxyFactory, new BeanDefinitionCustomizer[0]);
            }
            for (String outputBindingName : outputBindings) {
                sourceFunc = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(outputBindingName);
                if (sourceFunc != null && !sourceFunc.getFunctionDefinition().equals(outputBindingName)) {
                    sourceFunc = null;
                }
                if (sourceFunc != null && !sourceFunc.isConsumer() && (sourceFunc.getFunctionDefinition().equals(outputBindingName) || !this.applicationContext.containsBean(outputBindingName))) continue;
                proxyFactory = new BindableFunctionProxyFactory(outputBindingName, 0, 1, this.streamFunctionProperties, sourceFunc != null);
                ((GenericApplicationContext)this.applicationContext).registerBean(outputBindingName + "_binding_out", BindableFunctionProxyFactory.class, () -> proxyFactory, new BeanDefinitionCustomizer[0]);
            }
        }

        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext)applicationContext;
        }

        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }

        private int getOutputCount(SimpleFunctionRegistry.FunctionInvocationWrapper function, boolean isSupplier) {
            int outputCount = FunctionTypeUtils.getOutputCount((SimpleFunctionRegistry.FunctionInvocationWrapper)function);
            if (!function.isSupplier() && function.getOutputType() instanceof ParameterizedType) {
                Type outputType = function.getOutputType();
                if (FunctionTypeUtils.isMono((Type)outputType) && outputType instanceof ParameterizedType && FunctionTypeUtils.getRawType((Type)((ParameterizedType)outputType).getActualTypeArguments()[0]).equals(Void.class)) {
                    outputCount = 0;
                } else if (FunctionTypeUtils.getRawType((Type)outputType).equals(Void.class)) {
                    outputCount = 0;
                }
            }
            return outputCount;
        }

        private boolean determineFunctionName(FunctionCatalog catalog, Environment environment) {
            SimpleFunctionRegistry.FunctionInvocationWrapper function;
            boolean autodetect = (Boolean)environment.getProperty("spring.cloud.stream.function.autodetect", Boolean.TYPE, (Object)true);
            String definition = this.streamFunctionProperties.getDefinition();
            if (!StringUtils.hasText((String)definition)) {
                definition = environment.getProperty("spring.cloud.function.definition");
            }
            if (StringUtils.hasText((String)definition)) {
                this.streamFunctionProperties.setDefinition(definition);
            } else if (Boolean.parseBoolean(environment.getProperty("spring.cloud.stream.function.routing.enabled", "false")) || environment.containsProperty("spring.cloud.function.routing-expression")) {
                this.streamFunctionProperties.setDefinition("functionRouter");
            } else if (autodetect && (function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup("")) != null) {
                this.streamFunctionProperties.setDefinition(function.getFunctionDefinition());
            }
            return StringUtils.hasText((String)this.streamFunctionProperties.getDefinition());
        }

        private String[] filterEligibleFunctionDefinitions() {
            String[] functionDefinitions;
            ArrayList<String> eligibleFunctionDefinitions = new ArrayList<String>();
            for (String functionDefinition : functionDefinitions = this.streamFunctionProperties.getDefinition().split(";")) {
                functionDefinition = functionDefinition.trim();
                String[] functionNames = StringUtils.delimitedListToStringArray((String)functionDefinition.replaceAll(",", "|").trim(), (String)"|");
                boolean eligibleDefinition = true;
                for (int i = 0; i < functionNames.length && eligibleDefinition; ++i) {
                    String functionName = functionNames[i];
                    if (this.applicationContext.containsBean(functionName)) {
                        Object functionBean = this.applicationContext.getBean(functionName);
                        Type functionType = FunctionTypeUtils.discoverFunctionType((Object)functionBean, (String)functionName, (GenericApplicationContext)((GenericApplicationContext)this.applicationContext));
                        if (functionType == null) {
                            eligibleDefinition = false;
                            continue;
                        }
                        String functionTypeStringValue = functionType.toString();
                        if (!functionTypeStringValue.contains("KTable") && !functionTypeStringValue.contains("KStream")) continue;
                        eligibleDefinition = false;
                        continue;
                    }
                    this.logger.warn((Object)("You have defined function definition that does not exist: " + functionName));
                }
                if (!eligibleDefinition) continue;
                eligibleFunctionDefinitions.add(functionDefinition);
            }
            return eligibleFunctionDefinitions.toArray(new String[0]);
        }
    }

    private static class FunctionToDestinationBinder
    implements InitializingBean,
    ApplicationContextAware {
        protected final Log logger = LogFactory.getLog(this.getClass());
        private GenericApplicationContext applicationContext;
        private BindableProxyFactory[] bindableProxyFactories;
        private final FunctionCatalog functionCatalog;
        private final StreamFunctionProperties functionProperties;
        private final BindingServiceProperties serviceProperties;
        private final StreamBridge streamBridge;

        FunctionToDestinationBinder(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties, BindingServiceProperties serviceProperties, StreamBridge streamBridge) {
            this.functionCatalog = functionCatalog;
            this.functionProperties = functionProperties;
            this.serviceProperties = serviceProperties;
            this.streamBridge = streamBridge;
        }

        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (GenericApplicationContext)applicationContext;
        }

        public void afterPropertiesSet() throws Exception {
            Map beansOfType = this.applicationContext.getBeansOfType(BindableProxyFactory.class);
            for (BindableProxyFactory bindableProxyFactory : this.bindableProxyFactories = beansOfType.values().toArray(new BindableProxyFactory[0])) {
                SimpleFunctionRegistry.FunctionInvocationWrapper function;
                BindableFunctionProxyFactory functionFactory;
                String functionDefinition = bindableProxyFactory instanceof BindableFunctionProxyFactory && (functionFactory = (BindableFunctionProxyFactory)bindableProxyFactory).isFunctionExist() ? functionFactory.getFunctionDefinition() : null;
                boolean shouldNotProcess = false;
                if (!(bindableProxyFactory instanceof BindableFunctionProxyFactory)) {
                    Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
                    boolean bl = shouldNotProcess = !CollectionUtils.isEmpty(outputBindingNames) && outputBindingNames.iterator().next().equals("applicationMetrics");
                }
                if (!StringUtils.hasText((String)functionDefinition) || shouldNotProcess || (function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition)) == null || function.isSupplier() || !functionDefinition.equals(function.getFunctionDefinition())) continue;
                this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition, this.applicationContext.getEnvironment());
            }
        }

        private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactory, String functionDefinition, ConfigurableEnvironment environment) {
            this.assertBindingIsPossible(bindableProxyFactory);
            Set<String> inputBindingNames = bindableProxyFactory.getInputs();
            Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
            String[] outputContentTypes = (String[])outputBindingNames.stream().map(bindingName -> this.serviceProperties.getBindings().get(bindingName).getContentType()).toArray(String[]::new);
            SimpleFunctionRegistry.FunctionInvocationWrapper function = (SimpleFunctionRegistry.FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition, outputContentTypes);
            this.assertSupportedSignatures(bindableProxyFactory, function);
            if (this.functionProperties.isComposeFrom()) {
                AbstractSubscribableChannel outputChannel = (AbstractSubscribableChannel)this.applicationContext.getBean(outputBindingNames.iterator().next(), AbstractSubscribableChannel.class);
                this.logger.info((Object)("Composing at the head of output destination: " + outputChannel.getBeanName()));
                String outputChannelName = outputChannel.getBeanName();
                DirectWithAttributesChannel newOutputChannel = new DirectWithAttributesChannel();
                newOutputChannel.setAttribute("type", "output");
                newOutputChannel.setComponentName("output.extended");
                this.applicationContext.registerBean("output.extended", MessageChannel.class, () -> newOutputChannel, new BeanDefinitionCustomizer[0]);
                bindableProxyFactory.replaceOutputChannel(outputChannelName, "output.extended", (MessageChannel)newOutputChannel);
                inputBindingNames = Collections.singleton("output");
            }
            if (this.isReactiveOrMultipleInputOutput(bindableProxyFactory, function.getInputType(), function.getOutputType())) {
                Object resultPublishers;
                AtomicReference<Function<Message, Message>> targetProtocolEnhancer = new AtomicReference<Function<Message, Message>>();
                if (!CollectionUtils.isEmpty(outputBindingNames)) {
                    String outputBindingName = outputBindingNames.iterator().next();
                    String binderConfigurationName = this.serviceProperties.getBinder(outputBindingName);
                    BinderFactory binderFactory = (BinderFactory)this.applicationContext.getBean(BinderFactory.class);
                    Boolean reactive = this.functionProperties.getReactive().get(functionDefinition);
                    boolean reactiveFn = reactive != null && reactive != false;
                    Class<MessageChannel> bindableType = MessageChannel.class;
                    if (reactiveFn) {
                        bindableType = FluxMessageChannel.class;
                    }
                    Binder<MessageChannel, ConsumerProperties, ProducerProperties> binder = binderFactory.getBinder(binderConfigurationName, bindableType);
                    Field headersField = ReflectionUtils.findField(MessageHeaders.class, (String)"headers");
                    headersField.setAccessible(true);
                    targetProtocolEnhancer.set(message -> {
                        Map headersMap = (Map)ReflectionUtils.getField((Field)headersField, (Object)message.getHeaders());
                        if (CloudEventMessageUtils.isCloudEvent((Message)message)) {
                            headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
                        }
                        return message;
                    });
                }
                Object[] inputPublishers = (Publisher[])inputBindingNames.stream().map(inputBindingName -> {
                    ConsumerProperties consumerProperties;
                    BindingProperties bindingProperties = this.serviceProperties.getBindings().get(inputBindingName);
                    ConsumerProperties consumerProperties2 = consumerProperties = bindingProperties == null ? null : bindingProperties.getConsumer();
                    if (consumerProperties != null) {
                        function.setSkipInputConversion(consumerProperties.isUseNativeDecoding());
                        if (consumerProperties.getConcurrency() > 1) {
                            this.logger.warn((Object)"When using concurrency > 1 in reactive contexts, please make sure that you are using a reactive binder that supports concurrency settings. Otherwise, concurrency settings > 1 will be ignored when using reactive types.");
                        }
                    }
                    MessageChannel inputChannel = (MessageChannel)this.applicationContext.getBean(inputBindingName, MessageChannel.class);
                    return IntegrationReactiveUtils.messageChannelToFlux((MessageChannel)inputChannel).map(m -> {
                        if (m != null) {
                            m = FunctionConfiguration.sanitize(m);
                        }
                        return m;
                    });
                }).map(publisher -> {
                    if (targetProtocolEnhancer.get() != null) {
                        return publisher.map((Function)targetProtocolEnhancer.get());
                    }
                    return publisher;
                }).toArray(Publisher[]::new);
                Object functionToInvoke = function;
                if (!CollectionUtils.isEmpty(outputBindingNames)) {
                    ProducerProperties producerProperties;
                    BindingProperties bindingProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next());
                    ProducerProperties producerProperties2 = producerProperties = bindingProperties == null ? null : bindingProperties.getProducer();
                    if (producerProperties != null) {
                        function.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
                    }
                    functionToInvoke = new PartitionAwareFunctionWrapper((Function<?, ?>)function, (ConfigurableApplicationContext)this.applicationContext, producerProperties);
                    if (outputBindingNames.size() > 1) {
                        ((PartitionAwareFunctionWrapper)functionToInvoke).setMessageEnricherEnabled(false);
                    }
                }
                if (!((resultPublishers = functionToInvoke.apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray((Object[])inputPublishers))) instanceof Iterable)) {
                    resultPublishers = Collections.singletonList(resultPublishers);
                }
                Iterator<String> outputBindingIter = outputBindingNames.iterator();
                long outputCount = StreamSupport.stream(((Iterable)resultPublishers).spliterator(), false).count();
                ((Iterable)resultPublishers).forEach(publisher -> {
                    Flux flux = Flux.from((Publisher)((Publisher)publisher));
                    if (!CollectionUtils.isEmpty((Collection)outputBindingNames)) {
                        String outputBinding = (String)outputBindingIter.next();
                        MessageChannel outputChannel = (MessageChannel)this.applicationContext.getBean(outputBinding, MessageChannel.class);
                        flux = flux.doOnNext(message -> {
                            Message m;
                            if (outputCount > 1L) {
                                Integer partitionId = this.determinePartitionForOutputBinding(outputBinding, message);
                                message = ((MessageBuilder)MessageBuilder.fromMessage((Message)((Message)message)).setHeader("scst_partition", (Object)partitionId)).build();
                            }
                            if (message instanceof Message && (m = (Message)message).getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                                String destinationName = (String)m.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                                ProducerProperties producerProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next()).getProducer();
                                MessageChannel dynamicChannel = this.streamBridge.resolveDestination(destinationName, producerProperties, null);
                                if (this.logger.isInfoEnabled()) {
                                    this.logger.info((Object)("Output message is sent to '" + destinationName + "' destination"));
                                }
                                dynamicChannel.send(m);
                            } else {
                                if (!(message instanceof Message)) {
                                    message = MessageBuilder.withPayload((Object)message).build();
                                }
                                if (isContextPropagationPresent && outputChannel instanceof FluxMessageChannel) {
                                    ContextView reactorContext = StaticMessageHeaderAccessor.getReactorContext((Message)((Message)message));
                                    try (AutoCloseable autoCloseable = ContextSnapshotHelper.setContext(reactorContext);){
                                        outputChannel.send((Message)message);
                                    }
                                    catch (Exception exception) {}
                                } else {
                                    outputChannel.send((Message)message);
                                }
                            }
                        }).doOnError(e -> {
                            this.logger.error((Object)("Failure was detected during execution of the reactive function '" + functionDefinition + "'"));
                            ((Throwable)e).printStackTrace();
                        });
                    }
                    if (!function.isConsumer()) {
                        flux.subscribe();
                    }
                });
            } else {
                String inputDestinationName;
                Object inputDestination;
                String outputDestinationName = this.determineOutputDestinationName(0, bindableProxyFactory, function.isConsumer());
                if (!ObjectUtils.isEmpty(inputBindingNames) && (inputDestination = this.applicationContext.getBean(inputDestinationName = inputBindingNames.iterator().next())) != null && inputDestination instanceof SubscribableChannel) {
                    AbstractMessageHandler handler = this.createFunctionHandler(function, inputDestinationName, outputDestinationName);
                    ((SubscribableChannel)inputDestination).subscribe((MessageHandler)handler);
                }
            }
        }

        private Integer determinePartitionForOutputBinding(String outputBinding, Object message) {
            ProducerProperties producerProperties;
            BindingProperties bindingProperties = this.serviceProperties.getBindings().get(outputBinding);
            ProducerProperties producerProperties2 = producerProperties = bindingProperties == null ? null : bindingProperties.getProducer();
            if (producerProperties != null && producerProperties.isPartitioned()) {
                StandardEvaluationContext evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.applicationContext.getBeanFactory());
                PartitionHandler partitionHandler = new PartitionHandler((EvaluationContext)evaluationContext, producerProperties, this.applicationContext.getBeanFactory());
                if (message instanceof Message) {
                    return partitionHandler.determinePartition((Message)message);
                }
            }
            return null;
        }

        private AbstractMessageHandler createFunctionHandler(final SimpleFunctionRegistry.FunctionInvocationWrapper function, String inputChannelName, final String outputChannelName) {
            ConsumerProperties consumerProperties = StringUtils.hasText((String)inputChannelName) ? this.serviceProperties.getBindingProperties(inputChannelName).getConsumer() : null;
            final ProducerProperties producerProperties = StringUtils.hasText((String)outputChannelName) ? this.serviceProperties.getBindingProperties(outputChannelName).getProducer() : null;
            final FunctionWrapper functionInvocationWrapper = new FunctionWrapper((Function)function, consumerProperties, producerProperties, (ConfigurableApplicationContext)this.applicationContext, this.determineTargetProtocol(outputChannelName));
            final MessagingTemplate template = new MessagingTemplate();
            template.setBeanFactory((BeanFactory)this.applicationContext.getBeanFactory());
            AbstractMessageHandler handler = new AbstractMessageHandler(){

                public void handleMessageInternal(Message<?> message) throws MessagingException {
                    Object result = functionInvocationWrapper.apply(message);
                    if (result == null) {
                        this.logger.debug((CharSequence)"Function execution resulted in null. No message will be sent");
                        return;
                    }
                    if (result instanceof Iterable) {
                        Iterable iterableResult = (Iterable)result;
                        for (Object resultElement : iterableResult) {
                            this.doSendMessage(resultElement, message);
                        }
                    } else if (ObjectUtils.isArray((Object)result) && !(result instanceof byte[])) {
                        for (int i = 0; i < ((Object[])result).length; ++i) {
                            this.doSendMessage(((Object[])result)[i], message);
                        }
                    } else {
                        this.doSendMessage(result, message);
                    }
                }

                private void doSendMessage(Object result, Message<?> requestMessage) {
                    Message messageResult;
                    if (result instanceof Message && (messageResult = (Message)result).getHeaders().get((Object)"spring.cloud.stream.sendto.destination") != null) {
                        String destinationName = (String)messageResult.getHeaders().get((Object)"spring.cloud.stream.sendto.destination");
                        MessageChannel outputChannel = streamBridge.resolveDestination(destinationName, producerProperties, null);
                        BindingProperties bindingProperties = serviceProperties.getBindingProperties(destinationName);
                        ProducerProperties sendToBindingProducerProperties = bindingProperties.getProducer();
                        if (sendToBindingProducerProperties != null && sendToBindingProducerProperties.isPartitioned()) {
                            ((AbstractMessageChannel)outputChannel).addInterceptor((ChannelInterceptor)new DefaultPartitioningInterceptor(bindingProperties, applicationContext.getBeanFactory()));
                        }
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info((CharSequence)("Output message is sent to '" + destinationName + "' destination"));
                        }
                        outputChannel.send(messageResult);
                    } else if (StringUtils.hasText((String)outputChannelName)) {
                        if (!(result instanceof Message)) {
                            result = ((MessageBuilder)MessageBuilder.withPayload((Object)result).copyHeadersIfAbsent((Map)requestMessage.getHeaders())).build();
                        }
                        template.send(outputChannelName, (Message)result);
                    } else if (function.isRoutingFunction()) {
                        if (!(result instanceof Message)) {
                            result = ((MessageBuilder)MessageBuilder.withPayload((Object)result).copyHeadersIfAbsent((Map)requestMessage.getHeaders())).build();
                        }
                        streamBridge.send(function.getFunctionDefinition() + "-out-0", result);
                    }
                    function.postProcess();
                }
            };
            handler.setBeanFactory((BeanFactory)this.applicationContext);
            handler.afterPropertiesSet();
            return handler;
        }

        private String determineTargetProtocol(String outputBindingName) {
            if (StringUtils.hasText((String)outputBindingName)) {
                String binderConfigurationName = this.serviceProperties.getBinder(outputBindingName);
                BinderFactory binderFactory = (BinderFactory)this.applicationContext.getBean(BinderFactory.class);
                Binder<MessageChannel, ConsumerProperties, ProducerProperties> binder = binderFactory.getBinder(binderConfigurationName, MessageChannel.class);
                return binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
            }
            return null;
        }

        private boolean isReactiveOrMultipleInputOutput(BindableProxyFactory bindableProxyFactory, Type inputType, Type outputType) {
            boolean reactiveInputsOutputs = FunctionTypeUtils.isPublisher((Type)inputType) || FunctionTypeUtils.isPublisher((Type)outputType);
            return this.isMultipleInputOutput(bindableProxyFactory) || reactiveInputsOutputs;
        }

        private String determineOutputDestinationName(int index, BindableProxyFactory bindableProxyFactory, boolean isConsumer) {
            List<String> outputNames = new ArrayList<String>(bindableProxyFactory.getOutputs());
            if (CollectionUtils.isEmpty(outputNames)) {
                outputNames = Collections.singletonList("output");
            }
            return bindableProxyFactory instanceof BindableFunctionProxyFactory ? ((BindableFunctionProxyFactory)bindableProxyFactory).getOutputName(index) : (isConsumer ? null : outputNames.get(index));
        }

        private void assertBindingIsPossible(BindableProxyFactory bindableProxyFactory) {
            if (this.isMultipleInputOutput(bindableProxyFactory)) {
                Assert.isTrue((!this.functionProperties.isComposeTo() && !this.functionProperties.isComposeFrom() ? 1 : 0) != 0, (String)"Composing to/from existing Sinks and Sources are not supported for functions with multiple arguments.");
            }
        }

        private boolean isMultipleInputOutput(BindableProxyFactory bindableProxyFactory) {
            return bindableProxyFactory instanceof BindableFunctionProxyFactory && ((BindableFunctionProxyFactory)bindableProxyFactory).isMultiple();
        }

        private boolean isArray(Type type) {
            return type instanceof GenericArrayType || type instanceof Class && ((Class)type).isArray();
        }

        private void assertSupportedSignatures(BindableProxyFactory bindableProxyFactory, SimpleFunctionRegistry.FunctionInvocationWrapper function) {
            if (this.isMultipleInputOutput(bindableProxyFactory)) {
                Assert.isTrue((!function.isConsumer() ? 1 : 0) != 0, (String)("Function '" + this.functionProperties.getDefinition() + "' is a Consumer which is not supported for multi-in/out reactive streams. Only Functions are supported"));
                Assert.isTrue((!function.isSupplier() ? 1 : 0) != 0, (String)("Function '" + this.functionProperties.getDefinition() + "' is a Supplier which is not supported for multi-in/out reactive streams. Only Functions are supported"));
                Assert.isTrue((!this.isArray(function.getInputType()) && !this.isArray(function.getOutputType()) ? 1 : 0) != 0, (String)("Function '" + this.functionProperties.getDefinition() + "' has the following signature: [" + function + "]. Your input and/or outout lacks arity and therefore we can not determine how many input/output destinations are required in the context of function input/output binding."));
            }
        }
    }

    private static final class ContextSnapshotHelper {
        private static final ContextSnapshotFactory CONTEXT_SNAPSHOT_FACTORY = ContextSnapshotFactory.builder().build();

        private ContextSnapshotHelper() {
        }

        static AutoCloseable setContext(ContextView context) {
            return CONTEXT_SNAPSHOT_FACTORY.setThreadLocalsFrom((Object)context, new String[0]);
        }
    }

    private static class FunctionWrapper
    implements Function<Message<byte[]>, Object> {
        private final Function function;
        private final ConsumerProperties consumerProperties;
        private final ProducerProperties producerProperties;
        private final Field headersField;
        private final ConfigurableApplicationContext applicationContext;
        private final boolean isRoutingFunction;
        private final String targetProtocol;

        FunctionWrapper(Function function, ConsumerProperties consumerProperties, ProducerProperties producerProperties, ConfigurableApplicationContext applicationContext, String targetProtocol) {
            this.isRoutingFunction = ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).getTarget() instanceof RoutingFunction;
            this.applicationContext = applicationContext;
            this.function = new PartitionAwareFunctionWrapper(function, this.applicationContext, producerProperties);
            this.consumerProperties = consumerProperties;
            if (this.consumerProperties != null) {
                ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).setSkipInputConversion(this.consumerProperties.isUseNativeDecoding());
            }
            this.producerProperties = producerProperties;
            if (this.producerProperties != null) {
                ((SimpleFunctionRegistry.FunctionInvocationWrapper)function).setSkipOutputConversion(this.producerProperties.isUseNativeEncoding());
            }
            this.headersField = ReflectionUtils.findField(MessageHeaders.class, (String)"headers");
            this.headersField.setAccessible(true);
            this.targetProtocol = targetProtocol;
        }

        @Override
        public Object apply(Message<byte[]> message) {
            message = FunctionConfiguration.sanitize(message);
            this.setHeadersIfNeeded(message);
            Object result = this.function.apply(message);
            if (result instanceof Publisher && this.isRoutingFunction) {
                throw new IllegalStateException("Routing to functions that return Publisher is not supported in the context of Spring Cloud Stream.");
            }
            if (result instanceof Message) {
                Message resultMessage = (Message)result;
                this.setHeadersIfNeeded(resultMessage);
            }
            return result;
        }

        private void setHeadersIfNeeded(Message message) {
            Map headersMap = (Map)ReflectionUtils.getField((Field)this.headersField, (Object)message.getHeaders());
            if (CloudEventMessageUtils.isCloudEvent((Message)message)) {
                headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
            }
        }
    }
}

