/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.impl.RoutingStrategy;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.StreamProducerBuilder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Producer} for a super stream.
 *
 * <p>Each outbound message is handed to the {@link RoutingStrategy}, which returns the
 * streams the message must go to. One delegate {@link Producer} per stream is created on
 * first use from a private copy of the builder and cached for subsequent sends.
 */
class SuperStreamProducer
implements Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamProducer.class);
    private final RoutingStrategy routingStrategy;
    private final Codec codec;
    private final String superStream;
    /** Delegate producers keyed by the stream they publish to; filled lazily by {@link #send}. */
    private final Map<String, Producer> producers = new ConcurrentHashMap<>();
    private final StreamProducerBuilder producerBuilder;
    private final StreamEnvironment environment;
    private final String name;

    /**
     * Creates the producer.
     *
     * @param producerBuilder template builder; it is duplicated, so the caller's instance is
     *     not modified by this class
     * @param name producer name, may be {@code null} or empty (then
     *     {@link #getLastPublishingId()} is unavailable)
     * @param superStream name of the super stream
     * @param routingStrategy strategy that maps a message to its target stream(s)
     * @param streamEnvironment environment providing the codec and the locator
     */
    SuperStreamProducer(StreamProducerBuilder producerBuilder, String name, String superStream, RoutingStrategy routingStrategy, StreamEnvironment streamEnvironment) {
        this.routingStrategy = routingStrategy;
        this.codec = streamEnvironment.codec();
        this.name = name;
        this.superStream = superStream;
        this.environment = streamEnvironment;
        // Work on a private copy with the stream and routing cleared: the per-stream
        // producers built from it set their own stream and must not route again.
        this.producerBuilder = producerBuilder.duplicate();
        this.producerBuilder.stream(null);
        this.producerBuilder.resetRouting();
    }

    /** Returns a message builder obtained from the environment's codec. */
    @Override
    public MessageBuilder messageBuilder() {
        return this.codec.messageBuilder();
    }

    /**
     * Queries the publisher sequence of this named producer on every partition of the super
     * stream and returns the smallest one, comparing values as unsigned longs.
     *
     * @return the lowest sequence across partitions, or {@code 0} if the super stream has no
     *     partition
     * @throws IllegalStateException if the producer has no name
     */
    @Override
    public long getLastPublishingId() {
        if (this.name == null || this.name.isEmpty()) {
            throw new IllegalStateException("The producer has no name");
        }
        List<String> partitions = this.environment.locator().partitions(this.superStream);
        long lowest = 0L;
        boolean seeded = false;
        for (String partition : partitions) {
            long sequence = this.environment.locator().queryPublisherSequence(this.name, partition);
            // The first value seeds the minimum; afterwards keep the smaller one (unsigned).
            if (!seeded || Long.compareUnsigned(sequence, lowest) < 0) {
                lowest = sequence;
                seeded = true;
            }
        }
        return lowest;
    }

    /**
     * Sends the message to every stream returned by the routing strategy.
     *
     * <p>If the strategy returns no stream, nothing is sent and this method does not invoke
     * the confirmation handler.
     *
     * @param message the message to publish
     * @param confirmationHandler handler passed as-is to each delegate producer
     */
    @Override
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        for (String stream : this.routingStrategy.route(message)) {
            this.producerFor(stream).send(message, confirmationHandler);
        }
    }

    /** Returns the cached delegate producer for the stream, building it on first access. */
    private Producer producerFor(String stream) {
        return this.producers.computeIfAbsent(stream, target -> this.producerBuilder.duplicate().stream(target).build());
    }

    /**
     * Closes every delegate producer created so far. A failure on one delegate is logged and
     * does not prevent the remaining ones from being closed.
     */
    @Override
    public void close() {
        for (Map.Entry<String, Producer> entry : this.producers.entrySet()) {
            try {
                entry.getValue().close();
            }
            catch (Exception e) {
                // The trailing throwable is not bound to a placeholder, so the logger records
                // it as the exception (type and stack trace) on top of the formatted message.
                LOGGER.info("Error while closing producer for partition {} of super stream {}: {}", entry.getKey(), this.superStream, e.getMessage(), e);
            }
        }
    }
}

