package edu.utexas.tacc.tapis.shared.notifications;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import edu.utexas.tacc.tapis.shared.utils.TapisObjectMapper;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.UUID;
import org.jvnet.hk2.annotations.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

@Service
/* loaded from: input_file:edu/utexas/tacc/tapis/shared/notifications/TapisNotificationsClient.class */
public class TapisNotificationsClient implements ITapisNotificationsClient {
    private final Receiver receiver;
    private final Sender sender;
    private static final String EXCHANGE_NAME = "tapis.notifications";
    private static final String USER_EXCHANGE_NAME = "tapis.userNotifications";
    private static final Logger log = LoggerFactory.getLogger(TapisNotificationsClient.class);
    private static final long EXPIRATION = Duration.ofDays(7).toMillis();
    private static final ObjectMapper mapper = TapisObjectMapper.getMapper();

    public TapisNotificationsClient() {
        ConnectionFactory rabbitMQConnection = RabbitMQConnection.getInstance();
        ReceiverOptions connectionSubscriptionScheduler = new ReceiverOptions().connectionFactory(rabbitMQConnection).connectionSubscriptionScheduler(Schedulers.newElastic("receiver"));
        SenderOptions resourceManagementScheduler = new SenderOptions().connectionFactory(rabbitMQConnection).resourceManagementScheduler(Schedulers.newElastic("sender"));
        this.receiver = RabbitFlux.createReceiver(connectionSubscriptionScheduler);
        this.sender = RabbitFlux.createSender(resourceManagementScheduler);
        ExchangeSpecification exchangeSpecification = new ExchangeSpecification();
        exchangeSpecification.durable(true);
        exchangeSpecification.type("topic");
        exchangeSpecification.name("tapis.notifications");
        this.sender.declareExchange(exchangeSpecification).subscribe();
    }

    @Override // edu.utexas.tacc.tapis.shared.notifications.ITapisNotificationsClient
    public Mono<Void> sendNotificationAsync(String str, Notification notification) {
        try {
            return this.sender.send(Mono.just(new OutboundMessage("tapis.notifications", str, new AMQP.BasicProperties.Builder().expiration(String.valueOf(EXPIRATION)).build(), mapper.writeValueAsString(notification).getBytes())));
        } catch (IOException e) {
            log.error("Could not serialize message, ignoring: {}", notification.toString());
            return Mono.empty();
        }
    }

    @Override // edu.utexas.tacc.tapis.shared.notifications.ITapisNotificationsClient
    public void sendNotification(String str, Notification notification) throws IOException {
        sendNotificationAsync(str, notification).subscribe();
    }

    @Override // edu.utexas.tacc.tapis.shared.notifications.ITapisNotificationsClient
    public Flux<Notification> streamNotifications(String str) {
        QueueSpecification queueSpecification = new QueueSpecification();
        HashMap hashMap = new HashMap();
        hashMap.put("x-expires", Long.valueOf(Duration.ofDays(7L).toMillis()));
        queueSpecification.durable(true);
        queueSpecification.arguments(hashMap);
        queueSpecification.name("tapis.notifications." + UUID.randomUUID().toString());
        BindingSpecification bindingSpecification = new BindingSpecification();
        bindingSpecification.exchange("tapis.notifications");
        bindingSpecification.queue(queueSpecification.getName());
        bindingSpecification.routingKey(str);
        return this.receiver.consumeAutoAck(queueSpecification.getName()).delaySubscription(this.sender.declareQueue(queueSpecification).then(this.sender.bindQueue(bindingSpecification))).flatMap(this::deserializeNotification);
    }

    private Mono<Notification> deserializeNotification(Delivery delivery) {
        try {
            return Mono.just((Notification) mapper.readValue(delivery.getBody(), Notification.class));
        } catch (IOException e) {
            log.error("ERROR: Could new deserialize message", e);
            return Mono.empty();
        }
    }
}
