/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.runtime;

import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.Context;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.WorkerExecutor;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Alternative;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

@Alternative
@Priority(value=1)
@ApplicationScoped
public class QuarkusWorkerPoolRegistry
extends WorkerPoolRegistry {
    private static final Logger log = Logger.getLogger(WorkerPoolRegistry.class);
    private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
    private static final String WORKER_CONCURRENCY = "max-concurrency";
    public static final String DEFAULT_VIRTUAL_THREAD_WORKER = "<virtual-thread>";
    @Inject
    ExecutionHolder executionHolder;
    private final Map<String, Integer> workerConcurrency = new HashMap<String, Integer>();
    private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap<String, WorkerExecutor>();
    private final Set<String> virtualThreadWorkers = QuarkusWorkerPoolRegistry.initVirtualThreadWorkers();

    private static Set<String> initVirtualThreadWorkers() {
        ConcurrentHashSet set = new ConcurrentHashSet();
        set.add(DEFAULT_VIRTUAL_THREAD_WORKER);
        return set;
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=100) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        if (!this.workerExecutors.isEmpty()) {
            for (WorkerExecutor executor : this.workerExecutors.values()) {
                if (Infrastructure.canCallerThreadBeBlocked()) {
                    executor.closeAndAwait();
                    continue;
                }
                executor.closeAndForget();
            }
        }
    }

    public <T> Uni<T> executeWork(io.vertx.mutiny.core.Context msgContext, Uni<T> uni, String workerName, boolean ordered) {
        Objects.requireNonNull(uni, "Action to execute not provided");
        if (workerName == null) {
            if (msgContext != null) {
                return msgContext.executeBlocking(uni, ordered);
            }
            return this.executionHolder.vertx().executeBlocking(uni, ordered);
        }
        if (this.virtualThreadWorkers.contains(workerName)) {
            return this.runOnVirtualThread(msgContext, uni);
        }
        return this.runOnWorkerThread(msgContext, uni, workerName, ordered);
    }

    private <T> Uni<T> runOnWorkerThread(io.vertx.mutiny.core.Context msgContext, Uni<T> uni, String workerName, boolean ordered) {
        WorkerExecutor worker = this.getWorker(workerName);
        if (msgContext != null) {
            return worker.executeBlocking(QuarkusWorkerPoolRegistry.uniOnMessageContext(uni, msgContext), ordered).onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> {
                if (failure != null) {
                    msgContext.runOnContext(() -> emitter.fail(failure));
                } else {
                    msgContext.runOnContext(() -> emitter.complete(item));
                }
            }));
        }
        return worker.executeBlocking(uni, ordered);
    }

    private static <T> Uni<T> uniOnMessageContext(Uni<T> uni, io.vertx.mutiny.core.Context msgContext) {
        return msgContext != Vertx.currentContext() ? uni.runSubscriptionOn(r -> new ContextPreservingRunnable(r, msgContext).run()) : uni;
    }

    private <T> Uni<T> runOnVirtualThread(io.vertx.mutiny.core.Context msgContext, Uni<T> uni) {
        ExecutorService vtExecutor = VirtualThreadsRecorder.getCurrent();
        return QuarkusWorkerPoolRegistry.uniOnMessageContext(uni, msgContext, vtExecutor).onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> {
            if (msgContext != null) {
                if (failure != null) {
                    msgContext.runOnContext(() -> emitter.fail(failure));
                } else {
                    msgContext.runOnContext(() -> emitter.complete(item));
                }
            } else if (failure != null) {
                emitter.fail(failure);
            } else {
                emitter.complete(item);
            }
        }));
    }

    private static <T> Uni<T> uniOnMessageContext(Uni<T> uni, io.vertx.mutiny.core.Context msgContext, ExecutorService vtExecutor) {
        return msgContext != Vertx.currentContext() ? uni.runSubscriptionOn(r -> vtExecutor.execute(new ContextPreservingRunnable(r, msgContext))) : uni.runSubscriptionOn((Executor)vtExecutor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public WorkerExecutor getWorker(String workerName) {
        Objects.requireNonNull(workerName, "Worker Name not specified");
        if (this.workerExecutors.containsKey(workerName)) {
            return this.workerExecutors.get(workerName);
        }
        if (this.workerConcurrency.containsKey(workerName)) {
            WorkerExecutor executor = this.workerExecutors.get(workerName);
            if (executor == null) {
                QuarkusWorkerPoolRegistry quarkusWorkerPoolRegistry = this;
                synchronized (quarkusWorkerPoolRegistry) {
                    executor = this.workerExecutors.get(workerName);
                    if (executor == null) {
                        executor = this.executionHolder.vertx().createSharedWorkerExecutor(workerName, this.workerConcurrency.get(workerName).intValue());
                        log.info((Object)("Created worker pool named " + workerName + " with concurrency of " + String.valueOf(this.workerConcurrency.get(workerName))));
                        this.workerExecutors.put(workerName, executor);
                    }
                }
            }
            if (executor != null) {
                return executor;
            }
            throw new RuntimeException("Failed to create Worker for " + workerName);
        }
        throw new IllegalArgumentException("@Blocking referred to invalid worker name. " + workerName);
    }

    public void defineWorker(String className, String method, String poolName, boolean virtualThread) {
        Objects.requireNonNull(className, "className was empty");
        Objects.requireNonNull(method, "Method was empty");
        if (virtualThread) {
            this.virtualThreadWorkers.add(poolName);
            return;
        }
        if (!poolName.equals("<no-value>")) {
            if (Validation.isBlank((String)poolName)) {
                throw this.getBlockingError(className, method, "value is blank or null");
            }
            String workerConfigKey = "smallrye.messaging.worker." + poolName + ".max-concurrency";
            Optional concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class);
            if (concurrency.isEmpty()) {
                throw this.getBlockingError(className, method, workerConfigKey + " was not defined");
            }
            this.workerConcurrency.put(poolName, (Integer)concurrency.get());
        }
    }

    private IllegalArgumentException getBlockingError(String className, String method, String message) {
        return new IllegalArgumentException("Invalid method annotated with @Blocking: " + className + "#" + method + " - " + message);
    }

    private static final class ContextPreservingRunnable
    implements Runnable {
        private final Runnable task;
        private final Context context;

        public ContextPreservingRunnable(Runnable task, io.vertx.mutiny.core.Context context) {
            this.task = task;
            this.context = context.getDelegate();
        }

        @Override
        public void run() {
            if (this.context instanceof ContextInternal) {
                ContextInternal contextInternal = (ContextInternal)this.context;
                ContextInternal previousContext = contextInternal.beginDispatch();
                try {
                    this.task.run();
                }
                finally {
                    contextInternal.endDispatch(previousContext);
                }
            } else {
                this.task.run();
            }
        }
    }
}

