package io.eventuate.javaclient.domain;

import io.eventuate.DispatchedEvent;
import io.eventuate.Event;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/javaclient/domain/SwimlaneDispatcher.class */
/**
 * Serializes event handling for a single (subscriber, swimlane) pair.
 *
 * <p>Events passed to {@link #dispatch} are appended to a FIFO queue. A handler for the next
 * queued event is only scheduled on the {@link Executor} after the previous handler's future has
 * completed successfully, so handlers scheduled by this class do not overlap.
 *
 * <p>NOTE(review): when a handler fails, the event's future is completed exceptionally and the
 * lane does NOT schedule the next event; the {@code running} flag stays {@code true}, so later
 * {@link #dispatch} calls only enqueue. This matches the pre-existing behavior - confirm whether
 * callers rely on it or whether the lane should resume/reset after a failure.
 */
public class SwimlaneDispatcher {
    private static final Logger logger = LoggerFactory.getLogger(SwimlaneDispatcher.class);

    private final String subscriberId;
    private final Integer swimlane;
    private final Executor executor;

    /** Pending events, oldest first. Also used as the monitor guarding the {@code running} hand-off. */
    private final LinkedBlockingQueue<QueuedEvent> queue = new LinkedBlockingQueue<>();

    /** {@code true} while a drain of the queue has been scheduled or is in progress. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** An event waiting in the swimlane together with its handler and the future reported to the dispatcher's caller. */
    public class QueuedEvent {
        DispatchedEvent<Event> event;
        private final Function<DispatchedEvent<Event>, CompletableFuture<?>> target;
        CompletableFuture<Object> future = new CompletableFuture<>();

        /**
         * @param dispatchedEvent the event to hand to the handler
         * @param function the handler; expected to return a future that completes when handling is done
         */
        public QueuedEvent(DispatchedEvent<Event> dispatchedEvent, Function<DispatchedEvent<Event>, CompletableFuture<?>> function) {
            this.event = dispatchedEvent;
            this.target = function;
        }
    }

    /**
     * @param str the subscriber id (used in log messages)
     * @param num the swimlane number (used in log messages)
     * @param executor the executor on which queued events are processed
     */
    public SwimlaneDispatcher(String str, Integer num, Executor executor) {
        this.subscriberId = str;
        this.swimlane = num;
        this.executor = executor;
    }

    /**
     * Queues an event for this swimlane and, if no drain is currently running, schedules one.
     *
     * @param dispatchedEvent the event to process
     * @param function the handler to invoke with the event
     * @return a future completed with the handler's result, or completed exceptionally if the
     *     handler fails (asynchronously, by throwing, or by returning {@code null})
     */
    public CompletableFuture<?> dispatch(DispatchedEvent<Event> dispatchedEvent, Function<DispatchedEvent<Event>, CompletableFuture<?>> function) {
        // The lock pairs with getNextEvent(): enqueue + "start if stopped" must be atomic with
        // respect to the drain's "queue is empty, so stop" decision, or an event could be stranded.
        synchronized (this.queue) {
            QueuedEvent queuedEvent = new QueuedEvent(dispatchedEvent, function);
            this.queue.add(queuedEvent);
            logger.trace("added event to queue: {} {} {}", this.subscriberId, this.swimlane, dispatchedEvent);
            if (this.running.compareAndSet(false, true)) {
                logger.trace("Stopped - attempting to process newly queued event: {} {}", this.subscriberId, this.swimlane);
                processNextQueuedEvent();
            } else {
                logger.trace("Running - Not attempting to process newly queued event: {} {}", this.subscriberId, this.swimlane);
            }
            return queuedEvent.future;
        }
    }

    /** Schedules {@link #processQueuedEvent()} on the executor. */
    private void processNextQueuedEvent() {
        this.executor.execute(this::processQueuedEvent);
    }

    /**
     * Takes the next queued event, if any, and invokes its handler. When the handler's future
     * completes successfully the event's future is completed and the next event is scheduled;
     * when it fails the event's future is completed exceptionally and nothing further is scheduled.
     */
    public void processQueuedEvent() {
        QueuedEvent nextEvent = getNextEvent();
        if (nextEvent == null) {
            logger.trace("No queued event for {} {}", this.subscriberId, this.swimlane);
            return;
        }
        logger.trace("Invoking handler for event for {} {} {}", this.subscriberId, this.swimlane, nextEvent.event);
        invokeHandler(nextEvent).handle((result, failure) -> {
            if (failure != null) {
                logger.error(String.format("handler for %s %s  %s failed: ", this.subscriberId, this.swimlane, nextEvent.event), failure);
                logger.trace("Completed future failed{}", nextEvent.future.completeExceptionally(failure));
                return null;
            }
            logger.debug("Handler succeeded for event for {} {} {}", this.subscriberId, this.swimlane, nextEvent.event);
            logger.trace("Completed future success {}", nextEvent.future.complete(result));
            logger.trace("Maybe processing next queued event {} {}", this.subscriberId, this.swimlane);
            processNextQueuedEvent();
            return null;
        });
    }

    /**
     * Calls the event's handler, converting a synchronous failure (a thrown runtime exception or
     * a {@code null} return value) into an exceptionally completed future so that the event's
     * own future is always completed and the failure is logged like any other handler failure.
     */
    private CompletableFuture<?> invokeHandler(QueuedEvent queuedEvent) {
        Throwable problem;
        try {
            CompletableFuture<?> outcome = queuedEvent.target.apply(queuedEvent.event);
            if (outcome != null) {
                return outcome;
            }
            problem = new IllegalStateException("event handler returned a null future");
        } catch (RuntimeException e) {
            problem = e;
        }
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(problem);
        return failed;
    }

    /**
     * Returns the next queued event, or {@code null} after marking the dispatcher as stopped when
     * the queue is empty.
     */
    private QueuedEvent getNextEvent() {
        // Fast path: no lock needed when there is work available.
        QueuedEvent head = this.queue.poll();
        if (head != null) {
            return head;
        }
        // Slow path: re-check under the lock so that a concurrent dispatch() either sees
        // running == false (and schedules a drain) or has its event picked up here.
        synchronized (this.queue) {
            QueuedEvent late = this.queue.poll();
            if (late == null) {
                this.running.compareAndSet(true, false);
            }
            return late;
        }
    }
}
