package io.moquette.persistence;

import io.moquette.broker.AbstractSessionMessageQueue;
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.unsafequeues.Queue;
import io.moquette.broker.unsafequeues.QueueException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/* loaded from: input_file:io/moquette/persistence/SegmentPersistentQueue.class */
/**
 * Session message queue that stores its entries in a segmented {@link Queue}.
 *
 * <p>Messages are converted to and from {@link ByteBuffer} records by the nested
 * {@link SerDes} helper before being handed to the underlying queue.
 */
public class SegmentPersistentQueue extends AbstractSessionMessageQueue<SessionRegistry.EnqueuedMessage> {
    private final Queue segmentedQueue;
    private final SerDes serdes = new SerDes();

    /**
     * Serializer/deserializer for {@link SessionRegistry.EnqueuedMessage} instances.
     *
     * <p>Record layout:
     * <pre>
     *   PubRelMarker:      [type:1]
     *   PublishedMessage:  [type:1][qos:1][topicLen:4][topic UTF-8][payloadLen:4][payload]
     * </pre>
     * The type byte is the ordinal of {@link MessageType}; reordering the enum constants
     * would therefore change the binary format.
     */
    public static class SerDes {

        /** Record discriminator; its ordinal is written as the first byte of each record. */
        public enum MessageType {
            PUB_REL_MARKER,
            PUBLISHED_MESSAGE
        }

        private SerDes() {
        }

        /**
         * Serializes a message into a freshly allocated, exactly sized buffer.
         *
         * @param enqueuedMessage the message to serialize
         * @return a buffer positioned at 0 and containing the whole record
         * @throws IllegalArgumentException if the message is neither a published message
         *                                  nor a PUBREL marker
         */
        public ByteBuffer toBytes(SessionRegistry.EnqueuedMessage enqueuedMessage) {
            ByteBuffer record = ByteBuffer.allocate(getMemory(enqueuedMessage));
            // Mark position 0 so the buffer can be rewound after it has been filled.
            record.mark();
            write(enqueuedMessage, record);
            record.reset();
            return record;
        }

        /** Writes the type tag and, for published messages, QoS, topic and payload (in that order). */
        private void write(SessionRegistry.EnqueuedMessage enqueuedMessage, ByteBuffer byteBuffer) {
            if (enqueuedMessage instanceof SessionRegistry.PublishedMessage) {
                SessionRegistry.PublishedMessage published = (SessionRegistry.PublishedMessage) enqueuedMessage;
                byteBuffer.put((byte) MessageType.PUBLISHED_MESSAGE.ordinal());
                byteBuffer.put((byte) published.getPublishingQos().value());
                writeTopic(byteBuffer, published.getTopic().toString());
                writePayload(byteBuffer, published.getPayload());
            } else if (enqueuedMessage instanceof SessionRegistry.PubRelMarker) {
                byteBuffer.put((byte) MessageType.PUB_REL_MARKER.ordinal());
            } else {
                throw new IllegalArgumentException("Unrecognized message class " + enqueuedMessage.getClass());
            }
        }

        /**
         * Writes the length-prefixed payload and releases the source buffer.
         *
         * <p>The bytes are copied with an absolute read so the reader index of the source is
         * never moved, and the release happens only after the last access to the buffer.
         */
        private void writePayload(ByteBuffer byteBuffer, ByteBuf byteBuf) {
            final int payloadSize = byteBuf.readableBytes();
            final byte[] rawBytes = new byte[payloadSize];
            byteBuf.getBytes(byteBuf.readerIndex(), rawBytes);
            byteBuf.release();
            byteBuffer.putInt(payloadSize);
            byteBuffer.put(rawBytes);
        }

        /** Writes the topic as a 4-byte length followed by its UTF-8 bytes. */
        private void writeTopic(ByteBuffer byteBuffer, String str) {
            final byte[] topicBytes = str.getBytes(StandardCharsets.UTF_8);
            byteBuffer.putInt(topicBytes.length).put(topicBytes);
        }

