package org.infinispan.util.concurrent;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import org.infinispan.commons.executors.BlockingResource;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Scope(Scopes.GLOBAL)
/* loaded from: input_file:org/infinispan/util/concurrent/BlockingManagerImpl.class */
public class BlockingManagerImpl implements BlockingManager {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final AtomicInteger id;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    Executor nonBlockingExecutor;

    @ComponentName(KnownComponentNames.BLOCKING_EXECUTOR)
    @Inject
    Executor blockingExecutor;

    @Inject
    NonBlockingManager nonBlockingManager;

    @ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
    @Inject
    ScheduledExecutorService scheduledExecutorService;
    private Scheduler blockingScheduler;
    private Scheduler nonBlockingScheduler;

    /* loaded from: input_file:org/infinispan/util/concurrent/BlockingManagerImpl$LimitedBlockingExecutor.class */
    private class LimitedBlockingExecutor implements BlockingManager.BlockingExecutor {
        private final LimitedExecutor limitedExecutor;

        private LimitedBlockingExecutor(LimitedExecutor limitedExecutor) {
            this.limitedExecutor = limitedExecutor;
        }

        @Override // org.infinispan.util.concurrent.BlockingManager.BlockingExecutor
        public CompletionStage<Void> execute(Runnable runnable, Object obj) {
            return BlockingManagerImpl.this.runBlockingOperation(runnable, obj, this.limitedExecutor);
        }

        @Override // org.infinispan.util.concurrent.BlockingManager.BlockingExecutor
        public <V> CompletionStage<V> supply(Supplier<V> supplier, Object obj) {
            return BlockingManagerImpl.this.supplyBlockingOperation(supplier, obj, this.limitedExecutor);
        }
    }

    /* loaded from: input_file:org/infinispan/util/concurrent/BlockingManagerImpl$ReentrantBlockingExecutor.class */
    private class ReentrantBlockingExecutor implements Executor {
        private ReentrantBlockingExecutor() {
        }

