/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;

public class QueueMessageStream<M extends Message<?>>
implements MessageStream<M> {
    private static final Runnable NO_OP_CALLBACK = () -> {};
    private final BlockingQueue<MessageStream.Entry<M>> queue;
    private final AtomicReference<Runnable> onAvailableCallbackRef = new AtomicReference<Runnable>(NO_OP_CALLBACK);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<Throwable> errorRef = new AtomicReference();
    private final AtomicReference<Runnable> onConsumeCallback = new AtomicReference<Runnable>(NO_OP_CALLBACK);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public QueueMessageStream() {
        this(new LinkedBlockingQueue<MessageStream.Entry<M>>());
    }

    public QueueMessageStream(@Nonnull BlockingQueue<MessageStream.Entry<M>> queue) {
        this.queue = queue;
    }

    public boolean offer(@Nonnull M message, @Nonnull Context context) {
        if (this.queue.offer(new SimpleEntry<M>(message, context))) {
            this.onAvailableCallbackRef.get().run();
            return true;
        }
        return false;
    }

    public void complete() {
        this.completed.set(true);
        this.onAvailableCallbackRef.get().run();
    }

    public void completeExceptionally(@Nonnull Throwable error) {
        this.errorRef.set(error);
        this.completed.set(true);
        this.onAvailableCallbackRef.get().run();
    }

    public void onConsumeCallback(@Nonnull Runnable callback) {
        this.onConsumeCallback.set(callback);
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    @Override
    public Optional<MessageStream.Entry<M>> next() {
        MessageStream.Entry poll = (MessageStream.Entry)this.queue.poll();
        if (poll != null) {
            this.onConsumeCallback.get().run();
        }
        return Optional.ofNullable(poll);
    }

    @Override
    public void onAvailable(@Nonnull Runnable callback) {
        this.onAvailableCallbackRef.set(callback);
        if (!this.queue.isEmpty() || this.isCompleted()) {
            callback.run();
        }
    }

    @Override
    public Optional<Throwable> error() {
        return Optional.ofNullable(this.errorRef.get());
    }

    @Override
    public boolean isCompleted() {
        return this.queue.isEmpty() && this.completed.get();
    }

    @Override
    public boolean hasNextAvailable() {
        return !this.queue.isEmpty();
    }

    @Override
    public void close() {
        this.closed.set(true);
        this.onConsumeCallback.get().run();
    }
}

