/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;

/**
 * A {@link MessageStream} wrapper that switches to a fallback stream once the wrapped
 * {@code delegate} has completed with an error.
 * <p>
 * While the delegate is not completed, or has completed without an error, every call is forwarded
 * to the delegate. Once the delegate reports both completion and an error, the {@code onError}
 * function is applied to that error to obtain the fallback stream, and all subsequent calls are
 * forwarded to that fallback stream instead. The fallback stream is created lazily, on the first
 * call to any method of this stream made after the delegate has failed, and the most recently
 * registered {@link #onAvailable(Runnable) availability callback} is registered on it.
 *
 * @param <M> the type of {@link Message} carried by the entries of this stream
 */
class OnErrorContinueMessageStream<M extends Message<?>>
implements MessageStream<M> {
    private final MessageStream<M> delegate;
    // Holds the fallback stream once it has been created; null until then.
    private final AtomicReference<MessageStream<M>> onErrorStream = new AtomicReference<>();
    private final Function<Throwable, MessageStream<M>> onError;
    // Most recently registered availability callback; starts out as a no-op.
    private final AtomicReference<Runnable> callback = new AtomicReference<Runnable>(() -> {});

    /**
     * Creates a stream that forwards to {@code delegate} and continues with the stream produced by
     * {@code onError} when the delegate completes with an error.
     *
     * @param delegate the stream to forward to until it completes with an error
     * @param onError  the function that receives the delegate's error and returns the stream to
     *                 continue with
     */
    OnErrorContinueMessageStream(@Nonnull MessageStream<M> delegate, @Nonnull Function<Throwable, MessageStream<M>> onError) {
        this.delegate = delegate;
        this.onError = onError;
    }

    /**
     * Returns the result of {@code next()} on the currently active stream.
     *
     * @return the next entry of the active stream, as reported by that stream
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        return this.resolveCurrentDelegate().next();
    }

    /**
     * Registers the given callback on the currently active stream and remembers it so that it can
     * be registered on the fallback stream when that stream is created.
     *
     * @param callback the callback to register
     */
    @Override
    public void onAvailable(@Nonnull Runnable callback) {
        // Store the callback BEFORE registering it. If the delegate fails while (or right after)
        // the callback is being registered, the fallback stream is created by whichever call
        // resolves the delegate next, and it reads the stored callback at that moment. Storing
        // afterwards would wire the fallback stream to the previous (initially no-op) callback.
        this.callback.set(callback);
        this.resolveCurrentDelegate().onAvailable(callback);
    }

    /**
     * Determines the stream that calls should currently be forwarded to.
     * <p>
     * Returns the delegate unless it has completed with an error. In that case the fallback
     * stream is returned, creating it through {@code onError} if it does not exist yet and
     * registering the stored availability callback on it.
     *
     * @return the delegate, or the fallback stream once the delegate has completed with an error
     */
    private MessageStream<M> resolveCurrentDelegate() {
        if (!this.delegate.isCompleted() || this.delegate.error().isEmpty()) {
            return this.delegate;
        }
        // Fast path: the fallback stream already exists, no locking required.
        MessageStream<M> existing = this.onErrorStream.get();
        if (existing != null) {
            return existing;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have created the fallback stream
            // between the unsynchronized read above and acquiring the monitor.
            MessageStream<M> fallback = this.onErrorStream.get();
            if (fallback == null) {
                fallback = this.onError.apply(this.delegate.error().orElse(null));
                this.onErrorStream.set(fallback);
            }
            fallback.onAvailable(this.callback.get());
            return fallback;
        }
    }

    /**
     * Returns the error reported by the currently active stream.
     *
     * @return the error of the active stream, as reported by that stream
     */
    @Override
    public Optional<Throwable> error() {
        return this.resolveCurrentDelegate().error();
    }

    /**
     * Returns whether the currently active stream reports completion.
     *
     * @return the result of {@code isCompleted()} on the active stream
     */
    @Override
    public boolean isCompleted() {
        return this.resolveCurrentDelegate().isCompleted();
    }

    /**
     * Returns whether the currently active stream reports an available entry.
     *
     * @return the result of {@code hasNextAvailable()} on the active stream
     */
    @Override
    public boolean hasNextAvailable() {
        return this.resolveCurrentDelegate().hasNextAvailable();
    }

    /**
     * Closes the currently active stream. If the delegate has completed with an error and the
     * fallback stream has not been created yet, resolving the active stream creates it first.
     */
    @Override
    public void close() {
        this.resolveCurrentDelegate().close();
    }
}

