package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.WrappedToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.messaging.StreamableMessageSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/eventhandling/pooled/Coordinator.class */
/**
 * Coordinates event processing for a single named processor.
 * <p>
 * The coordinator keeps one {@code WorkPackage} per claimed segment and runs a {@code CoordinationTask} on the
 * configured {@link ScheduledExecutorService}. That task claims segment tokens from the {@link TokenStore}, opens
 * an event stream on the {@link StreamableMessageSource} and offers the events it reads to the work packages.
 */
public class Coordinator {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Processor name; used in log messages and passed as processor name to the token store.
    private final String name;
    // Source the coordination task opens its event stream on.
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    // Store from which segments and tokens are fetched and on which claims are released.
    private final TokenStore tokenStore;
    // Wraps every token store interaction done by this class.
    private final TransactionManager transactionManager;
    // Executor on which coordination tasks are submitted and (re)scheduled.
    private final ScheduledExecutorService executorService;
    // Creates the work package for a newly claimed segment and its token.
    private final BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory;
    // Decides whether an event is offered to the work packages at all.
    private final EventFilter eventFilter;
    // Invoked for every event that is filtered out or that no work package scheduled.
    private final Consumer<? super TrackedEventMessage<?>> ignoredMessageHandler;
    // Applies an update function to the tracker status kept for a segment id.
    private final BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater;
    // Milliseconds between attempts to claim segments; also the retry delay when no segment is claimed.
    private final long tokenClaimInterval;
    // Clock used for claim thresholds and release deadlines.
    private final Clock clock;
    // Upper bound used when computing how many additional segments may be claimed.
    private final int maxClaimedSegments;
    // Active work packages, keyed by segment id.
    private final Map<Integer, WorkPackage> workPackages;
    // Current lifecycle state (running / stopping / stopped).
    private final AtomicReference<RunState> runState;
    // Per segment id, the instant until which this coordinator must not claim that segment.
    private final Map<Integer, Instant> releasesDeadlines;
    // Retry delay in milliseconds: starts at 500, is doubled (capped at 60000) on every failure and is reset
    // to 500 after a successful coordination round.
    private int errorWaitBackOff;
    // Pending tasks (added by splitSegment / mergeSegment) that the coordination task runs one at a time.
    private final Queue<CoordinatorTask> coordinatorTasks;
    // The currently active coordination task; replaced on retry and cleared by stop().
    private final AtomicReference<CoordinationTask> coordinationTask;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/Coordinator$Builder.class */
    /**
     * Collects the collaborators and settings of a {@link Coordinator} before it is constructed.
     * <p>
     * Only {@code ignoredMessageHandler} (a no-op), {@code tokenClaimInterval} (5000) and {@code clock}
     * ({@code GenericEventMessage.clock}) carry a default; every other setting keeps its Java default value
     * until the matching setter is called. No validation is performed by this builder.
     */
    public static class Builder {
        // Settings that come with a default.
        private Consumer<? super TrackedEventMessage<?>> ignoredMessageHandler = ignored -> {
        };
        private long tokenClaimInterval = 5000;
        private Clock clock = GenericEventMessage.clock;

        // Settings without a default.
        private String name;
        private StreamableMessageSource<TrackedEventMessage<?>> messageSource;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private ScheduledExecutorService executorService;
        private BiFunction<Segment, TrackingToken, WorkPackage> workPackageFactory;
        private EventFilter eventFilter;
        private BiConsumer<Integer, UnaryOperator<TrackerStatus>> processingStatusUpdater;
        private int maxClaimedSegments;

        Builder() {
        }

        /**
         * Sets the processor name.
         *
         * @param str the name of the processor
         * @return this builder, for chaining
         */
        public Builder name(String str) {
            this.name = str;
            return this;
        }

        /**
         * Sets the source on which the event stream is opened.
         *
         * @param streamableMessageSource the source of events
         * @return this builder, for chaining
         */
        public Builder messageSource(StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
            this.messageSource = streamableMessageSource;
            return this;
        }

        /**
         * Sets the store used to fetch segments and tokens and to release claims.
         *
         * @param tokenStore the token store
         * @return this builder, for chaining
         */
        public Builder tokenStore(TokenStore tokenStore) {
            this.tokenStore = tokenStore;
            return this;
        }

