/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.pulsar;

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.pulsar.PulsarBinderHeaderMapper;
import org.springframework.cloud.stream.binder.pulsar.PulsarBinderUtils;
import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarExtendedBindingProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarProducerProperties;
import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TypedMessageBuilderCustomizer;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.support.header.PulsarHeaderMapper;

/**
 * Message channel binder that connects Spring Cloud Stream bindings to Apache Pulsar topics.
 *
 * <p>Producer bindings are served by a {@link PulsarProducerConfigurationMessageHandler} that
 * publishes through the shared {@link PulsarTemplate}. Consumer bindings are served by a
 * {@link PulsarMessageDrivenChannelAdapter} backed by a
 * {@link DefaultPulsarMessageListenerContainer}.
 */
public class PulsarMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>, PulsarTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, PulsarConsumerProperties, PulsarProducerProperties> {
    // Template handed to every producer message handler created by this binder.
    private final PulsarTemplate<Object> pulsarTemplate;
    // Consumer factory handed to every listener container created by this binder.
    private final PulsarConsumerFactory<?> pulsarConsumerFactory;
    // Binder-level producer/consumer settings that are merged with the binding-level ones.
    private final PulsarBinderConfigurationProperties binderConfigProps;
    // Resolves the Pulsar schema when native encoding/decoding is enabled on a binding.
    private final SchemaResolver schemaResolver;
    // Delegate wrapped in a PulsarBinderHeaderMapper unless the binding's header mode is 'none'.
    private final PulsarHeaderMapper headerMapper;
    // Source of the per-binding extended properties; replaceable through its setter.
    private PulsarExtendedBindingProperties extendedBindingProperties = new PulsarExtendedBindingProperties();

    /**
     * Creates the binder from its collaborators.
     *
     * @param provisioningProvider provisioner passed on to the base binder
     * @param pulsarTemplate template used by producer bindings to publish messages
     * @param pulsarConsumerFactory factory used by consumer bindings to create listener containers
     * @param binderConfigProps binder-level producer and consumer configuration
     * @param schemaResolver resolver used when native encoding/decoding is enabled
     * @param headerMapper delegate used to map headers between Spring and Pulsar messages
     */
    public PulsarMessageChannelBinder(
            PulsarTopicProvisioner provisioningProvider,
            PulsarTemplate<Object> pulsarTemplate,
            PulsarConsumerFactory<?> pulsarConsumerFactory,
            PulsarBinderConfigurationProperties binderConfigProps,
            SchemaResolver schemaResolver,
            PulsarHeaderMapper headerMapper) {
        // The first argument of the base constructor is passed as null.
        super(null, provisioningProvider);

        // Binder-wide configuration.
        this.binderConfigProps = binderConfigProps;
        this.schemaResolver = schemaResolver;
        this.headerMapper = headerMapper;

        // Pulsar client-side collaborators.
        this.pulsarTemplate = pulsarTemplate;
        this.pulsarConsumerFactory = pulsarConsumerFactory;
    }

