package com.github.fridujo.rabbitmq.mock;

import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/fridujo/rabbitmq/mock/MockQueue.class */
public class MockQueue implements Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
    private final String name;
    private final ReceiverPointer pointer;
    private final Map<String, Object> arguments;
    private final ReceiverRegistry receiverRegistry;
    private final MockChannel mockChannel;
    private final Map<String, ConsumerAndTag> consumersByTag = new LinkedHashMap();
    private final AtomicInteger sequence = new AtomicInteger();
    private final Queue<Message> messages = new LinkedList();
    private final Map<Long, Message> unackedMessagesByDeliveryTag = new LinkedHashMap();
    private final ExecutorService executorService = Executors.newFixedThreadPool(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/fridujo/rabbitmq/mock/MockQueue$ConsumerAndTag.class */
    public static class ConsumerAndTag {
        private final String tag;
        private final Consumer consumer;
        private final boolean autoAck;
        private final Supplier<Long> deliveryTagSupplier;

        ConsumerAndTag(String str, Consumer consumer, boolean z, Supplier<Long> supplier) {
            this.tag = str;
            this.consumer = consumer;
            this.autoAck = z;
            this.deliveryTagSupplier = supplier;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/fridujo/rabbitmq/mock/MockQueue$Message.class */
    public static class Message {
        final String exchangeName;
        final String routingKey;
        final AMQP.BasicProperties props;
        final byte[] body;

        Message(String str, String str2, AMQP.BasicProperties basicProperties, byte[] bArr) {
            this.exchangeName = str;
            this.routingKey = str2;
            this.props = basicProperties;
            this.body = bArr;
        }
    }

    public MockQueue(String str, Map<String, Object> map, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
        this.name = str;
        this.pointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, str);
        this.arguments = map;
        this.receiverRegistry = receiverRegistry;
        this.mockChannel = mockChannel;
        start();
    }

    private void start() {
        this.executorService.submit(() -> {
            while (true) {
                if (!deliverToConsumerIfPossible()) {
                    TimeUnit.MILLISECONDS.sleep(30L);
                }
            }
        });
    }

    private boolean deliverToConsumerIfPossible() {
        Message poll;
        boolean z = false;
        if (this.consumersByTag.size() > 0 && (poll = this.messages.poll()) != null) {
            ConsumerAndTag consumerAndTag = (ConsumerAndTag) new ArrayList(this.consumersByTag.values()).get(this.sequence.incrementAndGet() % this.consumersByTag.size());
            long longValue = ((Long) consumerAndTag.deliveryTagSupplier.get()).longValue();
            this.unackedMessagesByDeliveryTag.put(Long.valueOf(longValue), poll);
            try {
                consumerAndTag.consumer.handleDelivery(consumerAndTag.tag, new Envelope(longValue, false, poll.exchangeName, poll.routingKey), poll.props, poll.body);
                this.mockChannel.getMetricsCollector().consumedMessage(this.mockChannel, longValue, consumerAndTag.tag);
                if (consumerAndTag.autoAck) {
                    this.unackedMessagesByDeliveryTag.remove(Long.valueOf(longValue));
                }
                z = true;
            } catch (IOException e) {
                LOGGER.warn("Unable to deliver message to consumer [" + consumerAndTag.tag + "]");
                basicReject(longValue, true);
            }
        }
        return z;
    }

    @Override // com.github.fridujo.rabbitmq.mock.Receiver
    public void publish(String str, String str2, AMQP.BasicProperties basicProperties, byte[] bArr) {
        this.messages.offer(new Message(str, str2, basicProperties, bArr));
    }

    @Override // com.github.fridujo.rabbitmq.mock.Receiver
    public ReceiverPointer pointer() {
        return this.pointer;
    }

    public void basicConsume(String str, Consumer consumer, boolean z, Supplier<Long> supplier) {
        this.consumersByTag.put(str, new ConsumerAndTag(str, consumer, z, supplier));
        consumer.handleConsumeOk(str);
    }

    public GetResponse basicGet(boolean z, Supplier<Long> supplier) {
        long longValue = supplier.get().longValue();
        Message poll = this.messages.poll();
        if (poll == null) {
            return null;
        }
        if (!z) {
            this.unackedMessagesByDeliveryTag.put(Long.valueOf(longValue), poll);
        }
        return new GetResponse(new Envelope(longValue, false, poll.exchangeName, poll.routingKey), poll.props, poll.body, this.messages.size());
    }

    public void basicAck(long j, boolean z) {
        if (!z) {
            this.unackedMessagesByDeliveryTag.remove(Long.valueOf(j));
            return;
        }
        Map<Long, Message> map = this.unackedMessagesByDeliveryTag;
        map.getClass();
        doWithUnackedUntil(j, (v1) -> {
            r2.remove(v1);
        });
    }

    public void basicNack(long j, boolean z, boolean z2) {
        if (z) {
            doWithUnackedUntil(j, l -> {
                basicReject(l.longValue(), z2);
            });
        } else {
            basicReject(j, z2);
        }
    }

    public void basicReject(long j, boolean z) {
        Message remove = this.unackedMessagesByDeliveryTag.remove(Long.valueOf(j));
        if (remove != null) {
            if (z) {
                this.messages.offer(remove);
            } else {
                getDeadLetterExchange().ifPresent(receiver -> {
                    receiver.publish(remove.exchangeName, remove.routingKey, remove.props, remove.body);
                });
            }
        }
    }

    private Optional<Receiver> getDeadLetterExchange() {
        Optional filter = Optional.ofNullable(this.arguments.get(Receiver.DEAD_LETTER_EXCHANGE_KEY)).filter(obj -> {
            return obj instanceof String;
        });
        Class<String> cls = String.class;
        String.class.getClass();
        Optional map = filter.map(cls::cast).map(str -> {
            return new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, str);
        });
        ReceiverRegistry receiverRegistry = this.receiverRegistry;
        receiverRegistry.getClass();
        return map.flatMap(receiverRegistry::getReceiver);
    }

    public void basicCancel(String str) {
        if (this.consumersByTag.containsKey(str)) {
            this.consumersByTag.remove(str).consumer.handleCancelOk(str);
        }
    }

    public void notifyDeleted() {
        for (ConsumerAndTag consumerAndTag : this.consumersByTag.values()) {
            try {
                consumerAndTag.consumer.handleCancel(consumerAndTag.tag);
            } catch (IOException e) {
                LOGGER.warn("Consumer threw an exception when notified about cancellation", e);
            }
        }
    }

    public void basicRecover(boolean z) {
        new LinkedHashSet(this.unackedMessagesByDeliveryTag.keySet()).forEach(l -> {
            this.messages.offer(this.unackedMessagesByDeliveryTag.remove(l));
        });
        this.consumersByTag.values().forEach(consumerAndTag -> {
            consumerAndTag.consumer.handleRecoverOk(consumerAndTag.tag);
        });
    }

    public int messageCount() {
        return this.messages.size();
    }

    public int consumerCount() {
        return this.consumersByTag.size();
    }

    public int purge() {
        int messageCount = messageCount();
        this.messages.clear();
        return messageCount;
    }

    private void doWithUnackedUntil(long j, java.util.function.Consumer<Long> consumer) {
        if (this.unackedMessagesByDeliveryTag.containsKey(Long.valueOf(j))) {
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            for (Long l : this.unackedMessagesByDeliveryTag.keySet()) {
                linkedHashSet.add(l);
                if (Long.valueOf(j).equals(l)) {
                    break;
                }
            }
            linkedHashSet.forEach(consumer);
        }
    }
}
