/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.reactive.core;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.reactivestreams.Publisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.core.MessageSpecBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarOperations;
import org.springframework.pulsar.reactive.core.ReactivePulsarSenderFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactivePulsarTemplate<T>
implements ReactivePulsarOperations<T> {
    private final LogAccessor logger = new LogAccessor(this.getClass());
    private final ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory;
    private final SchemaResolver schemaResolver;
    private final TopicResolver topicResolver;

    public ReactivePulsarTemplate(ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory) {
        this(reactiveMessageSenderFactory, (SchemaResolver)new DefaultSchemaResolver(), (TopicResolver)new DefaultTopicResolver());
    }

    public ReactivePulsarTemplate(ReactivePulsarSenderFactory<T> reactiveMessageSenderFactory, SchemaResolver schemaResolver, TopicResolver topicResolver) {
        this.reactiveMessageSenderFactory = reactiveMessageSenderFactory;
        this.schemaResolver = schemaResolver;
        this.topicResolver = topicResolver;
    }

    @Override
    public Mono<MessageId> send(@Nullable T message) {
        return this.send(null, message);
    }

    @Override
    public Mono<MessageId> send(@Nullable T message, @Nullable Schema<T> schema) {
        return this.doSend(null, message, schema, null, null);
    }

    @Override
    public Mono<MessageId> send(@Nullable String topic, @Nullable T message) {
        return this.doSend(topic, message, null, null, null);
    }

    @Override
    public Mono<MessageId> send(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema) {
        return this.doSend(topic, message, schema, null, null);
    }

    @Override
    public Flux<MessageSendResult<T>> send(Publisher<MessageSpec<T>> messages) {
        return this.send((String)null, messages);
    }

    @Override
    public Flux<MessageSendResult<T>> send(Publisher<MessageSpec<T>> messages, @Nullable Schema<T> schema) {
        return this.doSendMany(null, Flux.from(messages), schema, null);
    }

    @Override
    public Flux<MessageSendResult<T>> send(@Nullable String topic, Publisher<MessageSpec<T>> messages) {
        return this.doSendMany(topic, Flux.from(messages), null, null);
    }

    @Override
    public Flux<MessageSendResult<T>> send(@Nullable String topic, Publisher<MessageSpec<T>> messages, @Nullable Schema<T> schema) {
        return this.doSendMany(topic, Flux.from(messages), schema, null);
    }

    @Override
    public ReactivePulsarOperations.SendOneMessageBuilder<T> newMessage(@Nullable T message) {
        return new SendOneMessageBuilderImpl<T>(this, message);
    }

    @Override
    public ReactivePulsarOperations.SendManyMessageBuilder<T> newMessages(Publisher<MessageSpec<T>> messages) {
        return new SendManyMessageBuilderImpl<T>(this, messages);
    }

    private Mono<MessageId> doSend(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer, @Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
        String topicName = this.resolveTopic(topic, message);
        this.logger.trace(() -> "Sending reactive msg to '%s' topic".formatted(topicName));
        ReactiveMessageSender<T> sender = this.createMessageSender(topicName, message, schema, customizer);
        return sender.sendOne(ReactivePulsarTemplate.getMessageSpec(messageSpecBuilderCustomizer, message)).doOnError(ex -> this.logger.error(ex, () -> "Failed to send message to '%s' topic".formatted(topicName))).doOnSuccess(msgId -> this.logger.trace(() -> "Sent message to '%s' topic".formatted(topicName)));
    }

    private Flux<MessageSendResult<T>> doSendMany(@Nullable String topic, Flux<MessageSpec<T>> messages, @Nullable Schema<T> schema, @Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
        return messages.switchOnFirst((firstSignal, messageFlux) -> {
            MessageSpec firstMessage = (MessageSpec)firstSignal.get();
            if (firstMessage != null && firstSignal.isOnNext()) {
                String topicName = this.resolveTopic(topic, firstMessage.getValue());
                ReactiveMessageSender<Object> sender = this.createMessageSender(topicName, firstMessage.getValue(), schema, customizer);
                return ((Flux)messageFlux.as(arg_0 -> sender.sendMany(arg_0))).doOnError(ex -> this.logger.error(ex, () -> "Failed to send messages to '%s' topic".formatted(topicName))).doOnNext(msgId -> this.logger.trace(() -> "Sent messages to '%s' topic".formatted(topicName)));
            }
            return messageFlux.thenMany((Publisher)Flux.empty());
        });
    }

    private String resolveTopic(@Nullable String topic, @Nullable Object message) {
        String defaultTopic = this.reactiveMessageSenderFactory.getDefaultTopic();
        return (String)this.topicResolver.resolveTopic(topic, message, () -> defaultTopic).orElseThrow();
    }

    private static <T> MessageSpec<T> getMessageSpec(@Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer, @Nullable T message) {
        MessageSpecBuilder messageSpecBuilder = MessageSpec.builder(message);
        if (messageSpecBuilderCustomizer != null) {
            messageSpecBuilderCustomizer.customize(messageSpecBuilder);
        }
        return messageSpecBuilder.build();
    }

    private ReactiveMessageSender<T> createMessageSender(@Nullable String topic, @Nullable T message, @Nullable Schema<T> schema, @Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
        Schema resolvedSchema = schema == null ? (Schema)this.schemaResolver.resolveSchema(message).orElseThrow() : schema;
        return this.reactiveMessageSenderFactory.createSender(resolvedSchema, topic, customizer);
    }

    private static final class SendOneMessageBuilderImpl<T>
    extends SendMessageBuilderImpl<SendOneMessageBuilderImpl<T>, T>
    implements ReactivePulsarOperations.SendOneMessageBuilder<T> {
        @Nullable
        private final T message;
        @Nullable
        private MessageSpecBuilderCustomizer<T> messageCustomizer;

        SendOneMessageBuilderImpl(ReactivePulsarTemplate<T> template, @Nullable T message) {
            super(template);
            this.message = message;
        }

        @Override
        public SendOneMessageBuilderImpl<T> withMessageCustomizer(MessageSpecBuilderCustomizer<T> messageCustomizer) {
            this.messageCustomizer = messageCustomizer;
            return this;
        }

        @Override
        public Mono<MessageId> send() {
            return this.template.doSend(this.topic, this.message, this.schema, this.messageCustomizer, this.senderCustomizer);
        }
    }

    private static final class SendManyMessageBuilderImpl<T>
    extends SendMessageBuilderImpl<SendManyMessageBuilderImpl<T>, T>
    implements ReactivePulsarOperations.SendManyMessageBuilder<T> {
        private final Publisher<MessageSpec<T>> messages;

        SendManyMessageBuilderImpl(ReactivePulsarTemplate<T> template, Publisher<MessageSpec<T>> messages) {
            super(template);
            this.messages = messages;
        }

        @Override
        public Flux<MessageSendResult<T>> send() {
            return this.template.doSendMany(this.topic, Flux.from(this.messages), this.schema, this.senderCustomizer);
        }
    }

    private static class SendMessageBuilderImpl<O, T> {
        protected final ReactivePulsarTemplate<T> template;
        @Nullable
        protected String topic;
        @Nullable
        protected Schema<T> schema;
        @Nullable
        protected ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer;

        SendMessageBuilderImpl(ReactivePulsarTemplate<T> template) {
            this.template = template;
        }

        public O withTopic(String topic) {
            this.topic = topic;
            return (O)this;
        }

        public O withSchema(Schema<T> schema) {
            this.schema = schema;
            return (O)this;
        }

        public O withSenderCustomizer(ReactiveMessageSenderBuilderCustomizer<T> senderCustomizer) {
            this.senderCustomizer = senderCustomizer;
            return (O)this;
        }
    }
}

