/*
 * Decompiled with CFR 0.152.
 */
package org.newsclub.net.unix.domain;

import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
import com.kohlschutter.testutil.TestAbortedNotAnIssueException;
import com.kohlschutter.util.SystemPropertyUtil;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.newsclub.net.unix.AFInputStream;
import org.newsclub.net.unix.AFOutputStream;
import org.newsclub.net.unix.AFSocketCapability;
import org.newsclub.net.unix.AFSocketCapabilityRequirement;
import org.newsclub.net.unix.AFUNIXServerSocket;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketChannel;
import org.newsclub.net.unix.ConnectionResetSocketException;
import org.newsclub.net.unix.SocketClosedException;
import org.newsclub.net.unix.ThreadUtil;
import org.newsclub.net.unix.domain.AFUNIXAddressSpecifics;

/*
 * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
 */
@AFSocketCapabilityRequirement(value={AFSocketCapability.CAPABILITY_UNIX_DOMAIN})
@SuppressFBWarnings(value={"NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"})
public class MassiveParallelTest
extends org.newsclub.net.unix.MassiveParallelTest<AFUNIXSocketAddress> {
    /**
     * Upper bound on server threads (32). Presumably the cap applied to the available-processor
     * count in {@code testAcceptConnect}, which uses the same value — confirm against the
     * original sources.
     */
    private static final int MAX_SERVER_THREADS = 32;

    /**
     * Creates the test, handing {@link AFUNIXAddressSpecifics#INSTANCE} to the generic
     * superclass constructor.
     */
    protected MassiveParallelTest() {
        super(AFUNIXAddressSpecifics.INSTANCE);
    }

    /**
     * Submits many client jobs against a {@link Server} and checks that a reasonable number of
     * request/response exchanges complete.
     *
     * <p>Each client job connects to the server, expects the byte {@code 0xAA} followed by the
     * five bytes {@code 0xAB, 0xAC, 'X', 'Y', 'Z'}, and answers with {@code 0xBB}. A job
     * resubmits itself for as long as the server reports that it is running. The number of
     * clients working at the same time is limited by a semaphore.
     *
     * <p>The test is aborted (not failed) when virtual threads are unsupported or when the
     * system property {@code selftest.MassiveParallelTest.numConnections} is not positive.
     *
     * @throws Exception on unexpected errors while setting up or tearing down the test.
     */
    @Test
    public void testAcceptConnect() throws Exception {
        if (!ThreadUtil.isVirtualThreadSupported()) {
            throw new TestAbortedNotAnIssueException("Virtual Threads are not supported by this JVM");
        }
        final int numConnections = SystemPropertyUtil.getIntSystemProperty("selftest.MassiveParallelTest.numConnections", 1000);
        if (numConnections <= 0) {
            throw new TestAbortedNotAnIssueException("Skipping test due to numConnections=" + numConnections);
        }
        final int nProc = Math.min(MAX_SERVER_THREADS, Runtime.getRuntime().availableProcessors());
        final Semaphore concurrentClientPermits = new Semaphore(Math.min(100, nProc));
        final AFUNIXSocketAddress listenAddr = AFUNIXSocketAddress.ofNewTempFile();
        try (Server server = new Server(listenAddr, nProc, numConnections)) {
            final ExecutorService esClients = ThreadUtil.newVirtualThreadPerTaskExecutor();
            try {
                final AtomicInteger connectAttempts = new AtomicInteger(0);
                final AtomicInteger connected = new AtomicInteger(0);
                final Runnable clientJob = new Runnable() {
                    @Override
                    public void run() {
                        // Only give back a permit that was really obtained; acquire() may be
                        // interrupted, and socket creation may fail before we get that far.
                        boolean permitAcquired = false;
                        try (AFUNIXSocket socket = AFUNIXSocket.newInstance()) {
                            concurrentClientPermits.acquire();
                            permitAcquired = true;
                            try {
                                connectAttempts.incrementAndGet();
                                socket.connect((SocketAddress) listenAddr);
                            } catch (SocketException | SocketTimeoutException ignored) {
                                // Connect failed. The socket is closed by try-with-resources and
                                // the finally block below releases the permit and retries once;
                                // doing either here as well would double the bookkeeping.
                                return;
                            }
                            connected.incrementAndGet();
                            try (AFInputStream in = socket.getInputStream();
                                 AFOutputStream out = socket.getOutputStream()) {
                                int rcv = in.read();
                                if (rcv != 0xAA && server.isRunning()) {
                                    System.err.println("Wrong data: " + rcv);
                                }
                                final byte[] expectedTail = {(byte) 0xAB, (byte) 0xAC, 'X', 'Y', 'Z'};
                                byte[] otherBytes = new byte[expectedTail.length];
                                int pos = 0;
                                int remaining = otherBytes.length;
                                int read;
                                // A single read may return fewer bytes than requested.
                                while (remaining > 0 && (read = in.read(otherBytes, pos, remaining)) != -1) {
                                    remaining -= read;
                                    pos += read;
                                }
                                if (remaining != 0 && server.isRunning()) {
                                    System.err.println("Incomplete data; bytes missing " + remaining);
                                }
                                if (server.isRunning() && !Arrays.equals(expectedTail, otherBytes)) {
                                    System.err.println("Wrong data received: " + Arrays.toString(otherBytes) + " vs " + Arrays.toString(expectedTail));
                                }
                                out.write(0xBB);
                                out.flush();
                            }
                        } catch (Throwable e) {
                            // Errors are expected noise once the server has been stopped.
                            if (server.isRunning()) {
                                e.printStackTrace();
                            }
                        } finally {
                            if (permitAcquired) {
                                concurrentClientPermits.release();
                            }
                            if (server.isRunning()) {
                                try {
                                    esClients.submit(this);
                                } catch (RejectedExecutionException ignored) {
                                    // The client executor was shut down in the meantime.
                                }
                            }
                        }
                    }
                };
                long startTime = System.currentTimeMillis();
                for (int i = 0; i < numConnections; ++i) {
                    esClients.submit(clientJob);
                }
                // The server counts its latch down once per accepted connection.
                boolean stopped = server.cl.await(10L, TimeUnit.SECONDS);
                if (!stopped) {
                    List<Runnable> remainingClients = esClients.shutdownNow();
                    List<Runnable> remainingServers = server.esServers.shutdownNow();
                    System.err.println("Not all connections were made; remaining: " + server.cl.getCount());
                    if (!remainingClients.isEmpty() || !remainingServers.isEmpty()) {
                        System.err.println("Remaining threads: servers=" + remainingServers.size() + "; clients=" + remainingClients.size());
                    }
                }
                server.stop();
                long elapsed = System.currentTimeMillis() - startTime;
                int completed = server.completed.intValue();
                System.out.println("millis: " + elapsed);
                System.out.println("completed: " + completed);
                float timePerItem = (float) elapsed / (float) completed;
                System.out.println("time per completed connection: " + timePerItem + " ms");
                if (completed <= nProc && (double) completed < (double) numConnections / 10.0) {
                    Assertions.fail("Not enough jobs were completed: " + completed + "; expected:" + numConnections);
                }
            } finally {
                // Always release the client executor, also when every connection was made in time.
                esClients.shutdownNow();
            }
        }
    }

    /*
     * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
     */
    /**
     * Server side of the test: binds an {@link AFUNIXServerSocket} and runs a number of accept
     * jobs on a virtual-thread executor.
     *
     * <p>For every accepted connection the server sends {@code 0xAA, 0xAB, 0xAC} through the
     * socket channel and {@code 'X', 'Y', 'Z'} through the output stream, then expects a single
     * {@code 0xBB} byte in response. The latch {@link #cl} is counted down once per accepted
     * connection, and the server stops itself when the latch reaches zero.
     */
    private static final class Server
    implements Closeable {
        final AtomicBoolean running = new AtomicBoolean(true);
        final AtomicInteger accepted = new AtomicInteger(0);
        final AtomicInteger serverThreads = new AtomicInteger(0);
        final AtomicInteger completed = new AtomicInteger(0);
        final AtomicInteger acceptsInFlight = new AtomicInteger(0);
        final AFUNIXServerSocket serverSocket;
        final ExecutorService esServers;
        final CountDownLatch cl;

        /**
         * Binds the server socket and starts the accept jobs.
         *
         * @param listenAddr the address to bind to.
         * @param numServerThreads the number of accept jobs to submit initially.
         * @param stopAfterNumConnections the number of accepted connections after which the
         *        server stops itself.
         * @throws IOException if the server socket cannot be created or bound.
         */
        public Server(AFUNIXSocketAddress listenAddr, int numServerThreads, int stopAfterNumConnections) throws IOException {
            this.cl = new CountDownLatch(stopAfterNumConnections);
            this.serverSocket = AFUNIXServerSocket.newInstance();
            try {
                this.serverSocket.bind((SocketAddress) listenAddr);
            } catch (IOException | RuntimeException e) {
                // No instance is handed to the caller, so nobody else could close the socket.
                try {
                    this.serverSocket.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            this.esServers = ThreadUtil.newVirtualThreadPerTaskExecutor();
            for (int i = 0; i < numServerThreads; ++i) {
                this.esServers.submit(this::acceptJob);
            }
        }

        /**
         * Reports whether the server is still meant to serve connections.
         *
         * @return {@code true} if the server has not been stopped and its socket is not closed.
         */
        public boolean isRunning() {
            return this.running.get() && !this.serverSocket.isClosed();
        }

        @Override
        public String toString() {
            return super.toString() + "[closed=" + this.serverSocket.isClosed() + ";running=" + this.running.get() + ";inAccept=" + this.acceptsInFlight + ";completed=" + this.completed + "]";
        }

        /**
         * Accepts and serves connections until the server is no longer running.
         *
         * <p>If the job ends while the server is still running, a replacement job is submitted.
         */
        @SuppressFBWarnings(value={"NP_LOAD_OF_KNOWN_NULL_VALUE", "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
        private void acceptJob() {
            this.serverThreads.incrementAndGet();
            this.acceptsInFlight.incrementAndGet();
            try {
                // Both buffers are reused for every connection served by this job.
                ByteBuffer bbNonDirect = ByteBuffer.allocate(64);
                ByteBuffer bbDirect = ByteBuffer.allocateDirect(64);
                while (this.isRunning()) {
                    try (AFUNIXSocket socket = this.serverSocket.accept()) {
                        this.accepted.incrementAndGet();
                        if (socket == null) {
                            continue;
                        }
                        this.cl.countDown();
                        if (this.cl.getCount() == 0L) {
                            this.stop();
                        }
                        this.handleConnection(socket, bbDirect, bbNonDirect);
                    } catch (InterruptedIOException | ClosedChannelException | ConnectionResetSocketException | SocketClosedException ignored) {
                        // Deliberately ignored; the loop condition decides whether to go on.
                    } catch (SocketException e) {
                        // Other socket errors only matter while the server is still running.
                        if (this.isRunning()) {
                            throw e;
                        }
                    }
                }
            } catch (Throwable e) {
                if (this.isRunning()) {
                    e.printStackTrace();
                }
            } finally {
                this.acceptsInFlight.decrementAndGet();
                if (this.isRunning()) {
                    System.err.println("Restarting failed server job");
                    try {
                        this.esServers.submit(this::acceptJob);
                    } catch (RejectedExecutionException e) {
                        if (this.isRunning()) {
                            e.printStackTrace();
                        }
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        }

        /**
         * Runs one request/response exchange on an accepted connection and increments
         * {@link #completed} if the client answered with the expected byte.
         *
         * @param socket the accepted connection; closing it is left to the caller.
         * @param bbDirect the direct buffer used for sending.
         * @param bbNonDirect the non-direct buffer used for receiving.
         * @throws IOException on communication errors.
         */
        private void handleConnection(AFUNIXSocket socket, ByteBuffer bbDirect, ByteBuffer bbNonDirect) throws IOException {
            try (AFUNIXSocketChannel channel = socket.getChannel();
                 AFOutputStream out = socket.getOutputStream()) {
                channel.configureBlocking(true);

                // First part of the greeting goes through the channel.
                bbDirect.clear();
                bbDirect.put((byte) 0xAA);
                bbDirect.put((byte) 0xAB);
                bbDirect.put((byte) 0xAC);
                bbDirect.flip();
                while (bbDirect.hasRemaining()) {
                    channel.write(bbDirect);
                }

                // Second part goes through the stream of the same socket.
                out.write(new byte[] {'X', 'Y', 'Z'});
                out.flush();

                bbNonDirect.clear();
                int read = channel.read(bbNonDirect);
                if (read != 1) {
                    if (this.isRunning()) {
                        System.err.println("Wrong response: " + read + " bytes");
                    }
                    return;
                }
                int rcv = bbNonDirect.get(0) & 0xFF;
                if (rcv != 0xBB) {
                    if (this.isRunning()) {
                        System.err.println("Wrong response: 0x" + Integer.toHexString(rcv));
                    }
                    return;
                }
                this.completed.incrementAndGet();
            }
        }

        /**
         * Marks the server as stopped and shuts the executor down without interrupting jobs
         * that are already running.
         */
        public void stop() {
            this.running.set(false);
            this.esServers.shutdown();
        }

        /**
         * Stops the server, closes the server socket and waits for the accept jobs to end.
         *
         * <p>Waits up to five seconds for the jobs to finish, then cancels them and waits up to
         * another five seconds.
         *
         * @throws IOException if the socket cannot be closed; an {@link InterruptedIOException}
         *         if the jobs do not terminate in time or the wait is interrupted.
         */
        @Override
        public void close() throws IOException {
            this.stop();
            this.serverSocket.close();
            try {
                if (!this.esServers.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.esServers.shutdownNow();
                    if (!this.esServers.awaitTermination(5L, TimeUnit.SECONDS)) {
                        throw new InterruptedIOException("did not terminate");
                    }
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw (InterruptedIOException) new InterruptedIOException("did not terminate").initCause(e);
            }
        }
    }
}

