/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore;

import jakarta.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.axonframework.common.Registration;
import org.axonframework.common.annotation.Internal;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.eventhandling.EventMessage;

/**
 * A {@link MessageStream} of {@link EventMessage}s that lazily pulls batches of source items of type {@code E}
 * through a fetcher function and converts them to stream entries one at a time.
 * <p>
 * Whenever the current batch is exhausted, the fetcher is invoked with the last item of the most recent non-empty
 * batch ({@code null} if no item has been fetched yet) and is expected to return the next batch, which may be
 * empty. An exception thrown while fetching or while running the registered callback is recorded (see
 * {@link #error()}) and closes the stream.
 * <p>
 * All public methods synchronize on this instance.
 *
 * @param <E> the type of the source items that are fetched and converted into entries
 */
@Internal
public final class ContinuousMessageStream<E>
implements MessageStream<EventMessage> {
    private final Function<E, List<E>> fetcher;
    private final BiFunction<ContinuousMessageStream<?>, Runnable, Registration> callbackTracker;
    private final Function<E, MessageStream.Entry<EventMessage>> converter;
    /** Last item of the most recent non-empty batch; handed to the fetcher to request the next batch. */
    private E lastItem;
    /** The current batch. Never {@code null}; replaced by an empty list once the stream is closed. */
    private List<E> data = List.of();
    /** Entry that was converted by {@link #peek()} but not yet consumed by {@link #next()}. */
    private MessageStream.Entry<EventMessage> nextEntry;
    private Throwable error;
    /** Index in {@link #data} of the next item to convert. */
    private int position;
    private Registration callbackRegistration;
    private Runnable callback;
    private boolean closed;

    /**
     * Creates a stream that is fed by the given {@code fetcher}.
     *
     * @param fetcher         returns the next batch of items, given the last item of the previous non-empty batch
     *                        (or {@code null} when nothing has been fetched yet)
     * @param converter       converts a fetched item into a stream entry
     * @param callbackTracker receives this stream and a {@link Runnable} that runs the currently configured
     *                        callback, and returns the {@link Registration} used to undo that registration
     * @throws NullPointerException if any of the arguments is {@code null}
     */
    public ContinuousMessageStream(@Nonnull Function<E, List<E>> fetcher, @Nonnull Function<E, MessageStream.Entry<EventMessage>> converter, @Nonnull BiFunction<ContinuousMessageStream<?>, Runnable, Registration> callbackTracker) {
        this.fetcher = Objects.requireNonNull(fetcher, "fetcher");
        this.converter = Objects.requireNonNull(converter, "converter");
        this.callbackTracker = Objects.requireNonNull(callbackTracker, "callbackTracker");
    }

    /**
     * Sets or clears the callback of this stream. Has no effect once the stream is closed.
     * <p>
     * A non-null callback is registered with the callback tracker (only once, on first use) and is then invoked
     * immediately. Passing {@code null} clears the callback and cancels an existing registration, if any.
     *
     * @param callback the callback to run, or {@code null} to remove the current callback
     */
    public synchronized void setCallback(Runnable callback) {
        if (this.closed) {
            return;
        }
        this.callback = callback;
        if (callback == null) {
            // There may be nothing to cancel when no callback was ever registered (or it was already cleared).
            if (this.callbackRegistration != null) {
                this.callbackRegistration.cancel();
                this.callbackRegistration = null;
            }
            return;
        }
        if (this.callbackRegistration == null) {
            this.callbackRegistration = this.callbackTracker.apply(this, this::invokeCallback);
        }
        this.invokeCallback();
    }

    /**
     * Returns the next entry and consumes it, so that a subsequent call moves on to the following item.
     *
     * @return the next entry, or an empty {@code Optional} if the stream is closed or no item is available
     */
    public synchronized Optional<MessageStream.Entry<EventMessage>> next() {
        Optional<MessageStream.Entry<EventMessage>> entry = this.peek();
        // Drop the buffered entry so the next peek() converts the following item.
        this.nextEntry = null;
        return entry;
    }

    /**
     * Returns the next entry without consuming it, fetching a new batch first when the current one is exhausted.
     * The converted entry is buffered until {@link #next()} is called.
     *
     * @return the next entry, or an empty {@code Optional} if the stream is closed or the fetcher produced no items
     */
    public synchronized Optional<MessageStream.Entry<EventMessage>> peek() {
        if (this.closed) {
            return Optional.empty();
        }
        if (this.nextEntry == null) {
            if (this.position >= this.data.size()) {
                this.fetchMore();
                // fetchMore() closes the stream when fetching fails.
                if (this.closed || this.data.isEmpty()) {
                    return Optional.empty();
                }
            }
            E element = this.data.get(this.position++);
            this.nextEntry = this.converter.apply(element);
        }
        return Optional.of(this.nextEntry);
    }

    /**
     * Returns the exception recorded while fetching items or while running the callback, if any.
     *
     * @return the recorded error, or an empty {@code Optional} if no error occurred
     */
    public synchronized Optional<Throwable> error() {
        return Optional.ofNullable(this.error);
    }

    /**
     * Indicates whether this stream has terminated because of an error. Note that calling {@link #close()} on a
     * stream without a recorded error does not make this method return {@code true}.
     *
     * @return {@code true} if an error has been recorded, {@code false} otherwise
     */
    public synchronized boolean isCompleted() {
        return this.error != null;
    }

    /**
     * Indicates whether an entry can be returned right now. This delegates to {@link #peek()} and may therefore
     * invoke the fetcher.
     *
     * @return {@code true} if {@link #peek()} yields an entry, {@code false} otherwise
     */
    public synchronized boolean hasNextAvailable() {
        return this.peek().isPresent();
    }

    /**
     * Closes this stream and releases the current batch. If a callback registration exists, the callback is run
     * one final time before it is removed and the registration is cancelled. Calling this method on an already
     * closed stream has no effect.
     */
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        // Release the batch, but keep the field non-null so no later access can trip over it.
        this.data = List.of();
        if (this.callbackRegistration != null) {
            this.invokeCallback();
            this.callback = null;
            this.callbackRegistration.cancel();
        }
    }

    /**
     * Runs the configured callback, if there is one. An exception thrown by the callback is recorded as this
     * stream's error, after which the stream is closed.
     */
    private void invokeCallback() {
        try {
            if (this.callback != null) {
                this.callback.run();
            }
        }
        catch (Exception e) {
            this.error = e;
            this.close();
        }
    }

    /**
     * Replaces the current batch with the next one from the fetcher and resets the read position. When the new
     * batch is non-empty, its last item is remembered as the starting point for the following fetch. Any exception
     * (including a {@code null} batch) is recorded as this stream's error, after which the stream is closed.
     */
    private void fetchMore() {
        try {
            List<E> fetched = Objects.requireNonNull(this.fetcher.apply(this.lastItem), "fetcher returned a null batch");
            this.data = fetched;
            this.position = 0;
            if (!fetched.isEmpty()) {
                this.lastItem = fetched.getLast();
            }
        }
        catch (Exception e) {
            this.error = e;
            this.close();
        }
    }
}