        @Override // java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            BlockingManagerImpl.this.runBlockingOperation(runnable, BlockingManagerImpl.this.nextTraceId(), BlockingManagerImpl.this.blockingExecutor, false);
        }
    }

    /* loaded from: input_file:org/infinispan/util/concurrent/BlockingManagerImpl$ScheduledBlockingFuture.class */
    private class ScheduledBlockingFuture<V> extends CompletableFuture<V> implements BlockingManager.ScheduledBlockingCompletableStage<V>, Runnable {
        private volatile ScheduledFuture<?> scheduledFuture;
        private final Supplier<V> supplier;
        private final Object traceId;

        private ScheduledBlockingFuture(Supplier<V> supplier, Object obj) {
            this.supplier = (Supplier) Objects.requireNonNull(supplier);
            this.traceId = Objects.requireNonNull(obj);
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return this.scheduledFuture.getDelay(timeUnit);
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return delayed instanceof ScheduledBlockingFuture ? this.scheduledFuture.compareTo(((ScheduledBlockingFuture) delayed).scheduledFuture) : this.scheduledFuture.compareTo(delayed);
        }

        @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
        public boolean cancel(boolean z) {
            this.scheduledFuture.cancel(true);
            return super.cancel(z);
        }

        @Override // java.lang.Runnable
        public void run() {
            CompletableFuture.supplyAsync(() -> {
                BlockingManagerImpl.log.tracef("Running blocking supply operation %s", this.traceId);
                return this.supplier.get();
            }, BlockingManagerImpl.this.blockingExecutor).whenComplete((obj, th) -> {
                if (th != null) {
                    BlockingManagerImpl.log.tracef("Operation %s completed exceptionally with message %s", this.traceId, th.getMessage());
                    BlockingManagerImpl.this.nonBlockingManager.completeExceptionally(this, th);
                } else {
                    BlockingManagerImpl.log.tracef("Operation %s completed normally", this.traceId);
                    BlockingManagerImpl.this.nonBlockingManager.complete(this, obj);
                }
            });
        }
    }

    public BlockingManagerImpl() {
        this.id = log.isTraceEnabled() ? new AtomicInteger() : null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Start
    public void start() {
        this.blockingScheduler = Schedulers.from(new ReentrantBlockingExecutor());
        this.nonBlockingScheduler = Schedulers.from(this.nonBlockingExecutor);
    }

    private String nextTraceId() {
        if (this.id != null) {
            return "-BlockingManagerImpl-" + this.id.getAndIncrement();
        }
        return null;
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public CompletionStage<Void> runBlocking(Runnable runnable, Object obj) {
        return runBlockingOperation(runnable, obj, this.blockingExecutor);
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <E> CompletionStage<Void> subscribeBlockingConsumer(Publisher<E> publisher, Consumer<E> consumer, Object obj) {
        Flowable observeOn = Flowable.fromPublisher(publisher).observeOn(this.blockingScheduler);
        if (log.isTraceEnabled()) {
            observeOn = observeOn.doOnNext(obj2 -> {
                log.tracef("Invoking blocking consumer for %s with value %s", obj, obj2);
            });
        }
        Objects.requireNonNull(consumer);
        return continueOnNonBlockingThread(observeOn.doOnNext(consumer::accept).ignoreElements().toCompletionStage((Object) null), obj);
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <T, A, R> CompletionStage<R> subscribeBlockingCollector(Publisher<T> publisher, Collector<? super T, A, R> collector, Object obj) {
        Flowable observeOn = Flowable.fromPublisher(publisher).observeOn(this.blockingScheduler);
        if (log.isTraceEnabled()) {
            observeOn = observeOn.doOnNext(obj2 -> {
                log.tracef("Invoking blocking collector for %s with value %s", obj, obj2);
            });
        }
        return continueOnNonBlockingThread(Flowable.fromPublisher(observeOn).collect(collector).toCompletionStage(), obj);
    }

    private CompletionStage<Void> runBlockingOperation(Runnable runnable, Object obj, Executor executor) {
        return runBlockingOperation(runnable, obj, executor, true);
    }

    private CompletionStage<Void> runBlockingOperation(Runnable runnable, Object obj, Executor executor, boolean z) {
        CompletableFuture<Void> runAsync;
        if (!isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                log.tracef("Submitting blocking run operation %s to blocking thread", obj);
                runAsync = CompletableFuture.runAsync(() -> {
                    log.tracef("Running blocking run operation %s", obj);
                    runnable.run();
                }, executor);
            } else {
                runAsync = CompletableFuture.runAsync(runnable, executor);
            }
            return z ? continueOnNonBlockingThread(runAsync, obj) : runAsync;
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invoked run on a blocking thread, running %s in same blocking thread", obj);
        }
        try {
            runnable.run();
            return CompletableFutures.completedNull();
        } catch (Throwable th) {
            return CompletableFuture.failedFuture(th);
        }
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <V> CompletionStage<V> supplyBlocking(Supplier<V> supplier, Object obj) {
        return supplyBlockingOperation(supplier, obj, this.blockingExecutor);
    }

    private <V> CompletionStage<V> supplyBlockingOperation(Supplier<V> supplier, Object obj, Executor executor) {
        CompletableFuture supplyAsync;
        if (isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                log.tracef("Invoked supply on a blocking thread, running %s in same blocking thread", obj);
            }
            try {
                return CompletableFuture.completedFuture(supplier.get());
            } catch (Throwable th) {
                return CompletableFuture.failedFuture(th);
            }
        }
        if (log.isTraceEnabled()) {
            log.tracef("Submitting blocking supply operation %s to blocking thread", obj);
            supplyAsync = CompletableFuture.supplyAsync(() -> {
                log.tracef("Running blocking supply operation %s", obj);
                return supplier.get();
            }, executor);
        } else {
            supplyAsync = CompletableFuture.supplyAsync(supplier, executor);
        }
        return continueOnNonBlockingThread(supplyAsync, obj);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.infinispan.util.concurrent.BlockingManager
    public <I, O> CompletionStage<O> handleBlocking(CompletionStage<? extends I> completionStage, BiFunction<? super I, Throwable, ? extends O> biFunction, Object obj) {
        if (!CompletionStages.isCompletedSuccessfully(completionStage) || !isCurrentThreadBlocking()) {
            return continueOnNonBlockingThread(completionStage.handleAsync(biFunction, this.blockingExecutor), obj);
        }
        Object obj2 = null;
        Throwable th = null;
        try {
            if (log.isTraceEnabled()) {
                log.tracef("Invoked handle on a blocking thread, joining %s in same blocking thread", obj);
            }
            obj2 = CompletionStages.join(completionStage);
        } catch (Throwable th2) {
            th = th2;
        }
        return CompletableFuture.completedFuture(biFunction.apply(obj2, th));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.infinispan.util.concurrent.BlockingManager
    public <I, O> CompletionStage<O> thenApplyBlocking(CompletionStage<? extends I> completionStage, Function<? super I, ? extends O> function, Object obj) {
        if (!CompletionStages.isCompletedSuccessfully(completionStage) || !isCurrentThreadBlocking()) {
            return continueOnNonBlockingThread(completionStage.thenApplyAsync(function, this.blockingExecutor), obj);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invoked thenApply on a blocking thread, joining %s in same blocking thread", obj);
        }
        try {
            return CompletableFuture.completedFuture(function.apply(CompletionStages.join(completionStage)));
        } catch (Throwable th) {
            return CompletableFuture.failedFuture(th);
        }
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <I> CompletionStage<Void> thenRunBlocking(CompletionStage<? extends I> completionStage, Runnable runnable, Object obj) {
        if (!CompletionStages.isCompletedSuccessfully(completionStage) || !isCurrentThreadBlocking()) {
            return continueOnNonBlockingThread(completionStage.thenRunAsync(runnable, this.blockingExecutor), obj);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invoked thenRun on a blocking thread, joining %s in same blocking thread", obj);
        }
        try {
            CompletionStages.join(completionStage);
            runnable.run();
            return CompletableFutures.completedNull();
        } catch (Throwable th) {
            return CompletableFuture.failedFuture(th);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.infinispan.util.concurrent.BlockingManager
    public <I, O> CompletionStage<O> thenComposeBlocking(CompletionStage<? extends I> completionStage, Function<? super I, ? extends CompletionStage<O>> function, Object obj) {
        if (!CompletionStages.isCompletedSuccessfully(completionStage) || !isCurrentThreadBlocking()) {
            return continueOnNonBlockingThread(completionStage.thenComposeAsync(function, this.blockingExecutor), obj);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invoked thenComposeBlocking on a blocking thread, joining %s in same blocking thread", obj);
        }
        try {
            return (CompletionStage) function.apply(CompletionStages.join(completionStage));
        } catch (Throwable th) {
            return CompletableFuture.failedFuture(th);
        }
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <V> CompletionStage<V> whenCompleteBlocking(CompletionStage<V> completionStage, BiConsumer<? super V, ? super Throwable> biConsumer, Object obj) {
        if (!CompletionStages.isCompletedSuccessfully(completionStage) || !isCurrentThreadBlocking()) {
            return continueOnNonBlockingThread(completionStage.whenCompleteAsync(biConsumer, this.blockingExecutor), obj);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Invoked whenComplete on a blocking thread, joining %s in same blocking thread", obj);
        }
        Object obj2 = null;
        Throwable th = null;
        try {
            obj2 = CompletionStages.join(completionStage);
        } catch (Throwable th2) {
            th = th2;
        }
        try {
            biConsumer.accept(obj2, th);
            return completionStage.whenComplete(biConsumer);
        } catch (Throwable th3) {
            if (th == null) {
                return CompletableFuture.failedFuture(th3);
            }
            th.addSuppressed(th3);
            return CompletableFuture.failedFuture(th);
        }
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <V> CompletionStage<V> continueOnNonBlockingThread(CompletionStage<V> completionStage, Object obj) {
        if (!CompletionStages.isCompletedSuccessfully(completionStage)) {
            return completionStage.whenCompleteAsync((obj2, th) -> {
                if (th != null) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Continuing execution of id %s with exception %s", obj, th.getMessage());
                    }
                } else if (log.isTraceEnabled()) {
                    log.tracef("Continuing execution of id %s", obj);
                }
            }, this.nonBlockingExecutor);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Stage for %s was already completed, returning in same thread", obj);
        }
        return completionStage;
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public Executor nonBlockingExecutor() {
        return this.nonBlockingExecutor;
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <V> Publisher<V> blockingPublisher(Publisher<V> publisher) {
        return Flowable.defer(() -> {
            if (isCurrentThreadBlocking()) {
                return publisher;
            }
            if (!log.isTraceEnabled()) {
                return Flowable.fromPublisher(publisher).subscribeOn(this.blockingScheduler).observeOn(this.nonBlockingScheduler);
            }
            int identityHashCode = System.identityHashCode(publisher);
            log.tracef("Blocking publisher start %d", identityHashCode);
            return Flowable.fromPublisher(publisher).subscribeOn(this.blockingScheduler).observeOn(this.nonBlockingScheduler).doFinally(() -> {
                log.tracef("Blocking publisher done %d", identityHashCode);
            });
        });
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <V> CompletionStage<Void> blockingPublisherToVoidStage(Publisher<V> publisher, Object obj) {
        Flowable fromPublisher = Flowable.fromPublisher(publisher);
        if (!isCurrentThreadBlocking()) {
            if (log.isTraceEnabled()) {
                fromPublisher = fromPublisher.doOnSubscribe(subscription -> {
                    log.tracef("Subscribing to %s on blocking thread", obj);
                });
            }
            fromPublisher = fromPublisher.subscribeOn(this.blockingScheduler);
            if (log.isTraceEnabled()) {
                fromPublisher = fromPublisher.doOnSubscribe(subscription2 -> {
                    log.tracef("Publisher %s subscribing thread is %s", obj, Thread.currentThread());
                });
            }
        } else if (log.isTraceEnabled()) {
            log.tracef("Invoked on a blocking thread, subscribing %s in same blocking thread", obj);
        }
        return continueOnNonBlockingThread(fromPublisher.ignoreElements().toCompletionStage((Object) null), obj);
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public Executor asExecutor(String str) {
        return !log.isTraceEnabled() ? this.blockingExecutor : runnable -> {
            log.tracef("Submitting blocking run operation %s with name %s to blocking thread", runnable, str);
            this.blockingExecutor.execute(() -> {
                log.tracef("Running blocking operation %s with name %s on blocking thread", runnable, str);
                runnable.run();
            });
        };
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public BlockingManager.BlockingExecutor limitedBlockingExecutor(String str, int i) {
        return new LimitedBlockingExecutor(new LimitedExecutor(str, this.blockingExecutor, i));
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public <V> BlockingManager.ScheduledBlockingCompletableStage<V> scheduleRunBlocking(Supplier<V> supplier, long j, TimeUnit timeUnit, Object obj) {
        ScheduledBlockingFuture scheduledBlockingFuture = new ScheduledBlockingFuture(supplier, obj);
        log.tracef("Scheduling supply operation %s for %s to run in %s %s", new Object[]{supplier, obj, Long.valueOf(j), timeUnit});
        scheduledBlockingFuture.scheduledFuture = this.scheduledExecutorService.schedule(scheduledBlockingFuture, j, timeUnit);
        return scheduledBlockingFuture;
    }

    @Override // org.infinispan.util.concurrent.BlockingManager
    public ScheduledFuture<Void> scheduleRunBlockingAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit, Object obj) {
        Supplier supplier = () -> {
            runnable.run();
            return null;
        };
        ScheduledBlockingFuture scheduledBlockingFuture = new ScheduledBlockingFuture(supplier, obj);
        log.tracef("Scheduling supply operation %s for %s to run in %s %s", new Object[]{supplier, obj, Long.valueOf(j), Long.valueOf(j2), timeUnit});
        scheduledBlockingFuture.scheduledFuture = this.scheduledExecutorService.scheduleAtFixedRate(runnable, j, j2, timeUnit);
        return scheduledBlockingFuture;
    }

    protected boolean isCurrentThreadBlocking() {
        return Thread.currentThread().getThreadGroup() instanceof BlockingResource;
    }
}
