package nanomsg.async.impl;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import nanomsg.Nanomsg;
import nanomsg.Socket;
import nanomsg.async.AsyncOperation;
import nanomsg.async.IAsyncRunnable;
import nanomsg.async.IAsyncScheduler;
import nanomsg.exceptions.IOException;

/* loaded from: input_file:nanomsg/async/impl/ThreadPoolScheduler.class */
/**
 * Scheduler that executes {@link IAsyncRunnable} tasks on a fixed set of worker threads.
 *
 * <p>Tasks are handed over through an unbounded {@link LinkedBlockingQueue}. The worker
 * threads (one per available processor, as reported when the scheduler is constructed) are
 * created lazily on the first call to {@link #schedule}. Every worker executes this object's
 * {@link #run()} loop.
 *
 * <p>A shared instance is available through {@link #getInstance()}.
 */
public class ThreadPoolScheduler implements Runnable, IAsyncScheduler {
    /** Lazily created shared instance; only accessed inside the synchronized {@link #getInstance()}. */
    private static ThreadPoolScheduler instance = null;

    /** Tasks waiting to be picked up by a worker thread. */
    private final LinkedBlockingQueue<IAsyncRunnable> queue = new LinkedBlockingQueue<>();

    /** Number of worker threads to start. */
    private final int concurrency = Runtime.getRuntime().availableProcessors();

    /** Flipped to {@code true} by the first {@link #schedule} call, which then starts the workers. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Returns the shared scheduler, creating it on first use.
     *
     * @return the shared {@code ThreadPoolScheduler} instance
     */
    public static synchronized ThreadPoolScheduler getInstance() {
        if (instance == null) {
            instance = new ThreadPoolScheduler();
        }
        return instance;
    }

    /**
     * Creates and starts {@code concurrency} non-daemon worker threads inside a dedicated
     * {@code "nanomsg-scheduler"} thread group. Each thread runs {@link #run()}.
     */
    private void startThreadGroup() {
        ThreadGroup threadGroup = new ThreadGroup("nanomsg-scheduler");
        threadGroup.setDaemon(false);
        for (int i = 0; i < this.concurrency; i++) {
            Thread thread = new Thread(threadGroup, this);
            // A new thread inherits the daemon flag of its creator, so set it explicitly.
            thread.setDaemon(false);
            thread.start();
        }
    }

    /**
     * Enqueues a task for execution, starting the worker threads if this is the first call.
     *
     * @param socket         not used by this implementation
     * @param asyncOperation not used by this implementation
     * @param iAsyncRunnable the task to enqueue
     * @throws InterruptedException if interrupted while handing the task to the queue
     */
    @Override
    public void schedule(Socket socket, AsyncOperation asyncOperation, IAsyncRunnable iAsyncRunnable) throws InterruptedException {
        // compareAndSet guarantees that exactly one caller starts the workers.
        if (this.started.compareAndSet(false, true)) {
            startThreadGroup();
        }
        this.queue.put(iAsyncRunnable);
    }

    /**
     * Returns a short status string with the number of queued tasks and whether the
     * worker threads have been started.
     */
    @Override
    public String toString() {
        return "pending=" + this.queue.size() + ", pool started:" + this.started.get();
    }

    /**
     * Worker loop: repeatedly takes a task from the queue and runs it until the executing
     * thread is interrupted.
     *
     * <p>A task that fails with an {@link IOException} whose errno equals
     * {@code Nanomsg.constants.EAGAIN} is put back on the queue to be retried; an
     * {@code IOException} with any other errno is dropped. Any other unchecked exception
     * thrown by a task is passed to the current thread's uncaught-exception handler and the
     * worker keeps running, so a single failing task cannot permanently shrink the pool.
     *
     * <p>When the loop ends because of an interrupt, the thread's interrupt status is left set
     * so that code further up the call stack can observe it.
     */
    @Override
    public void run() {
        final Thread worker = Thread.currentThread();
        // isInterrupted() (unlike Thread.interrupted()) does not clear the flag on exit.
        while (!worker.isInterrupted()) {
            try {
                IAsyncRunnable task = this.queue.take();
                try {
                    task.run();
                } catch (IOException e) {
                    if (e.getErrno() == Nanomsg.constants.EAGAIN) {
                        // The operation could not complete yet: requeue it for another attempt.
                        this.queue.put(task);
                    }
                    // Other errno values are intentionally not retried.
                } catch (RuntimeException e) {
                    // Report the failure the same way an uncaught exception would be reported,
                    // but keep this worker alive: dead workers are never replaced.
                    worker.getUncaughtExceptionHandler().uncaughtException(worker, e);
                }
            } catch (InterruptedException interrupted) {
                // Restore the interrupt status swallowed by the blocking queue call.
                worker.interrupt();
                return;
            }
        }
    }
}
