package nanomsg.async.impl;

import com.sun.jna.Memory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import nanomsg.Nanomsg;
import nanomsg.Socket;
import nanomsg.async.AsyncOperation;
import nanomsg.async.IAsyncRunnable;
import nanomsg.async.IAsyncScheduler;
import nanomsg.async.impl.epoll.Epoll;
import nanomsg.exceptions.IOException;

/* loaded from: input_file:nanomsg/async/impl/EPollScheduler.class */
/**
 * Singleton {@link IAsyncScheduler} that waits for socket readiness on a single
 * epoll instance and runs the registered {@link IAsyncRunnable} callbacks on a
 * daemon thread named {@code nanomsg-poll-scheduler}.
 *
 * <p>Every descriptor is registered with {@code EPOLLONESHOT}, so each
 * registration produces at most one callback invocation. The poll thread is
 * started lazily by {@link #schedule} and exits once no callbacks remain
 * registered.
 */
public class EPollScheduler implements Runnable, IAsyncScheduler {
    /** {@code epoll_ctl} opcode: add a descriptor to the interest list (Linux {@code sys/epoll.h}). */
    private static final int EPOLL_CTL_ADD = 1;
    /** {@code epoll_ctl} opcode: change the settings of an already registered descriptor. */
    private static final int EPOLL_CTL_MOD = 3;
    /** Maximum number of events fetched by one {@code epoll_wait} call. */
    private static final int MAX_EVENTS = 1024;
    /** Timeout, in milliseconds, of one {@code epoll_wait} call. */
    private static final int WAIT_TIMEOUT_MS = 1000;

    private static EPollScheduler instance = null;

    /** Pending callbacks keyed by the descriptor they are waiting on. */
    private final Map<Integer, IAsyncRunnable> runnableMap = new ConcurrentHashMap<Integer, IAsyncRunnable>();
    /** {@code true} while a poll thread owns the event loop. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final int epollFd = Epoll.epoll_create1(Epoll.EPOLL_CLOEXEC);

    /**
     * Returns the process-wide scheduler, creating it on first use.
     *
     * @return the shared scheduler instance
     */
    public static synchronized EPollScheduler getInstance() {
        if (instance == null) {
            instance = new EPollScheduler();
        }
        return instance;
    }

    /**
     * Associates {@code runnable} with {@code fd} and arms the descriptor in the
     * epoll instance with the given event mask.
     *
     * <p>The descriptor is first added; if that fails it is modified instead,
     * because a one-shot descriptor stays in the epoll interest list (disabled)
     * after it has fired and adding it again is rejected by the kernel.
     *
     * @param fd       descriptor to watch
     * @param events   epoll event mask to wait for
     * @param runnable callback to run when the descriptor becomes ready
     * @throws RuntimeException if the descriptor can be neither added nor re-armed
     */
    public void register(int fd, int events, IAsyncRunnable runnable) {
        // The callback must be visible before the descriptor is armed, otherwise
        // the poll thread could observe the event and find nothing to run.
        this.runnableMap.put(Integer.valueOf(fd), runnable);
        // NOTE(review): assumes Epoll.epoll_ctl returns the native int status
        // (0 on success) - confirm against the binding.
        int status = Epoll.epoll_ctl(this.epollFd, EPOLL_CTL_ADD, fd, new Epoll.EpollEvent.ByReference(fd, events));
        if (status != 0) {
            status = Epoll.epoll_ctl(this.epollFd, EPOLL_CTL_MOD, fd, new Epoll.EpollEvent.ByReference(fd, events));
        }
        if (status != 0) {
            // Do not leave a callback behind that can never fire; it would also
            // keep the event loop from ever seeing an empty map.
            this.runnableMap.remove(Integer.valueOf(fd));
            throw new RuntimeException("Failed to register fd " + fd + " with epoll.");
        }
        System.out.println("registerOnce: fd:" + fd);
        System.out.println("registerOnce: total:" + this.runnableMap.size());
    }

    /**
     * Registers a one-shot read-readiness callback on the socket's receive descriptor.
     *
     * @param socket         socket whose receive descriptor is watched
     * @param iAsyncRunnable callback to run once the socket is readable
     */
    public void registerRead(Socket socket, IAsyncRunnable iAsyncRunnable) {
        register(socket.getRcvFd(), Epoll.EPOLLIN | Epoll.EPOLLONESHOT, iAsyncRunnable);
    }

    /**
     * Registers a one-shot write-readiness callback on the socket's send descriptor.
     *
     * @param socket         socket whose send descriptor is watched
     * @param iAsyncRunnable callback to run once the socket is writable
     */
    public void registerWrite(Socket socket, IAsyncRunnable iAsyncRunnable) {
        register(socket.getSndFd(), Epoll.EPOLLOUT | Epoll.EPOLLONESHOT, iAsyncRunnable);
    }