        /**
         * Sets the transaction manager wrapping the token store interactions.
         *
         * @param transactionManager the transaction manager
         * @return this builder, for chaining
         */
        public Builder transactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets the executor on which coordination tasks are submitted and scheduled.
         *
         * @param scheduledExecutorService the executor
         * @return this builder, for chaining
         */
        public Builder executorService(ScheduledExecutorService scheduledExecutorService) {
            this.executorService = scheduledExecutorService;
            return this;
        }

        /**
         * Sets the factory creating a work package for a claimed segment and its token.
         *
         * @param biFunction the work package factory
         * @return this builder, for chaining
         */
        public Builder workPackageFactory(BiFunction<Segment, TrackingToken, WorkPackage> biFunction) {
            this.workPackageFactory = biFunction;
            return this;
        }

        /**
         * Sets the filter deciding whether an event is offered to the work packages.
         *
         * @param eventFilter the event filter
         * @return this builder, for chaining
         */
        public Builder eventFilter(EventFilter eventFilter) {
            this.eventFilter = eventFilter;
            return this;
        }

        /**
         * Sets the handler invoked for events that are not processed. Defaults to a no-op.
         *
         * @param consumer the handler for ignored events
         * @return this builder, for chaining
         */
        public Builder onMessageIgnored(Consumer<? super TrackedEventMessage<?>> consumer) {
            this.ignoredMessageHandler = consumer;
            return this;
        }

        /**
         * Sets the callback used to update the tracker status of a segment id.
         *
         * @param biConsumer the status updater
         * @return this builder, for chaining
         */
        public Builder processingStatusUpdater(BiConsumer<Integer, UnaryOperator<TrackerStatus>> biConsumer) {
            this.processingStatusUpdater = biConsumer;
            return this;
        }

        /**
         * Sets the interval, in milliseconds, between attempts to claim segments. Defaults to 5000.
         *
         * @param j the interval in milliseconds
         * @return this builder, for chaining
         */
        public Builder tokenClaimInterval(long j) {
            this.tokenClaimInterval = j;
            return this;
        }

        /**
         * Sets the clock used for time-based decisions. Defaults to {@code GenericEventMessage.clock}.
         *
         * @param clock the clock
         * @return this builder, for chaining
         */
        public Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Sets the upper bound used when computing how many segments may still be claimed.
         *
         * @param i the maximum number of claimed segments
         * @return this builder, for chaining
         */
        public Builder maxClaimedSegments(int i) {
            this.maxClaimedSegments = i;
            return this;
        }

