/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.core;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.ProducerUtils;
import org.springframework.pulsar.core.PulsarOperations;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.core.TypedMessageBuilderCustomizer;
import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention;
import org.springframework.pulsar.observation.PulsarMessageSenderContext;
import org.springframework.pulsar.observation.PulsarTemplateObservation;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
import org.springframework.util.CollectionUtils;

public class PulsarTemplate<T>
implements PulsarOperations<T>,
ApplicationContextAware,
BeanNameAware,
SmartInitializingSingleton {
    private final LogAccessor logger = new LogAccessor(this.getClass());
    private final PulsarProducerFactory<T> producerFactory;
    private final List<ProducerInterceptor> interceptors;
    private final SchemaResolver schemaResolver;
    private final TopicResolver topicResolver;
    private boolean observationEnabled;
    @Nullable
    private ObservationRegistry observationRegistry;
    @Nullable
    private PulsarTemplateObservationConvention observationConvention;
    @Nullable
    private ApplicationContext applicationContext;
    private String beanName = "";

    public PulsarTemplate(PulsarProducerFactory<T> producerFactory) {
        this(producerFactory, Collections.emptyList());
    }

    public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors) {
        this(producerFactory, interceptors, new DefaultSchemaResolver(), new DefaultTopicResolver(), true);
    }

    public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors, SchemaResolver schemaResolver, TopicResolver topicResolver, boolean observationEnabled) {
        this.producerFactory = producerFactory;
        this.interceptors = interceptors;
        this.schemaResolver = schemaResolver;
        this.topicResolver = topicResolver;
        this.observationEnabled = observationEnabled;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public void afterSingletonsInstantiated() {
        if (!this.observationEnabled) {
            this.logger.debug(() -> "Observations are not enabled - not recording");
            return;
        }
        if (this.applicationContext == null) {
            this.logger.warn(() -> "Observations enabled but application context null - not recording");
            return;
        }
        this.observationRegistry = (ObservationRegistry)this.applicationContext.getBeanProvider(ObservationRegistry.class).getIfUnique(() -> this.observationRegistry);
        this.observationConvention = (PulsarTemplateObservationConvention)this.applicationContext.getBeanProvider(PulsarTemplateObservationConvention.class).getIfUnique(() -> this.observationConvention);
    }

    @Override
    public MessageId send(@Nullable T message) throws PulsarClientException {
        return this.doSend(null, message, null, null, null, null);
    }

    @Override
    public MessageId send(@Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return this.doSend(null, message, schema, null, null, null);
    }

    @Override
    public MessageId send(@Nullable String topic, @Nullable T message) throws PulsarClientException {
        return this.doSend(topic, message, null, null, null, null);
    }

    @Override
    public MessageId send(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return this.doSend(topic, message, schema, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable T message) throws PulsarClientException {
        return this.doSendAsync(null, message, null, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return this.doSendAsync(null, message, schema, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message) throws PulsarClientException {
        return this.doSendAsync(topic, message, null, null, null, null);
    }

    @Override
    public CompletableFuture<MessageId> sendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) throws PulsarClientException {
        return this.doSendAsync(topic, message, schema, null, null, null);
    }

    @Override
    public PulsarOperations.SendMessageBuilder<T> newMessage(@Nullable T message) {
        return new SendMessageBuilderImpl<T>(this, message);
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    private MessageId doSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
        try {
            return this.doSendAsync(topic, message, schema, encryptionKeys, typedMessageBuilderCustomizer, producerCustomizer).get();
        }
        catch (Exception ex) {
            throw PulsarClientException.unwrap((Throwable)ex);
        }
    }

    private CompletableFuture<MessageId> doSendAsync(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
        String defaultTopic = Objects.toString(this.producerFactory.getDefaultTopic(), null);
        String topicName = this.topicResolver.resolveTopic(topic, message, () -> defaultTopic).orElseThrow();
        this.logger.trace(() -> "Sending msg to '%s' topic".formatted(topicName));
        PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName);
        Observation observation = this.newObservation(senderContext);
        try {
            TypedMessageBuilder messageBuilder;
            observation.start();
            Producer<T> producer = this.prepareProducerForSend(topicName, message, schema, encryptionKeys, producerCustomizer);
            try {
                messageBuilder = producer.newMessage().value(message);
                if (typedMessageBuilderCustomizer != null) {
                    typedMessageBuilderCustomizer.customize(messageBuilder);
                }
                senderContext.properties().forEach((arg_0, arg_1) -> ((TypedMessageBuilder)messageBuilder).property(arg_0, arg_1));
            }
            catch (Exception e) {
                ProducerUtils.closeProducerAsync(producer, this.logger);
                throw e;
            }
            return messageBuilder.sendAsync().whenComplete((msgId, ex) -> {
                if (ex == null) {
                    this.logger.trace(() -> "Sent msg to '%s' topic".formatted(topicName));
                    observation.stop();
                } else {
                    this.logger.error(ex, () -> "Failed to send msg to '%s' topic".formatted(topicName));
                    observation.error(ex);
                    observation.stop();
                }
                ProducerUtils.closeProducerAsync(producer, this.logger);
            });
        }
        catch (RuntimeException ex2) {
            observation.error((Throwable)ex2);
            observation.stop();
            throw ex2;
        }
    }

    private Observation newObservation(PulsarMessageSenderContext senderContext) {
        if (this.observationRegistry == null) {
            return Observation.NOOP;
        }
        return PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention, DefaultPulsarTemplateObservationConvention.INSTANCE, () -> senderContext, this.observationRegistry);
    }

    private Producer<T> prepareProducerForSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable Collection<String> encryptionKeys, @Nullable ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
        Schema<T> resolvedSchema = schema == null ? this.schemaResolver.resolveSchema(message).orElseThrow() : schema;
        ArrayList customizers = new ArrayList();
        if (!CollectionUtils.isEmpty(this.interceptors)) {
            customizers.add(builder -> this.interceptors.forEach(xva$0 -> builder.intercept(new ProducerInterceptor[]{xva$0})));
        }
        if (producerCustomizer != null) {
            customizers.add(producerCustomizer);
        }
        return this.producerFactory.createProducer(resolvedSchema, topic, encryptionKeys, customizers);
    }

    public static class SendMessageBuilderImpl<T>
    implements PulsarOperations.SendMessageBuilder<T> {
        private final PulsarTemplate<T> template;
        @Nullable
        private final T message;
        @Nullable
        private String topic;
        @Nullable
        private Schema<T> schema;
        @Nullable
        private Collection<String> encryptionKeys;
        @Nullable
        private TypedMessageBuilderCustomizer<T> messageCustomizer;
        @Nullable
        private ProducerBuilderCustomizer<T> producerCustomizer;

        SendMessageBuilderImpl(PulsarTemplate<T> template, @Nullable T message) {
            this.template = template;
            this.message = message;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withTopic(String topic) {
            this.topic = topic;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withSchema(Schema<T> schema) {
            this.schema = schema;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withEncryptionKeys(Collection<String> encryptionKeys) {
            this.encryptionKeys = encryptionKeys;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withMessageCustomizer(TypedMessageBuilderCustomizer<T> messageCustomizer) {
            this.messageCustomizer = messageCustomizer;
            return this;
        }

        @Override
        public PulsarOperations.SendMessageBuilder<T> withProducerCustomizer(ProducerBuilderCustomizer<T> producerCustomizer) {
            this.producerCustomizer = producerCustomizer;
            return this;
        }

        @Override
        public MessageId send() throws PulsarClientException {
            return this.template.doSend(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer);
        }

        @Override
        public CompletableFuture<MessageId> sendAsync() throws PulsarClientException {
            return this.template.doSendAsync(this.topic, this.message, this.schema, this.encryptionKeys, this.messageCustomizer, this.producerCustomizer);
        }
    }
}