    /**
     * Schedules {@code iAsyncRunnable} to run when {@code socket} is ready for
     * the requested operation, starting the poll thread if none is running.
     *
     * @param socket         socket to wait on
     * @param asyncOperation {@link AsyncOperation#READ} or {@link AsyncOperation#WRITE}
     * @param iAsyncRunnable callback to run on readiness
     * @throws RuntimeException if the epoll instance could not be created, the
     *                          operation is not supported, or registration fails
     */
    @Override
    public void schedule(Socket socket, AsyncOperation asyncOperation, IAsyncRunnable iAsyncRunnable) throws InterruptedException {
        // Validate before doing anything else so no poll thread is spawned for
        // a scheduler that can never deliver events.
        if (this.epollFd < 0) {
            throw new RuntimeException("Failed intialize epoll instance.");
        }
        if (asyncOperation == AsyncOperation.READ) {
            registerRead(socket, iAsyncRunnable);
        } else if (asyncOperation == AsyncOperation.WRITE) {
            registerWrite(socket, iAsyncRunnable);
        } else {
            throw new RuntimeException("Operation not supported.");
        }
        // Register first, start second: a loop that is shutting down re-checks
        // the map after clearing the flag (see releaseLoop), so this ordering
        // guarantees the new registration is picked up by some poll thread.
        startLoopIfNeeded();
    }

    /** Starts the daemon poll thread unless one already owns the event loop. */
    private void startLoopIfNeeded() {
        if (this.started.compareAndSet(false, true)) {
            Thread thread = new Thread(this, "nanomsg-poll-scheduler");
            thread.setDaemon(true);
            thread.start();
        }
    }

    /**
     * Runs and unregisters the callback waiting on the descriptor of {@code event}.
     * A callback failing with {@code EAGAIN} is registered again; events for
     * descriptors without a pending callback are ignored.
     */
    private void processFd(Epoll.EpollEvent.ByReference event) {
        int fd = event.data.fd;
        // Single atomic lookup-and-remove instead of containsKey/get/remove.
        IAsyncRunnable runnable = this.runnableMap.remove(Integer.valueOf(fd));
        if (runnable == null) {
            return;
        }
        try {
            runnable.run();
        } catch (IOException e) {
            int errno = e.getErrno();
            if (errno != Nanomsg.constants.EAGAIN) {
                System.out.println("Error on runing the async runnable: " + errno);
            } else {
                System.out.println("EAGAIN error for fd=" + fd);
                try {
                    // The mask reported by epoll_wait does not carry
                    // EPOLLONESHOT, so restore it to keep one-shot semantics.
                    register(fd, event.events | Epoll.EPOLLONESHOT, runnable);
                } catch (RuntimeException registerFailure) {
                    System.out.println("Failed to re-register fd=" + fd + ": " + registerFailure.getMessage());
                }
            }
        } catch (RuntimeException e) {
            // A misbehaving callback must not kill the poll thread: the started
            // flag would stay set and no later schedule() could restart it.
            System.out.println("Unexpected error in async runnable for fd=" + fd + ": " + e);
        }
    }

    /**
     * Gives up ownership of the event loop when no callbacks are pending.
     *
     * @return {@code true} if the calling loop must exit, {@code false} if work
     *         arrived concurrently and the calling loop has re-acquired ownership
     */
    private boolean releaseLoop() {
        this.started.set(false);
        if (this.runnableMap.isEmpty()) {
            return true;
        }
        // A registration slipped in. Keep running unless schedule() has already
        // started a replacement thread (in which case the CAS fails).
        return !this.started.compareAndSet(false, true);
    }

    /**
     * Event loop: waits for epoll events, dispatches each one to
     * {@link #processFd} and exits once no callbacks remain or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        System.out.println("Starting event loop.");
        int eventSize = new Epoll.EpollEvent().size();
        Memory buffer = new Memory(MAX_EVENTS * eventSize);
        // One structure is re-pointed at each slot of the native buffer.
        Epoll.EpollEvent.ByReference event = new Epoll.EpollEvent.ByReference();
        while (!Thread.interrupted()) {
            int ready = Epoll.epoll_wait(this.epollFd, buffer, MAX_EVENTS, WAIT_TIMEOUT_MS);
            System.out.println("Epoll loop: found " + ready + " events.");
            if (ready > 0) {
                for (int i = 0; i < ready; i++) {
                    event.reuse(buffer, eventSize * i);
                    processFd(event);
                }
                if (this.runnableMap.isEmpty() && releaseLoop()) {
                    System.out.println("Stoping event loop.");
                    return;
                }
            }
        }
        // Interrupted: clear the flag so a later schedule() can start a new loop.
        this.started.set(false);
    }
}
