/*
 * Decompiled with CFR 0.152.
 */
package org.newsclub.net.unix;

import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.newsclub.net.unix.AFPipe;
import org.newsclub.net.unix.AFUNIXSelectorProvider;

/*
 * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
 */
@SuppressFBWarnings(value={"THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION"})
public final class PipeTest {
    private final ExecutorService exc = Executors.newCachedThreadPool();

    @Test
    public void testPipe() throws IOException {
        ByteBuffer out = ByteBuffer.allocate(4);
        out.putInt(67305985);
        out.flip();
        ByteBuffer in = ByteBuffer.allocate(4);
        AFUNIXSelectorProvider provider = AFUNIXSelectorProvider.provider();
        AFPipe pipe = provider.openPipe();
        try (AFPipe.SinkChannel sink = pipe.sink();
             AFPipe.SourceChannel source = pipe.source();){
            int nRead;
            sink.write(out);
            while ((nRead = source.read(in)) == 0) {
            }
            in.flip();
            Assertions.assertEquals((int)67305985, (int)in.getInt());
        }
    }

    @Test
    public void testPipeRecvHang() throws IOException, InterruptedException, ExecutionException, TimeoutException {
        long endTime;
        long startTime = System.currentTimeMillis();
        long breaktime = startTime + 1000L;
        int pass = 0;
        while ((endTime = System.currentTimeMillis()) <= breaktime) {
            AFPipe pipe = AFUNIXSelectorProvider.provider().openPipe();
            Future<Long> writer = this.exc.submit(() -> {
                try (Pipe.SinkChannel sink = pipe.sink();){
                    long written = 0L;
                    for (int length = 65536; length > 0; length >>= 1) {
                        ByteBuffer buf = ByteBuffer.allocate(length);
                        written += (long)sink.write(buf);
                    }
                    Long l = written;
                    return l;
                }
            });
            Future<Long> reader = this.exc.submit(() -> {
                try (Pipe.SourceChannel source = pipe.source();){
                    long numBytes;
                    ByteBuffer buf = ByteBuffer.allocate(65536);
                    long read = 0L;
                    while ((numBytes = (long)source.read(buf)) != -1L) {
                        read += numBytes;
                        buf.clear();
                    }
                    Long l = read;
                    return l;
                }
            });
            long written = writer.get();
            long read = reader.get(1L, TimeUnit.SECONDS);
            Assertions.assertEquals((long)written, (long)read);
            ++pass;
        }
        long duration = endTime - startTime;
        float passesPerMsec = (float)pass / (float)duration;
        Assertions.assertNotEquals((float)0.0f, (float)passesPerMsec);
    }
}

