/*
 * Decompiled with CFR 0.152.
 */
package org.newsclub.net.unix;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ProtocolFamily;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.newsclub.net.unix.AFUNIXServerSocket;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AvailabilityRequirement;
import org.newsclub.net.unix.SocketTestBase;
import org.newsclub.net.unix.TestUtils;

/*
 * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
 */
public class ThroughputTest
extends SocketTestBase {
    private static final int ENABLED = TestUtils.getIntSystemProperty("org.newsclub.net.unix.throughput-test.enabled", 1);
    private static final int PAYLOAD_SIZE = TestUtils.getIntSystemProperty("org.newsclub.net.unix.throughput-test.payload-size", 8192);
    private static final int NUM_SECONDS = TestUtils.getIntSystemProperty("org.newsclub.net.unix.throughput-test.seconds", 1);
    private final Random random = new Random();

    private static byte[] createTestData(int size) {
        byte[] buf = new byte[size];
        for (int i = 0; i < buf.length; ++i) {
            buf[i] = (byte)(i % 256);
        }
        return buf;
    }

    @Test
    public void testAFUnixSocket() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        new SocketTestBase.ServerThread(){

            @Override
            protected void handleConnection(Socket sock) throws IOException {
                byte[] buf = new byte[PAYLOAD_SIZE];
                try (InputStream inputStream = sock.getInputStream();
                     OutputStream outputStream = sock.getOutputStream();){
                    int read;
                    while ((read = inputStream.read(buf)) >= 0) {
                        outputStream.write(buf, 0, read);
                    }
                }
                this.stopAcceptingConnections();
            }
        };
        AtomicBoolean keepRunning = new AtomicBoolean(true);
        Executors.newSingleThreadScheduledExecutor().schedule(() -> keepRunning.set(false), (long)NUM_SECONDS, TimeUnit.SECONDS);
        try (AFUNIXSocket sock = this.connectToServer();){
            byte[] buf = ThroughputTest.createTestData(PAYLOAD_SIZE);
            try (InputStream inputStream = sock.getInputStream();
                 OutputStream outputStream = sock.getOutputStream();){
                long readTotal = 0L;
                long time = System.currentTimeMillis();
                while (keepRunning.get()) {
                    int read;
                    outputStream.write(buf);
                    int remaining = buf.length;
                    int offset = 0;
                    while (remaining > 0 && (read = inputStream.read(buf, offset, remaining)) >= 0) {
                        int pos = this.random.nextInt(read) + offset;
                        if ((buf[pos] & 0xFF) != pos % 256) {
                            throw new IllegalStateException("Unexpected response from read: value@pos " + pos + "=" + (buf[pos] & 0xFF) + " != " + pos % 256);
                        }
                        remaining -= read;
                        offset += read;
                        readTotal += (long)read;
                    }
                }
                time = System.currentTimeMillis() - time;
                System.out.println("ThroughputTest (junixsocket byte[]): " + 1000.0f * (float)readTotal / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE);
            }
        }
    }

    @Test
    @AvailabilityRequirement(classes={"java.net.UnixDomainSocketAddress"}, message="This test requires Java 16 or later")
    public void testJEP380() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestJEP380(false);
    }

    @Test
    @AvailabilityRequirement(classes={"java.net.UnixDomainSocketAddress"}, message="This test requires Java 16 or later")
    public void testJEP380directBuffer() throws Exception {
        Assumptions.assumeTrue((ENABLED > 0 ? 1 : 0) != 0, (String)"Throughput tests are disabled");
        Assumptions.assumeTrue((PAYLOAD_SIZE > 0 ? 1 : 0) != 0, (String)"Payload must be positive");
        this.runTestJEP380(true);
    }

    private void runTestJEP380(final boolean direct) throws Exception {
        SocketAddress sa;
        try {
            Class<?> klazz = Class.forName("java.net.UnixDomainSocketAddress");
            sa = (SocketAddress)klazz.getMethod("of", String.class).invoke(null, this.getServerAddress().getPath());
        }
        catch (ClassNotFoundException e) {
            Assumptions.assumeTrue((boolean)false, (String)"java.net.UnixDomainSocketAddress (JEP 380) not supported by JVM");
            return;
        }
        new SocketTestBase.ServerThread(){
            ServerSocketChannel ssc;

            @Override
            protected AFUNIXServerSocket startServer() throws IOException {
                try {
                    this.ssc = (ServerSocketChannel)ServerSocketChannel.class.getMethod("open", ProtocolFamily.class).invoke(null, StandardProtocolFamily.valueOf("UNIX"));
                }
                catch (Exception e) {
                    throw new IllegalStateException(e);
                }
                this.ssc.bind(sa);
                return null;
            }

            @Override
            public void shutdown() throws IOException {
                super.shutdown();
                this.ssc.close();
            }

            @Override
            protected void acceptAndHandleConnection() throws IOException {
                ByteBuffer bb = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
                try (SocketChannel sc = this.ssc.accept();){
                    while (sc.read(bb) >= 0) {
                        bb.flip();
                        sc.write(bb);
                        bb.flip();
                    }
                }
            }

            @Override
            protected void handleConnection(Socket sock) throws IOException {
                throw new IllegalStateException();
            }
        };
        AtomicBoolean keepRunning = new AtomicBoolean(true);
        Executors.newSingleThreadScheduledExecutor().schedule(() -> keepRunning.set(false), (long)NUM_SECONDS, TimeUnit.SECONDS);
        try (SocketChannel sc = SocketChannel.open(sa);){
            ByteBuffer bb = direct ? ByteBuffer.allocateDirect(PAYLOAD_SIZE) : ByteBuffer.allocate(PAYLOAD_SIZE);
            bb.put(ThroughputTest.createTestData(PAYLOAD_SIZE));
            bb.flip();
            long readTotal = 0L;
            long time = System.currentTimeMillis();
            while (keepRunning.get()) {
                long read;
                int remaining = sc.write(bb);
                bb.flip();
                while (remaining > 0 && (read = (long)sc.read(bb)) >= 0L) {
                    remaining = (int)((long)remaining - read);
                    readTotal += read;
                }
                int pos = this.random.nextInt(bb.limit());
                if ((bb.get(pos) & 0xFF) == pos % 256) continue;
                throw new IllegalStateException("Unexpected response from read");
            }
            time = System.currentTimeMillis() - time;
            System.out.println("ThroughputTest (JEP380 direct=" + direct + "): " + 1000.0f * (float)readTotal / (float)time / 1000.0f / 1000.0f + " MB/s for payload size " + PAYLOAD_SIZE);
        }
    }
}

