/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.annotation.Annotation;
import java.lang.reflect.Executable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsStateStore;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsStateStoreProperties;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.binding.StreamListenerSetupMethodOrchestrator;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/**
 * {@link StreamListenerSetupMethodOrchestrator} for {@code @StreamListener} methods that take
 * {@link KStream}, {@link KTable} or {@link GlobalKTable} parameters.
 *
 * <p>For a supported method this class adapts the bound inputs into method arguments, invokes
 * the method once, and registers any returned {@link KStream} (or {@code KStream[]}) against the
 * outbound bindings named by {@link SendTo}.
 */
class KafkaStreamsStreamListenerSetupMethodOrchestrator
extends AbstractKafkaStreamsBinderProcessor
implements StreamListenerSetupMethodOrchestrator {
    private static final Log LOG = LogFactory.getLog(KafkaStreamsStreamListenerSetupMethodOrchestrator.class);
    private final StreamListenerParameterAdapter streamListenerParameterAdapter;
    private final Collection<StreamListenerResultAdapter> streamListenerResultAdapters;
    private final BindingServiceProperties bindingServiceProperties;
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    /** Names of the state stores already registered, keyed by the listener method that declared them. */
    private final Map<Method, List<String>> registeredStoresPerMethod = new HashMap<>();
    /** One factory bean per listener method, created the first time one of its inputs is adapted. */
    private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
    StreamsBuilderFactoryBeanCustomizer customizer;
    private final ConfigurableEnvironment environment;

    KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties extendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue bindingInformationCatalogue, StreamListenerParameterAdapter streamListenerParameterAdapter, Collection<StreamListenerResultAdapter> listenerResultAdapters, CleanupConfig cleanupConfig, StreamsBuilderFactoryBeanCustomizer customizer, ConfigurableEnvironment environment) {
        super(bindingServiceProperties, bindingInformationCatalogue, extendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = extendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsBindingInformationCatalogue = bindingInformationCatalogue;
        this.streamListenerParameterAdapter = streamListenerParameterAdapter;
        this.streamListenerResultAdapters = listenerResultAdapters;
        this.customizer = customizer;
        this.environment = environment;
    }

    /**
     * Reports whether this orchestrator handles the given method.
     *
     * @param method candidate listener method
     * @return {@code true} when at least one parameter is a KStream, KTable or GlobalKTable and
     *         the return type is {@code void}, {@code KStream} or {@code KStream[]}
     */
    @Override
    public boolean supports(Method method) {
        return this.methodParameterSupports(method)
                && (this.methodReturnTypeSupports(method) || Void.TYPE.equals(method.getReturnType()));
    }

    /** Returns {@code true} when the return type is exactly {@code KStream} or {@code KStream[]}. */
    private boolean methodReturnTypeSupports(Method method) {
        Class<?> returnType = method.getReturnType();
        return returnType.equals(KStream.class)
                || (returnType.isArray() && returnType.getComponentType().equals(KStream.class));
    }

    /** Returns {@code true} as soon as one parameter is exactly KStream, KTable or GlobalKTable. */
    private boolean methodParameterSupports(Method method) {
        for (int i = 0; i < method.getParameterCount(); ++i) {
            Class<?> parameterType = MethodParameter.forExecutable(method, i).getParameterType();
            if (parameterType.equals(KStream.class) || parameterType.equals(KTable.class) || parameterType.equals(GlobalKTable.class)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Validates the listener method, adapts its inbound arguments, invokes it, and binds any
     * returned stream(s) to the outbound targets declared through {@link SendTo}.
     *
     * @param streamListener the annotation found on the method
     * @param method the listener method
     * @param bean the bean instance the method is invoked on
     * @throws BeanInitializationException if the invocation or the outbound registration fails
     */
    @Override
    public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) {
        String[] methodAnnotatedOutboundNames = getOutboundBindingTargetNames(method);
        this.validateStreamListenerMethod(streamListener, method, methodAnnotatedOutboundNames);
        String methodAnnotatedInboundName = streamListener.value();
        Object[] adaptedInboundArguments = this.adaptAndRetrieveInboundArguments(method, methodAnnotatedInboundName, (ApplicationContext) this.applicationContext, this.streamListenerParameterAdapter);
        try {
            ReflectionUtils.makeAccessible(method);
            Object result = method.invoke(bean, adaptedInboundArguments);
            boolean hasOutbounds = methodAnnotatedOutboundNames != null && methodAnnotatedOutboundNames.length > 0;
            if (Void.TYPE.equals(method.getReturnType()) || !hasOutbounds) {
                return;
            }
            this.bindOutboundResults(method, methodAnnotatedInboundName, methodAnnotatedOutboundNames, result);
        }
        catch (Exception ex) {
            throw new BeanInitializationException("Cannot setup StreamListener for " + method, ex);
        }
    }

    /**
     * Registers each returned stream against its declared outbound binding. A single (non-array)
     * result is treated as a one-element array so both shapes share the same code path.
     */
    private void bindOutboundResults(Method method, String methodAnnotatedInboundName, String[] outboundNames, Object result) {
        // Fail with a descriptive message instead of a bare NullPointerException on result.getClass().
        Assert.notNull(result, "StreamListener method returned null but outbound bindings are declared");
        Object[] outboundKStreams = result.getClass().isArray() ? (Object[]) result : new Object[] {result};
        Assert.isTrue(outboundNames.length == outboundKStreams.length, "Result does not match with the number of declared outbounds");
        String inboundName = this.populateInboundIfMissing(method, methodAnnotatedInboundName);
        // Outbound bindings are registered with the factory bean recorded for the inbound binding.
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(inboundName);
        for (int i = 0; i < outboundKStreams.length; ++i) {
            String outboundName = outboundNames[i];
            this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(outboundName, streamsBuilderFactoryBean);
            Object targetBean = this.applicationContext.getBean(outboundName);
            this.kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
            this.adaptStreamListenerResult(outboundKStreams[i], targetBean);
        }
    }

    /**
     * Falls back to the {@link Input} annotation on the first parameter when the
     * {@code @StreamListener} value carries no text.
     *
     * @return the given name if it has text, otherwise the first parameter's {@code @Input} value
     *         when present, otherwise the given name unchanged
     */
    private String populateInboundIfMissing(Method method, String methodAnnotatedInboundName) {
        if (StringUtils.hasText(methodAnnotatedInboundName) || method.getParameterCount() == 0) {
            return methodAnnotatedInboundName;
        }
        Input input = MethodParameter.forExecutable(method, 0).getParameterAnnotation(Input.class);
        return input != null ? input.value() : methodAnnotatedInboundName;
    }

    /** Hands the outbound stream to the first result adapter that supports the stream/target pair. */
    private void adaptStreamListenerResult(Object outboundKStream, Object targetBean) {
        for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
            if (streamListenerResultAdapter.supports(outboundKStream.getClass(), targetBean.getClass())) {
                streamListenerResultAdapter.adapt(outboundKStream, targetBean);
                return;
            }
        }
    }

    /**
     * Builds the argument array for the listener method by resolving every parameter to a bound
     * KStream, KTable or GlobalKTable.
     *
     * <p>A parameter is resolved through its {@link Input} annotation, or, for single-parameter
     * methods, through the supplied {@code inboundName}.
     *
     * @param method the listener method
     * @param inboundName the binding name taken from {@code @StreamListener}; may be empty
     * @param applicationContext context used to look up the binding target beans
     * @param adapters parameter adapters tried in order for KStream parameters
     * @return the adapted arguments, one per method parameter
     * @throws IllegalStateException if a parameter cannot be resolved to a binding target, or if
     *         building the input for a parameter fails
     */
    @SuppressWarnings("unchecked")
    public Object[] adaptAndRetrieveInboundArguments(Method method, String inboundName, ApplicationContext applicationContext, StreamListenerParameterAdapter ... adapters) {
        Object[] arguments = new Object[method.getParameterTypes().length];
        for (int parameterIndex = 0; parameterIndex < arguments.length; ++parameterIndex) {
            MethodParameter methodParameter = MethodParameter.forExecutable(method, parameterIndex);
            Class<?> parameterType = methodParameter.getParameterType();
            Object targetReferenceValue = null;
            Input input = methodParameter.getParameterAnnotation(Input.class);
            if (input != null) {
                targetReferenceValue = AnnotationUtils.getValue(input);
                // From here on the annotation value is the inbound name, also for later parameters.
                inboundName = input.value();
            } else if (arguments.length == 1 && StringUtils.hasText(inboundName)) {
                targetReferenceValue = inboundName;
            }
            if (targetReferenceValue == null) {
                throw new IllegalStateException("A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.");
            }
            Assert.isInstanceOf(String.class, targetReferenceValue, "Annotation value must be a String");
            Object targetBean = applicationContext.getBean((String) targetReferenceValue);
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(inboundName);
            if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
                StreamsBuilderFactoryBean created = this.buildStreamsBuilderAndRetrieveConfig(method.getDeclaringClass().getSimpleName() + "-" + method.getName(), applicationContext, inboundName, null, this.customizer, this.environment, bindingProperties);
                this.methodStreamsBuilderFactoryBeanMap.put(method, created);
            }
            try {
                StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(method);
                StreamsBuilder streamsBuilder = (StreamsBuilder) streamsBuilderFactoryBean.getObject();
                String applicationId = streamsBuilderFactoryBean.getStreamsConfiguration().getProperty("application.id");
                KafkaStreamsConsumerProperties extendedConsumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
                extendedConsumerProperties.setApplicationId(applicationId);
                KafkaStreamsStateStoreProperties spec = this.buildStateStoreSpec(method);
                Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties, ResolvableType.forMethodParameter(methodParameter));
                LOG.info("Key Serde used for " + targetReferenceValue + ": " + keySerde.getClass().getName());
                // Without native decoding the value is consumed as raw bytes.
                Serde<?> valueSerde = this.bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding()
                        ? this.getValueSerde(inboundName, extendedConsumerProperties, ResolvableType.forMethodParameter(methodParameter))
                        : Serdes.ByteArray();
                LOG.info("Value Serde used for " + targetReferenceValue + ": " + valueSerde.getClass().getName());
                Topology.AutoOffsetReset autoOffsetReset = this.getAutoOffsetReset(inboundName, extendedConsumerProperties);
                boolean firstBuild = parameterIndex == 0;
                if (parameterType.isAssignableFrom(KStream.class)) {
                    KStream<?, ?> stream = this.getkStream(inboundName, spec, bindingProperties, extendedConsumerProperties, streamsBuilder, keySerde, valueSerde, autoOffsetReset, firstBuild);
                    KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
                    // NOTE(review): the cast keeps this call compiling if wrap() is declared for
                    // KStream<Object, Object>; confirm against the KStreamWrapper signature.
                    kStreamWrapper.wrap((KStream<Object, Object>) stream);
                    this.kafkaStreamsBindingInformationCatalogue.addKeySerde(stream, keySerde);
                    BindingProperties bindingProperties1 = this.kafkaStreamsBindingInformationCatalogue.getBindingProperties().get(kStreamWrapper);
                    this.kafkaStreamsBindingInformationCatalogue.registerBindingProperties(stream, bindingProperties1);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(inboundName, streamsBuilderFactoryBean);
                    for (StreamListenerParameterAdapter adapter : adapters) {
                        if (adapter.supports(stream.getClass(), methodParameter)) {
                            arguments[parameterIndex] = adapter.adapt(stream, methodParameter);
                            break;
                        }
                    }
                    // No adapter produced a value: pass the stream itself when the types line up.
                    if (arguments[parameterIndex] == null && parameterType.isAssignableFrom(stream.getClass())) {
                        arguments[parameterIndex] = stream;
                    }
                    Assert.notNull(arguments[parameterIndex], "Cannot convert argument " + parameterIndex + " of " + method + " from " + stream.getClass() + " to " + parameterType);
                } else {
                    this.handleKTableGlobalKTableInputs(arguments, parameterIndex, inboundName, parameterType, targetBean, streamsBuilderFactoryBean, streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset, firstBuild);
                }
            }
            catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
        return arguments;
    }

    /**
     * Creates the {@link StoreBuilder} described by the given state store properties.
     *
     * @param spec state store properties derived from {@link KafkaStreamsStateStore}
     * @return a persistent key-value, window or session store builder, with caching enabled
     *         and/or logging disabled as requested by the spec
     * @throws UnsupportedOperationException if the store type is not one of the handled types
     */
    private StoreBuilder buildStateStore(KafkaStreamsStateStoreProperties spec) {
        try {
            StoreBuilder builder;
            Serde<?> keySerde = this.keyValueSerdeResolver.getStateStoreKeySerde(spec.getKeySerdeString());
            Serde<?> valueSerde = this.keyValueSerdeResolver.getStateStoreValueSerde(spec.getValueSerdeString());
            switch (spec.getType()) {
                case KEYVALUE: {
                    builder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(spec.getName()), keySerde, valueSerde);
                    break;
                }
                case WINDOW: {
                    // The literals 3 and false are positional arguments kept from the original;
                    // presumably segment count and retain-duplicates, confirm against the Stores API.
                    builder = Stores.windowStoreBuilder(Stores.persistentWindowStore(spec.getName(), spec.getRetention(), 3, spec.getLength(), false), keySerde, valueSerde);
                    break;
                }
                case SESSION: {
                    builder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(spec.getName(), spec.getRetention()), keySerde, valueSerde);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("state store type (" + spec.getType() + ") is not supported!");
                }
            }
            if (spec.isCacheEnabled()) {
                builder = builder.withCachingEnabled();
            }
            if (spec.isLoggingDisabled()) {
                builder = builder.withLoggingDisabled();
            }
            return builder;
        }
        catch (Exception ex) {
            // Pass the throwable to the logger so the stack trace is not lost.
            LOG.error("failed to build state store exception : " + ex, ex);
            throw ex;
        }
    }

    /**
     * Adds the state store described by {@code storeSpec} (when not {@code null}) to the
     * builder, then delegates to the superclass to create the inbound stream.
     */
    private KStream<?, ?> getkStream(String inboundName, KafkaStreamsStateStoreProperties storeSpec, BindingProperties bindingProperties, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
        if (storeSpec != null) {
            StoreBuilder storeBuilder = this.buildStateStore(storeSpec);
            streamsBuilder.addStateStore(storeBuilder);
            if (LOG.isInfoEnabled()) {
                LOG.info("state store " + storeBuilder.name() + " added to topology");
            }
        }
        return this.getKStream(inboundName, bindingProperties, kafkaStreamsConsumerProperties, streamsBuilder, keySerde, valueSerde, autoOffsetReset, firstBuild);
    }

    /**
     * Asserts that every non-blank outbound name, and (when {@code @StreamListener} names an
     * inbound binding) every method parameter, is usable in declarative mode.
     */
    private void validateStreamListenerMethod(StreamListener streamListener, Method method, String[] methodAnnotatedOutboundNames) {
        String methodAnnotatedInboundName = streamListener.value();
        if (methodAnnotatedOutboundNames != null) {
            for (String outboundName : methodAnnotatedOutboundNames) {
                if (StringUtils.hasText(outboundName)) {
                    Assert.isTrue(this.isDeclarativeOutput(method, outboundName), "Method must be declarative");
                }
            }
        }
        if (StringUtils.hasText(methodAnnotatedInboundName)) {
            int methodArgumentsLength = method.getParameterTypes().length;
            for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; ++parameterIndex) {
                MethodParameter methodParameter = MethodParameter.forExecutable(method, parameterIndex);
                Assert.isTrue(this.isDeclarativeInput(methodAnnotatedInboundName, methodParameter), "Method must be declarative");
            }
        }
    }

    /**
     * Returns {@code true} when some result adapter supports converting the method's return type
     * (or its component type, for arrays) to the type of the named target bean.
     */
    private boolean isDeclarativeOutput(Method m, String targetBeanName) {
        Class<?> returnType = m.getReturnType();
        Class<?> resultType = returnType.isArray() ? returnType.getComponentType() : returnType;
        Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
        return this.streamListenerResultAdapters.stream().anyMatch(adapter -> adapter.supports(resultType, targetBeanClass));
    }

    /**
     * Returns {@code true} when the named bean exists, its type is known, the parameter is not
     * declared as {@code Object}, and the bean/parameter pair is a matching KStream, KTable or
     * GlobalKTable combination.
     */
    private boolean isDeclarativeInput(String targetBeanName, MethodParameter methodParameter) {
        Class<?> parameterType = methodParameter.getParameterType();
        if (parameterType.isAssignableFrom(Object.class) || !this.applicationContext.containsBean(targetBeanName)) {
            return false;
        }
        Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
        if (targetBeanClass == null) {
            return false;
        }
        return KafkaStreamsBinderUtils.supportsKStream(methodParameter, targetBeanClass)
                || (KTable.class.isAssignableFrom(targetBeanClass) && KTable.class.isAssignableFrom(parameterType))
                || (GlobalKTable.class.isAssignableFrom(targetBeanClass) && GlobalKTable.class.isAssignableFrom(parameterType));
    }

    /**
     * Reads the outbound binding names from {@link SendTo}.
     *
     * @return the annotation's values, or {@code null} when the method has no {@code @SendTo}
     * @throws IllegalArgumentException if {@code @SendTo} is present but declares no value
     */
    private static String[] getOutboundBindingTargetNames(Method method) {
        SendTo sendTo = AnnotationUtils.findAnnotation(method, SendTo.class);
        if (sendTo == null) {
            return null;
        }
        Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()), "At least one output must be specified");
        return sendTo.value();
    }

    /**
     * Translates a {@link KafkaStreamsStateStore} annotation on the method into store properties.
     *
     * @return the properties on the first call for an annotated method; {@code null} when the
     *         method is not annotated or its store was already registered, so the store is
     *         only added once per method
     */
    private KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
        if (this.registeredStoresPerMethod.containsKey(method)) {
            return null;
        }
        KafkaStreamsStateStore spec = AnnotationUtils.findAnnotation(method, KafkaStreamsStateStore.class);
        if (spec == null) {
            return null;
        }
        Assert.isTrue(StringUtils.hasLength(spec.name()), "name cannot be empty");
        List<String> storeNames = new ArrayList<>();
        storeNames.add(spec.name());
        this.registeredStoresPerMethod.put(method, storeNames);
        KafkaStreamsStateStoreProperties props = new KafkaStreamsStateStoreProperties();
        props.setName(spec.name());
        props.setType(spec.type());
        props.setLength(spec.lengthMs());
        props.setKeySerdeString(spec.keySerde());
        props.setRetention(spec.retentionMs());
        props.setValueSerdeString(spec.valueSerde());
        props.setCacheEnabled(spec.cache());
        props.setLoggingDisabled(!spec.logging());
        return props;
    }
}