        /**
         * Creates the {@link Coordinator} from the values collected so far.
         *
         * @return a new coordinator
         */
        public Coordinator build() {
            return new Coordinator(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/Coordinator$CoordinationTask.class */
    /**
     * The unit of work that claims segments, keeps the event stream open and distributes events to the work
     * packages of the enclosing {@link Coordinator}. It reschedules itself on the coordinator's executor.
     */
    public class CoordinationTask implements Runnable {
        // Set while an invocation of run() is in progress; a concurrent invocation returns immediately.
        private final AtomicBoolean processingGate;
        // Set while a run scheduled through scheduleCoordinationTask is still pending.
        private final AtomicBoolean scheduledGate;
        // Set while a run scheduled through scheduleDelayedCoordinationTask is still pending.
        private final AtomicBoolean interruptibleScheduledGate;
        // The currently open stream, or null when no stream is open.
        private BlockingStream<TrackedEventMessage<?>> eventStream;
        // Token the stream was opened with, or the token of the last event read; NoToken.INSTANCE when unset.
        private TrackingToken lastScheduledToken;
        // Result of registering the availability callback on the stream.
        private boolean availabilityCallbackSupported;
        // Epoch milliseconds from which on run() attempts to claim segments again.
        private long unclaimedSegmentValidationThreshold;

        // Starts with all gates open and no known stream position.
        private CoordinationTask() {
            this.processingGate = new AtomicBoolean();
            this.scheduledGate = new AtomicBoolean();
            this.interruptibleScheduledGate = new AtomicBoolean();
            this.lastScheduledToken = NoToken.INSTANCE;
        }

        /**
         * Performs one coordination round and schedules the next one.
         * <p>
         * In order: handles shutdown, aborts work packages whose segment is blocked by a release deadline, runs at
         * most one pending coordinator task, claims new segments and (re)opens the stream when due, and finally
         * distributes events to the work packages. Each path decides itself how and when the next round is
         * scheduled.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Only one invocation may be active; a concurrent invocation falls through and returns.
            if (this.processingGate.compareAndSet(false, true)) {
                if (!((RunState) Coordinator.this.runState.get()).isRunning()) {
                    Coordinator.logger.debug("Stopped processing. Runnable flag is false.\nReleasing claims and closing the event stream for Processor [{}].", Coordinator.this.name);
                    // The shutdown handle is completed only after all work packages have been aborted.
                    abortWorkPackages(null).thenRun(() -> {
                        ((RunState) Coordinator.this.runState.get()).shutdownHandle().complete(null);
                    });
                    IOUtils.closeQuietly(this.eventStream);
                    // Note: the processing gate is not reopened on this path and nothing is rescheduled.
                    return;
                }
                // Abort the work packages of segments that currently have an active release deadline.
                Coordinator.this.workPackages.entrySet().stream().filter(entry -> {
                    return isSegmentBlockedFromClaim(((Integer) entry.getKey()).intValue());
                }).map((v0) -> {
                    return v0.getValue();
                }).forEach(workPackage -> {
                    abortWorkPackage(workPackage, null);
                });
                // Run a single pending coordinator task per round; events are not distributed in this round.
                if (!Coordinator.this.coordinatorTasks.isEmpty()) {
                    CoordinatorTask coordinatorTask = (CoordinatorTask) Coordinator.this.coordinatorTasks.remove();
                    Coordinator.logger.debug("Processor [{}] found task [{}] to run.", Coordinator.this.name, coordinatorTask.getDescription());
                    coordinatorTask.run().thenRun(() -> {
                        // On success, force a claim attempt in the next round.
                        this.unclaimedSegmentValidationThreshold = 0L;
                    }).whenComplete((r4, th) -> {
                        // Whether the task succeeded or failed: reopen the gate and run again right away.
                        this.processingGate.set(false);
                        scheduleImmediateCoordinationTask();
                    });
                    return;
                }
                // Claim segments when there is no stream yet or when the claim threshold has been reached.
                if (this.eventStream == null || this.unclaimedSegmentValidationThreshold <= Coordinator.this.clock.instant().toEpochMilli()) {
                    this.unclaimedSegmentValidationThreshold = Coordinator.this.clock.instant().toEpochMilli() + Coordinator.this.tokenClaimInterval;
                    try {
                        Map<Segment, TrackingToken> claimNewSegments = claimNewSegments();
                        TrackingToken trackingToken = this.lastScheduledToken;
                        for (Map.Entry<Segment, TrackingToken> entry2 : claimNewSegments.entrySet()) {
                            Segment key = entry2.getKey();
                            TrackingToken value = entry2.getValue();
                            // Move the stream start position to the lower bound of the current position and the
                            // newly claimed token; a null position stays null.
                            trackingToken = trackingToken == null ? null : trackingToken.lowerBound(WrappedToken.unwrapLowerBound(value));
                            Coordinator.logger.debug("Processor [{}] claimed {} for processing.", Coordinator.this.name, key);
                            Coordinator.this.workPackages.computeIfAbsent(Integer.valueOf(key.getSegmentId()), num -> {
                                return (WorkPackage) Coordinator.this.workPackageFactory.apply(key, value);
                            });
                        }
                        if (Coordinator.logger.isInfoEnabled() && !claimNewSegments.isEmpty()) {
                            Coordinator.logger.info("Processor [{}] claimed {} new segments for processing", Coordinator.this.name, Integer.valueOf(claimNewSegments.size()));
                        }
                        ensureOpenStream(trackingToken);
                    } catch (Exception e) {
                        Coordinator.logger.warn("Exception occurred while Processor [{}] started work packages and opened the event stream.", Coordinator.this.name, e);
                        // This task is abandoned; abortAndScheduleRetry schedules a fresh CoordinationTask.
                        abortAndScheduleRetry(e);
                        return;
                    }
                }
                // Nothing to process: drop the stream and try claiming again after the claim interval.
                if (Coordinator.this.workPackages.isEmpty()) {
                    Coordinator.logger.debug("No segments claimed. Will retry in {} milliseconds.", Long.valueOf(Coordinator.this.tokenClaimInterval));
                    this.lastScheduledToken = NoToken.INSTANCE;
                    IOUtils.closeQuietly(this.eventStream);
                    this.eventStream = null;
                    this.processingGate.set(false);
                    scheduleDelayedCoordinationTask(Coordinator.this.tokenClaimInterval);
                    return;
                }
                try {
                    coordinateWorkPackages();
                    // A successful round resets the error back-off to its initial value.
                    Coordinator.this.errorWaitBackOff = 500;
                    this.processingGate.set(false);
                    if (isSpaceAvailable() && this.eventStream.hasNextAvailable()) {
                        // Capacity left and more events waiting: continue immediately.
                        scheduleImmediateCoordinationTask();
                    } else if (isSpaceAvailable()) {
                        // Capacity left but no events available: mark every segment as caught up.
                        Coordinator.this.workPackages.keySet().forEach(num2 -> {
                            Coordinator.this.processingStatusUpdater.accept(num2, (v0) -> {
                                return v0.caughtUp();
                            });
                        });
                        // With a callback the stream wakes this task up; otherwise poll again after 500ms.
                        if (this.availabilityCallbackSupported) {
                            scheduleDelayedCoordinationTask(Coordinator.this.tokenClaimInterval);
                        } else {
                            scheduleCoordinationTask(500L);
                        }
                    } else {
                        // At least one work package has no remaining capacity: check again after 100ms.
                        scheduleCoordinationTask(100L);
                    }
                } catch (Exception e2) {
                    Coordinator.logger.warn("Exception occurred while Processor [{}] was coordinating the work packages.", Coordinator.this.name, e2);
                    if (!(e2 instanceof InterruptedException)) {
                        abortAndScheduleRetry(e2);
                        return;
                    }
                    // Interruption stops the coordinator and restores the thread's interrupt flag.
                    Coordinator.logger.error(String.format("Processor [%s] was interrupted. Shutting down.", Coordinator.this.name), e2);
                    Coordinator.this.stop();
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Aborts every current work package and empties the work package map afterwards.
         *
         * @param exc the cause handed to each work package's abort; may be {@code null}
         * @return a future completing after all individual aborts have completed and the map has been cleared;
         *         when there are no work packages, clearing is chained onto an already completed future
         */
        private CompletableFuture<Void> abortWorkPackages(Exception exc) {
            return Coordinator.this.workPackages.values()
                    .stream()
                    .map(workPackage -> abortWorkPackage(workPackage, exc))
                    // Pairwise allOf folds the individual futures into one that waits for all of them.
                    .reduce((left, right) -> CompletableFuture.allOf(left, right))
                    .orElse(CompletableFuture.completedFuture(null))
                    .thenRun(Coordinator.this.workPackages::clear);
        }

        /**
         * Tries to claim tokens for segments this coordinator has no work package for yet.
         * <p>
         * Segments with an active release deadline are skipped and get their status cleared. For the remaining
         * ones a token is fetched until the number of new claims reaches
         * {@code maxClaimedSegments - workPackages.size()}. A segment whose token cannot be claimed gets its
         * status cleared and is skipped.
         *
         * @return the newly claimed segments with the token fetched for each; possibly empty
         */
        private Map<Segment, TrackingToken> claimNewSegments() {
            int[] allSegmentIds = Coordinator.this.transactionManager.fetchInTransaction(
                    () -> Coordinator.this.tokenStore.fetchSegments(Coordinator.this.name));
            // The complete id array is still needed by Segment.computeSegment, so filter into a separate array.
            int[] candidateIds = Arrays.stream(allSegmentIds)
                    .filter(segmentId -> !Coordinator.this.workPackages.containsKey(segmentId))
                    .toArray();
            int claimBudget = Coordinator.this.maxClaimedSegments - Coordinator.this.workPackages.size();

            Map<Segment, TrackingToken> newClaims = new HashMap<>();
            for (int segmentId : candidateIds) {
                if (isSegmentBlockedFromClaim(segmentId)) {
                    Coordinator.logger.debug("Segment {} is still marked to not be claimed by Processor [{}].", segmentId, Coordinator.this.name);
                    Coordinator.this.processingStatusUpdater.accept(segmentId, status -> null);
                    continue;
                }
                if (newClaims.size() >= claimBudget) {
                    // Budget used up; keep iterating so blocked segments still get their status cleared.
                    continue;
                }
                try {
                    Segment segment = Segment.computeSegment(segmentId, allSegmentIds);
                    TrackingToken token = Coordinator.this.transactionManager.fetchInTransaction(
                            () -> Coordinator.this.tokenStore.fetchToken(Coordinator.this.name, segmentId));
                    newClaims.put(segment, token);
                } catch (UnableToClaimTokenException e) {
                    Coordinator.this.processingStatusUpdater.accept(segmentId, status -> null);
                    Coordinator.logger.debug("Unable to claim the token for segment {}. It is owned by another process.", segmentId);
                }
            }
            return newClaims;
        }

        /**
         * Tells whether the given segment still has an active release deadline.
         * <p>
         * As a side effect, a deadline that lies before the clock's current instant is removed from the map.
         *
         * @param i the segment id
         * @return {@code true} if a deadline is registered and the current instant is not after it
         */
        private boolean isSegmentBlockedFromClaim(int i) {
            Instant activeDeadline = Coordinator.this.releasesDeadlines.compute(i, (segmentId, deadline) -> {
                boolean stillBlocked = deadline != null && !Coordinator.this.clock.instant().isAfter(deadline);
                // Returning null from compute drops the entry.
                return stillBlocked ? deadline : null;
            });
            return activeDeadline != null;
        }

        /**
         * Makes sure a stream is open at the given position whenever there are work packages.
         * <p>
         * An open stream is closed first when the requested position differs from the last scheduled token.
         * A new stream is then opened only if none is open and at least one work package exists.
         *
         * @param trackingToken the position to open the stream at
         */
        private void ensureOpenStream(TrackingToken trackingToken) {
            boolean streamOpen = this.eventStream != null;
            if (streamOpen && !Objects.equals(trackingToken, this.lastScheduledToken)) {
                Coordinator.logger.debug("Processor [{}] will close the current stream.", Coordinator.this.name);
                IOUtils.closeQuietly(this.eventStream);
                this.eventStream = null;
                this.lastScheduledToken = NoToken.INSTANCE;
            }

            if (this.eventStream == null && !Coordinator.this.workPackages.isEmpty()) {
                this.eventStream = Coordinator.this.messageSource.openStream(trackingToken);
                Coordinator.logger.debug("Processor [{}] opened stream with tracking token [{}].", Coordinator.this.name, trackingToken);
                // The stream reports whether it will invoke the callback when events become available.
                this.availabilityCallbackSupported =
                        this.eventStream.setOnAvailableCallback(this::scheduleImmediateCoordinationTask);
                this.lastScheduledToken = trackingToken;
            }
        }

        /**
         * @return {@code true} if every work package reports remaining capacity; also {@code true} when there
         *         are no work packages
         */
        private boolean isSpaceAvailable() {
            return Coordinator.this.workPackages.values()
                    .stream()
                    .allMatch(WorkPackage::hasRemainingCapacity);
        }

        /**
         * Reads events from the open stream and offers each one to every work package.
         * <p>
         * At most 1024 events are read per invocation, and reading stops as soon as a work package has no
         * remaining capacity or the stream has no event available. An event rejected by the event filter makes
         * the stream skip messages of that payload type and is reported to the ignored-message handler; so is an
         * event that no work package scheduled. Afterwards, work packages with a triggered abort are aborted and
         * all work packages get their worker scheduled.
         *
         * @throws InterruptedException if interrupted while reading from the stream
         */
        private void coordinateWorkPackages() throws InterruptedException {
            Coordinator.logger.debug("Processor [{}] is coordinating work to all its work packages.", Coordinator.this.name);
            for (int fetched = 0; fetched < 1024 && isSpaceAvailable() && this.eventStream.hasNextAvailable(); fetched++) {
                TrackedEventMessage<?> event = this.eventStream.nextAvailable();
                if (Coordinator.this.eventFilter.canHandleTypeOf(event)) {
                    boolean anyScheduled = false;
                    for (WorkPackage workPackage : Coordinator.this.workPackages.values()) {
                        // Offer the event BEFORE combining the flags. Folding the call into
                        // "anyScheduled || ..." would short-circuit once one package accepted the event:
                        // the remaining packages would never see it, and with a manual iterator the loop
                        // would never advance.
                        boolean scheduled = workPackage.scheduleEvent(event);
                        anyScheduled = anyScheduled || scheduled;
                    }
                    if (!anyScheduled) {
                        Coordinator.this.ignoredMessageHandler.accept(event);
                    }
                } else {
                    this.eventStream.skipMessagesWithPayloadTypeOf(event);
                    Coordinator.this.ignoredMessageHandler.accept(event);
                }
                // Remember the position so a later stream (re)open can be compared against it.
                this.lastScheduledToken = event.trackingToken();
            }
            Coordinator.this.workPackages.values().stream()
                    .filter(WorkPackage::isAbortTriggered)
                    .forEach(workPackage -> abortWorkPackage(workPackage, null));
            Coordinator.this.workPackages.values().forEach(WorkPackage::scheduleWorker);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Schedules a run of this task with a delay of zero milliseconds. Has no effect while a run scheduled
         * through {@code scheduleCoordinationTask} is still pending.
         */
        public void scheduleImmediateCoordinationTask() {
            scheduleCoordinationTask(0L);
        }

        /**
         * Schedules a run of this task after the given delay, unless such a run is already pending.
         *
         * @param j the delay in milliseconds
         */
        private void scheduleCoordinationTask(long j) {
            if (!this.scheduledGate.compareAndSet(false, true)) {
                // Another run scheduled through this method has not started yet.
                return;
            }
            Runnable reopenGateAndRun = () -> {
                this.scheduledGate.set(false);
                run();
            };
            Coordinator.this.executorService.schedule(reopenGateAndRun, j, TimeUnit.MILLISECONDS);
        }

        /**
         * Schedules a delayed run of this task, unless a regular run is pending or another delayed run has
         * already been scheduled.
         *
         * @param j the delay in milliseconds
         */
        private void scheduleDelayedCoordinationTask(long j) {
            // The delayed gate is only taken when no regular run is pending.
            if (!this.scheduledGate.get() && this.interruptibleScheduledGate.compareAndSet(false, true)) {
                Runnable reopenGateAndRun = () -> {
                    this.interruptibleScheduledGate.set(false);
                    run();
                };
                Coordinator.this.executorService.schedule(reopenGateAndRun, j, TimeUnit.MILLISECONDS);
            }
        }

        /**
         * Doubles the error back-off (capped at 60000ms), aborts all work packages and, once they have aborted,
         * schedules a brand-new coordination task after the back-off. The current event stream is closed.
         *
         * @param exc the failure handed to the work packages as abort cause
         */
        private void abortAndScheduleRetry(Exception exc) {
            Coordinator.logger.info("Releasing claims and scheduling a new coordination task in {}ms", Integer.valueOf(Coordinator.this.errorWaitBackOff));
            int doubledBackOff = Coordinator.this.errorWaitBackOff * 2;
            Coordinator.this.errorWaitBackOff = Math.min(doubledBackOff, 60000);

            CompletableFuture<Void> allAborted = abortWorkPackages(exc);
            allAborted.thenRun(() -> {
                Coordinator.logger.debug("Work packages have aborted. Scheduling new coordination task to run in {}ms", Integer.valueOf(Coordinator.this.errorWaitBackOff));
                // A fresh task replaces this one, so none of this task's state is carried over.
                CoordinationTask replacement = new CoordinationTask();
                Coordinator.this.executorService.schedule(replacement, Coordinator.this.errorWaitBackOff, TimeUnit.MILLISECONDS);
                Coordinator.this.coordinationTask.set(replacement);
            });
            IOUtils.closeQuietly(this.eventStream);
        }

        /**
         * Aborts a single work package; once aborted, removes it from the work package map (if it is still the
         * registered instance for its segment) and then releases the token claim for its segment.
         *
         * @param workPackage the work package to abort
         * @param exc         the cause handed to the work package's abort; may be {@code null}
         * @return a future completing after the claim has been released
         */
        private CompletableFuture<Void> abortWorkPackage(WorkPackage workPackage, Exception exc) {
            return workPackage.abort(exc)
                    .thenRun(() -> {
                        int segmentId = workPackage.segment().getSegmentId();
                        // Two-argument remove: only drops the entry when it still maps to this instance.
                        boolean removed = Coordinator.this.workPackages.remove(segmentId, workPackage);
                        if (removed) {
                            Coordinator.logger.debug("Processor [{}] released claim on {}.", Coordinator.this.name, workPackage.segment());
                        }
                    })
                    .thenRun(() -> Coordinator.this.transactionManager.executeInTransaction(
                            () -> Coordinator.this.tokenStore.releaseClaim(
                                    Coordinator.this.name, workPackage.segment().getSegmentId())));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decides whether an event should be offered to the work packages. The coordination task reports events
     * rejected by this filter to the ignored-message handler instead of scheduling them.
     */
    @FunctionalInterface
    public interface EventFilter {
        /**
         * @param trackedEventMessage the event read from the stream
         * @return {@code true} if the event should be offered to the work packages
         */
        boolean canHandleTypeOf(TrackedEventMessage<?> trackedEventMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/Coordinator$NoToken.class */
    /**
     * Placeholder token used while no stream position is known. Both bound operations simply yield the other
     * token, and it covers nothing.
     */
    public static class NoToken implements TrackingToken {
        /** The single shared instance. */
        public static final TrackingToken INSTANCE = new NoToken();

        private NoToken() {
            // Singleton; use INSTANCE.
        }

        /**
         * @return always {@code false}
         */
        @Override
        public boolean covers(TrackingToken trackingToken) {
            return false;
        }

        /**
         * @return the given token, unchanged
         */
        @Override
        public TrackingToken lowerBound(TrackingToken trackingToken) {
            return trackingToken;
        }

        /**
         * @return the given token, unchanged
         */
        @Override
        public TrackingToken upperBound(TrackingToken trackingToken) {
            return trackingToken;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/Coordinator$RunState.class */
    /**
     * Immutable snapshot of the coordinator's lifecycle.
     * <p>
     * The states produced by this class are: running (no shutdown handle), stopping (not running, shutdown
     * handle not yet completed) and stopped (not running, shutdown handle completed).
     */
    public static class RunState {
        private final boolean isRunning;
        private final boolean wasStarted;
        private final CompletableFuture<Void> shutdownHandle;

        private RunState(boolean running, boolean startedByThisTransition, CompletableFuture<Void> handle) {
            this.isRunning = running;
            this.wasStarted = startedByThisTransition;
            this.shutdownHandle = handle;
        }

        /**
         * @return the stopped state, carrying an already completed shutdown handle
         */
        public static RunState initial() {
            CompletableFuture<Void> alreadyShutDown = CompletableFuture.completedFuture(null);
            return new RunState(false, false, alreadyShutDown);
        }

        /**
         * Computes the state after a start attempt.
         *
         * @return a running state with {@code wasStarted() == false} if already running; a running state with
         *         {@code wasStarted() == true} if the previous shutdown has completed; otherwise this instance,
         *         because a shutdown is still in progress
         */
        public RunState attemptStart() {
            if (this.isRunning) {
                return new RunState(true, false, null);
            }
            if (this.shutdownHandle.isDone()) {
                return new RunState(true, true, null);
            }
            return this;
        }

        /**
         * Computes the state after a stop attempt.
         *
         * @return a stopping state with a fresh, uncompleted shutdown handle if currently running without a
         *         handle; otherwise this instance
         */
        public RunState attemptStop() {
            if (!this.isRunning || this.shutdownHandle != null) {
                return this;
            }
            return new RunState(false, false, new CompletableFuture<>());
        }

        /**
         * @return whether this state represents a running coordinator
         */
        public boolean isRunning() {
            return this.isRunning;
        }

        /**
         * @return whether the transition that produced this state performed the actual start
         */
        public boolean wasStarted() {
            return this.wasStarted;
        }

        /**
         * @return the handle completed on shutdown, or {@code null} for a running state
         */
        public CompletableFuture<Void> shutdownHandle() {
            return this.shutdownHandle;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a new {@link Builder} to configure and construct a {@link Coordinator}.
     *
     * @return a fresh builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates a coordinator from the builder's values. The coordinator starts in the initial (stopped) run
     * state, with no work packages, no release deadlines, no pending tasks and an error back-off of 500ms.
     *
     * @param builder the builder holding the configuration
     */
    private Coordinator(Builder builder) {
        // Configuration copied from the builder.
        this.name = builder.name;
        this.messageSource = builder.messageSource;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.executorService = builder.executorService;
        this.workPackageFactory = builder.workPackageFactory;
        this.eventFilter = builder.eventFilter;
        this.ignoredMessageHandler = builder.ignoredMessageHandler;
        this.processingStatusUpdater = builder.processingStatusUpdater;
        this.tokenClaimInterval = builder.tokenClaimInterval;
        this.clock = builder.clock;
        this.maxClaimedSegments = builder.maxClaimedSegments;

        // Mutable runtime state.
        this.runState = new AtomicReference<>(RunState.initial());
        this.coordinationTask = new AtomicReference<>();
        this.workPackages = new ConcurrentHashMap<>();
        this.releasesDeadlines = new ConcurrentHashMap<>();
        this.coordinatorTasks = new ConcurrentLinkedQueue<>();
        this.errorWaitBackOff = 500;
    }

    /**
     * Starts the coordinator by submitting a new coordination task to the executor.
     * <p>
     * Calling this method while already running has no effect. If submitting the task fails, the coordinator
     * is moved back to a stopped state and the failure is rethrown.
     *
     * @throws IllegalStateException if a previous shutdown is still in progress
     */
    public void start() {
        RunState stateAfterAttempt = this.runState.updateAndGet(RunState::attemptStart);
        if (stateAfterAttempt.wasStarted()) {
            logger.debug("Starting Coordinator for Processor [{}].", this.name);
            try {
                CoordinationTask task = new CoordinationTask();
                this.executorService.submit(task);
                this.coordinationTask.set(task);
            } catch (Exception e) {
                // Starting failed: flip to stopping and complete the handle right away, so that a later
                // start() is allowed again.
                RunState stopping = this.runState.updateAndGet(RunState::attemptStop);
                stopping.shutdownHandle().complete(null);
                throw e;
            }
        } else if (!stateAfterAttempt.isRunning()) {
            throw new IllegalStateException("Cannot start a processor while it's in process of shutting down.");
        }
    }

    /**
     * Requests the coordinator to stop and triggers the active coordination task, if any, to run immediately.
     *
     * @return the shutdown handle of the run state resulting from the stop attempt
     */
    public CompletableFuture<Void> stop() {
        logger.debug("Stopping Coordinator for Processor [{}].", this.name);
        RunState stateAfterAttempt = this.runState.updateAndGet(RunState::attemptStop);
        CompletableFuture<Void> shutdownHandle = stateAfterAttempt.shutdownHandle();

        // Detach the active task and let it run once more, so it can observe the non-running state.
        CoordinationTask activeTask = this.coordinationTask.getAndSet(null);
        if (activeTask != null) {
            activeTask.scheduleImmediateCoordinationTask();
        }
        return shutdownHandle;
    }

    /**
     * @return {@code true} if the current run state is a running state
     */
    public boolean isRunning() {
        RunState current = this.runState.get();
        return current.isRunning();
    }

    /**
     * Asks the active coordination task, if there is one, to run immediately.
     */
    private void scheduleCoordinator() {
        CoordinationTask activeTask = this.coordinationTask.get();
        if (activeTask == null) {
            return;
        }
        activeTask.scheduleImmediateCoordinationTask();
    }

    /**
     * @return {@code true} if the error back-off is above its initial value of 500ms, which is the case after
     *         a failure until a coordination round succeeds again
     */
    public boolean isError() {
        boolean backOffRaised = this.errorWaitBackOff > 500;
        return backOffRaised;
    }

    /**
     * Registers a deadline until which the given segment must not be claimed by this coordinator and triggers
     * the active coordination task.
     *
     * @param i       the segment id
     * @param instant the instant until which the segment stays released
     */
    public void releaseUntil(int i, Instant instant) {
        logger.debug("Processor [{}] will release segment {} for processing until {}.", this.name, i, instant);
        this.releasesDeadlines.put(i, instant);
        scheduleCoordinator();
    }

    /**
     * Queues a split task for the given segment and triggers the active coordination task.
     *
     * @param i the segment id
     * @return the future handed to the split task as its result
     */
    public CompletableFuture<Boolean> splitSegment(int i) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        SplitTask splitTask =
                new SplitTask(result, this.name, i, this.workPackages, this.tokenStore, this.transactionManager);
        this.coordinatorTasks.add(splitTask);
        scheduleCoordinator();
        return result;
    }

    /**
     * Queues a merge task for the given segment and triggers the active coordination task.
     *
     * @param i the segment id
     * @return the future handed to the merge task as its result
     */
    public CompletableFuture<Boolean> mergeSegment(int i) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        MergeTask mergeTask =
                new MergeTask(result, this.name, i, this.workPackages, this.tokenStore, this.transactionManager);
        this.coordinatorTasks.add(mergeTask);
        scheduleCoordinator();
        return result;
    }
}