    /**
     * Builds the outbound handler that publishes messages for a producer binding.
     *
     * <p>When native encoding is enabled, the Pulsar schema is resolved from the binding's
     * schema type (falling back to {@link SchemaType#NONE}), message type and message key
     * type. Otherwise the handler is created without a schema. Binder-level and binding-level
     * producer properties are merged and loaded onto the producer builder through
     * {@code PulsarBinderUtils.loadConf}.
     *
     * @param destination the provisioned destination whose name is used as the topic
     * @param producerProperties the binding's producer properties
     * @param errorChannel not used by this implementation
     * @return the configured message handler
     */
    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<PulsarProducerProperties> producerProperties, MessageChannel errorChannel) {
        PulsarProducerProperties bindingProps = producerProperties.getExtension();

        // Stays null unless the binding asks for native encoding.
        Schema<Object> producerSchema = null;
        if (producerProperties.isUseNativeEncoding()) {
            SchemaType requestedType = Optional.ofNullable(bindingProps.getSchemaType()).orElse(SchemaType.NONE);
            producerSchema = this.schemaResolver
                    .resolveSchema(requestedType, bindingProps.getMessageType(), bindingProps.getMessageKeyType())
                    .orElseThrow(() -> "Could not determine producer schema for " + destination.getName());
        }

        Map<String, Object> mergedProducerProps =
                PulsarBinderUtils.mergeModifiedProducerProperties(this.binderConfigProps.getProducer(), bindingProps);
        String topic = destination.getName();
        ProducerBuilderCustomizer<Object> propsCustomizer =
                builder -> PulsarBinderUtils.loadConf(builder, mergedProducerProps);
        PulsarBinderHeaderMapper outboundMapper = this.determineOutboundHeaderMapper(producerProperties);

        PulsarProducerConfigurationMessageHandler handler = new PulsarProducerConfigurationMessageHandler(
                this.pulsarTemplate, producerSchema, topic, propsCustomizer, outboundMapper);
        handler.setApplicationContext(this.getApplicationContext());
        handler.setBeanFactory(this.getBeanFactory());
        return handler;
    }

    /**
     * Chooses the header mapper for a producer binding.
     *
     * @param extProducerProps the binding's producer properties
     * @return {@code null} when the header mode is {@link HeaderMode#none}, otherwise a new
     * {@link PulsarBinderHeaderMapper} wrapping this binder's header mapper
     */
    @Nullable
    private PulsarBinderHeaderMapper determineOutboundHeaderMapper(ExtendedProducerProperties<PulsarProducerProperties> extProducerProps) {
        boolean headersDisabled = extProducerProps.getHeaderMode() == HeaderMode.none;
        return headersDisabled ? null : new PulsarBinderHeaderMapper(this.headerMapper);
    }

    /**
     * Builds the inbound endpoint for a consumer binding.
     *
     * <p>A listener container is configured for the destination's topic. Each received Pulsar
     * message is converted to a Spring message (with mapped headers unless header mapping is
     * disabled) and passed to the returned adapter. With native decoding the schema is
     * resolved from the binding's schema type (falling back to {@link SchemaType#NONE}),
     * message type and message key type; otherwise {@link Schema#BYTES} is used.
     *
     * @param destination the provisioned destination whose name is used as the topic
     * @param group the consumer group; not read directly by this method
     * @param properties the binding's consumer properties
     * @return the adapter that starts and stops the listener container
     */
    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<PulsarConsumerProperties> properties) {
        PulsarConsumerProperties bindingProps = properties.getExtension();

        PulsarContainerProperties containerProperties = new PulsarContainerProperties(new String[0]);
        containerProperties.setTopics(Set.of(destination.getName()));

        PulsarBinderHeaderMapper inboundHeaderMapper = this.determineInboundHeaderMapper(properties);
        PulsarMessageDrivenChannelAdapter adapter = new PulsarMessageDrivenChannelAdapter();

        // Bridge each Pulsar record into a Spring message and pass it to the adapter.
        containerProperties.setMessageListener((PulsarRecordMessageListener & Serializable)(consumer, pulsarMsg) -> {
            Object payload = pulsarMsg.getValue();
            Message<?> springMessage;
            if (inboundHeaderMapper == null) {
                springMessage = MessageBuilder.withPayload(payload).build();
            }
            else {
                springMessage = MessageBuilder.createMessage(payload, inboundHeaderMapper.toSpringHeaders(pulsarMsg));
            }
            adapter.send(springMessage);
        });

        if (!properties.isUseNativeDecoding()) {
            containerProperties.setSchema(Schema.BYTES);
        }
        else {
            SchemaType requestedType = Optional.ofNullable(bindingProps.getSchemaType()).orElse(SchemaType.NONE);
            Schema<Object> consumerSchema = this.schemaResolver
                    .resolveSchema(requestedType, bindingProps.getMessageType(), bindingProps.getMessageKeyType())
                    .orElseThrow(() -> "Could not determine consumer schema for " + destination.getName());
            containerProperties.setSchema(consumerSchema);
        }

        containerProperties.setSubscriptionName(PulsarBinderUtils.subscriptionName(bindingProps, destination));

        // Binder-level and binding-level consumer settings are merged before being applied.
        Map<String, Object> mergedConsumerProps =
                PulsarBinderUtils.mergeModifiedConsumerProperties(this.binderConfigProps.getConsumer(), bindingProps);
        containerProperties.getPulsarConsumerProperties().putAll(mergedConsumerProps);
        containerProperties.updateContainerProperties();

        AbstractPulsarMessageListenerContainer<?> container =
                new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties);
        adapter.setMessageListenerContainer(container);
        adapter.setApplicationContext(this.getApplicationContext());
        adapter.setBeanFactory(this.getApplicationContext().getBeanFactory());
        return adapter;
    }

    /**
     * Chooses the header mapper for a consumer binding.
     *
     * @param extConsumerProps the binding's consumer properties
     * @return a new {@link PulsarBinderHeaderMapper} wrapping this binder's header mapper, or
     * {@code null} when the header mode is {@link HeaderMode#none}
     */
    @Nullable
    private PulsarBinderHeaderMapper determineInboundHeaderMapper(ExtendedConsumerProperties<PulsarConsumerProperties> extConsumerProps) {
        if (extConsumerProps.getHeaderMode() != HeaderMode.none) {
            return new PulsarBinderHeaderMapper(this.headerMapper);
        }
        return null;
    }

    /**
     * Looks up the Pulsar-specific consumer properties of a binding.
     *
     * @param channelName the binding (channel) name
     * @return the consumer properties supplied by the extended binding properties
     */
    public PulsarConsumerProperties getExtendedConsumerProperties(String channelName) {
        PulsarExtendedBindingProperties bindingProperties = this.extendedBindingProperties;
        return bindingProperties.getExtendedConsumerProperties(channelName);
    }

    /**
     * Looks up the Pulsar-specific producer properties of a binding.
     *
     * @param channelName the binding (channel) name
     * @return the producer properties supplied by the extended binding properties
     */
    public PulsarProducerProperties getExtendedProducerProperties(String channelName) {
        PulsarExtendedBindingProperties bindingProperties = this.extendedBindingProperties;
        return bindingProperties.getExtendedProducerProperties(channelName);
    }

    /**
     * Returns the defaults prefix reported by the extended binding properties.
     *
     * <p>Delegating keeps this accessor consistent with
     * {@link #getExtendedConsumerProperties(String)} and
     * {@link #getExtendedProducerProperties(String)}, which read from the same object, so the
     * binder does not answer {@code null} while its properties object knows the prefix.
     *
     * @return the defaults prefix of the current extended binding properties
     */
    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    /**
     * Returns the extended-properties entry class reported by the extended binding properties.
     *
     * <p>Delegating keeps this accessor consistent with
     * {@link #getExtendedConsumerProperties(String)} and
     * {@link #getExtendedProducerProperties(String)}, which read from the same object, so the
     * binder does not answer {@code null} while its properties object knows the entry class.
     *
     * @return the entry class of the current extended binding properties
     */
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    /**
     * Exposes the extended binding properties currently used by this binder.
     *
     * @return the current extended binding properties instance
     */
    public PulsarExtendedBindingProperties getExtendedBindingProperties() {
        return extendedBindingProperties;
    }

    /**
     * Replaces the extended binding properties used to look up per-binding settings.
     *
     * @param extendedBindingProperties the new properties; must not be {@code null}
     * @throws IllegalArgumentException if {@code extendedBindingProperties} is {@code null}
     */
    public void setExtendedBindingProperties(PulsarExtendedBindingProperties extendedBindingProperties) {
        // Reject null eagerly: the accessors on this binder dereference the field without a
        // check, so a null would only surface later as a NullPointerException far from the cause.
        if (extendedBindingProperties == null) {
            throw new IllegalArgumentException("'extendedBindingProperties' must not be null");
        }
        this.extendedBindingProperties = extendedBindingProperties;
    }

    static class PulsarProducerConfigurationMessageHandler
    extends AbstractMessageProducingHandler
    implements ManageableLifecycle {
        private final PulsarTemplate<Object> pulsarTemplate;
        private final Schema<Object> schema;
        private final String destination;
        private final ProducerBuilderCustomizer<Object> layeredProducerPropsCustomizer;
        private final PulsarHeaderMapper headerMapper;
        private boolean running = true;

        PulsarProducerConfigurationMessageHandler(PulsarTemplate<Object> pulsarTemplate, Schema<Object> schema, String destination, ProducerBuilderCustomizer<Object> layeredProducerPropsCustomizer, PulsarHeaderMapper headerMapper) {
            this.pulsarTemplate = pulsarTemplate;
            this.schema = schema;
            this.destination = destination;
            this.layeredProducerPropsCustomizer = layeredProducerPropsCustomizer;
            this.headerMapper = headerMapper;
        }

        public void start() {
            try {
                super.onInit();
            }
            catch (Exception ex) {
                this.logger.error((Throwable)ex, (CharSequence)"Initialization errors: ");
                throw new RuntimeException(ex);
            }
        }

        public void stop() {
            this.running = false;
        }

        public boolean isRunning() {
            return this.running;
        }

        protected void handleMessageInternal(Message<?> message) {
            try {
                this.pulsarTemplate.newMessage(message.getPayload()).withTopic(this.destination).withSchema(this.schema).withProducerCustomizer(this.layeredProducerPropsCustomizer).withMessageCustomizer(this.applySpringHeadersAsPulsarProperties(message.getHeaders())).sendAsync();
            }
            catch (Exception ex) {
                this.logger.trace((Throwable)ex, (CharSequence)("Failed to send message to destination: " + this.destination));
            }
        }

        private TypedMessageBuilderCustomizer<Object> applySpringHeadersAsPulsarProperties(MessageHeaders headers) {
            return mb -> {
                if (this.headerMapper != null) {
                    this.headerMapper.toPulsarHeaders(headers).forEach((arg_0, arg_1) -> ((TypedMessageBuilder)mb).property(arg_0, arg_1));
                }
            };
        }
    }

    /**
     * Inbound endpoint that starts and stops a Pulsar listener container together with its own
     * lifecycle and exposes {@link #send(Message)} so a listener can hand messages to the
     * base class's {@code sendMessage}.
     */
    static class PulsarMessageDrivenChannelAdapter extends MessageProducerSupport {

        /** Container driven by this adapter; dereferenced without a null check on start/stop. */
        AbstractPulsarMessageListenerContainer<?> messageListenerContainer;

        PulsarMessageDrivenChannelAdapter() {
        }

        /**
         * @param messageListenerContainer the container to start and stop with this adapter
         */
        public void setMessageListenerContainer(AbstractPulsarMessageListenerContainer<?> messageListenerContainer) {
            this.messageListenerContainer = messageListenerContainer;
        }

        /**
         * Passes a message to {@code sendMessage} of the base class.
         *
         * @param message the message to send
         */
        public void send(Message<?> message) {
            sendMessage(message);
        }

        /** Starts the listener container. */
        protected void doStart() {
            messageListenerContainer.start();
        }

        /** Stops the listener container. */
        protected void doStop() {
            messageListenerContainer.stop();
        }
    }
}