        /**
         * Computes the exact number of bytes {@link #write} will produce for the message.
         *
         * @throws IllegalArgumentException if the message type is not supported
         */
        private int getMemory(SessionRegistry.EnqueuedMessage enqueuedMessage) {
            if (enqueuedMessage instanceof SessionRegistry.PubRelMarker) {
                // Only the type tag.
                return 1;
            }
            if (!(enqueuedMessage instanceof SessionRegistry.PublishedMessage)) {
                // Fail with the same error as write() instead of a ClassCastException.
                throw new IllegalArgumentException("Unrecognized message class " + enqueuedMessage.getClass());
            }
            SessionRegistry.PublishedMessage published = (SessionRegistry.PublishedMessage) enqueuedMessage;
            // 2 = type tag + QoS byte.
            return 2 + topicMemorySize(published.getTopic()) + payloadMemorySize(published.getPayload());
        }

        /** Size of the payload section: 4-byte length prefix plus the readable bytes. */
        private int payloadMemorySize(ByteBuf byteBuf) {
            return 4 + byteBuf.readableBytes();
        }

        /** Size of the topic section: 4-byte length prefix plus the UTF-8 encoded topic. */
        private int topicMemorySize(Topic topic) {
            return 4 + topic.toString().getBytes(StandardCharsets.UTF_8).length;
        }

        /**
         * Deserializes a record previously produced by {@link #toBytes}.
         *
         * @param byteBuffer buffer positioned at the start of a record
         * @return the decoded message
         * @throws IllegalArgumentException if the type tag is unknown
         */
        public SessionRegistry.EnqueuedMessage fromBytes(ByteBuffer byteBuffer) {
            final byte type = byteBuffer.get();
            if (type == MessageType.PUB_REL_MARKER.ordinal()) {
                return new SessionRegistry.PubRelMarker();
            }
            if (type != MessageType.PUBLISHED_MESSAGE.ordinal()) {
                throw new IllegalArgumentException("Can't recognize record of type: " + ((int) type));
            }
            // Fields must be consumed in the order write() emitted them: QoS, topic, payload.
            // Reading them as inline constructor arguments would read the topic before the QoS.
            final MqttQoS qos = MqttQoS.valueOf(byteBuffer.get());
            final String topic = readTopic(byteBuffer);
            final ByteBuf payload = readPayload(byteBuffer);
            return new SessionRegistry.PublishedMessage(Topic.asTopic(topic), qos, payload, false);
        }

        /** Reads a 4-byte length followed by that many UTF-8 bytes. */
        private String readTopic(ByteBuffer byteBuffer) {
            final byte[] topicBytes = new byte[byteBuffer.getInt()];
            byteBuffer.get(topicBytes);
            return new String(topicBytes, StandardCharsets.UTF_8);
        }

        /** Reads a 4-byte length followed by that many bytes, wrapped in an unpooled buffer. */
        private ByteBuf readPayload(ByteBuffer byteBuffer) {
            final byte[] payloadBytes = new byte[byteBuffer.getInt()];
            byteBuffer.get(payloadBytes);
            return Unpooled.wrappedBuffer(payloadBytes);
        }
    }

    /**
     * @param queue the segmented queue that stores the serialized messages
     */
    public SegmentPersistentQueue(Queue queue) {
        this.segmentedQueue = queue;
    }

    /**
     * Serializes the message and appends it to the underlying queue.
     *
     * @throws RuntimeException wrapping any {@link QueueException} raised by the queue
     */
    @Override
    public void enqueue(SessionRegistry.EnqueuedMessage enqueuedMessage) {
        checkEnqueuePreconditions(enqueuedMessage);
        try {
            this.segmentedQueue.enqueue(this.serdes.toBytes(enqueuedMessage));
        } catch (QueueException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes and deserializes the next message.
     *
     * @return the next message, or {@code null} when the underlying queue returns no record
     * @throws RuntimeException wrapping any {@link QueueException} raised by the queue
     */
    @Override
    public SessionRegistry.EnqueuedMessage dequeue() {
        checkDequeuePreconditions();
        final Optional<ByteBuffer> record;
        try {
            record = this.segmentedQueue.dequeue();
        } catch (QueueException e) {
            throw new RuntimeException(e);
        }
        // null (not Optional) is returned to keep the existing contract with callers.
        return record.map(this.serdes::fromBytes).orElse(null);
    }

    /** @return whether the underlying segmented queue reports itself as empty */
    @Override
    public boolean isEmpty() {
        return this.segmentedQueue.isEmpty();
    }

    /**
     * Marks this queue as closed.
     *
     * <p>NOTE(review): only the {@code closed} flag is set here; nothing is invoked on the
     * underlying segmented queue, so whether stored data is actually purged should be
     * confirmed against the queue/pool owner.
     */
    @Override
    public void closeAndPurge() {
        this.closed = true;
    }
}
