/*
 * Decompiled with CFR 0.152.
 */
package com.github.jlangch.venice.util.ipc;

import com.github.jlangch.venice.VncException;
import com.github.jlangch.venice.impl.threadpool.ManagedCachedThreadPoolExecutor;
import com.github.jlangch.venice.impl.util.StringUtil;
import com.github.jlangch.venice.util.ipc.IMessage;
import com.github.jlangch.venice.util.ipc.impl.Message;
import com.github.jlangch.venice.util.ipc.impl.ServerStatistics;
import com.github.jlangch.venice.util.ipc.impl.Subscriptions;
import com.github.jlangch.venice.util.ipc.impl.TcpServerConnection;
import com.github.jlangch.venice.util.ipc.impl.util.Compressor;
import java.io.Closeable;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
 * A TCP server bound to the loopback address {@code 127.0.0.1}.
 *
 * <p>Once started, a task on the managed thread pool accepts incoming
 * connections and hands each one to a {@link TcpServerConnection} that is
 * executed on the same pool. Incoming requests are passed to the handler
 * function supplied to {@link #start(Function)}.
 *
 * <p>The server additionally keeps a registry of named, bounded queues
 * (see {@link #createQueue(String, int)}) that is shared with every
 * connection.
 */
public class TcpServer implements Closeable {
    /** Lower bound enforced by {@link #setMaximumMessageSize(long)} (2 KiB). */
    public static final long MESSAGE_LIMIT_MIN = 2048L;

    /** Upper bound enforced by {@link #setMaximumMessageSize(long)} (200 MiB). */
    public static final long MESSAGE_LIMIT_MAX = 200L * 1024L * 1024L;

    private final int port;
    private final int timeout;
    private final String endpointId;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicReference<ServerSocketChannel> server = new AtomicReference<>();
    private final AtomicLong maxMessageSize = new AtomicLong(MESSAGE_LIMIT_MAX);
    private final int publishQueueCapacity = 50;
    private final ServerStatistics statistics = new ServerStatistics();
    private final Subscriptions subscriptions = new Subscriptions();

    // The queue registry is handed to every connection (which run on pool
    // threads) and is modified through the public queue methods of this
    // class, so it must tolerate concurrent access. A synchronized HashMap
    // is used rather than a ConcurrentHashMap to keep HashMap's tolerance
    // for null keys on lookups.
    private final Map<String, LinkedBlockingQueue<Message>> p2pQueues =
            Collections.synchronizedMap(new HashMap<String, LinkedBlockingQueue<Message>>());

    private final AtomicReference<Compressor> compressor = new AtomicReference<>(Compressor.off());
    private final ManagedCachedThreadPoolExecutor mngdExecutor =
            new ManagedCachedThreadPoolExecutor("venice-tcpserver-pool", 20);

    /**
     * Creates a server for the given port without a socket timeout.
     *
     * @param port the port to bind to on {@code 127.0.0.1}
     */
    public TcpServer(int port) {
        this(port, 0);
    }

    /**
     * Creates a server for the given port.
     *
     * @param port    the port to bind to on {@code 127.0.0.1}
     * @param timeout the {@code SO_TIMEOUT} value set on the server socket when
     *                greater than 0; negative values are treated as 0
     */
    public TcpServer(int port, int timeout) {
        this.port = port;
        this.timeout = Math.max(0, timeout);
        this.endpointId = UUID.randomUUID().toString();
    }

    /**
     * Sets the maximum size of the thread pool that runs the accept loop and
     * the connections.
     *
     * @param count the maximum pool size; values below 1 are raised to 1
     * @return this server
     */
    public TcpServer setMaximumParallelConnections(int count) {
        this.mngdExecutor.setMaximumThreadPoolSize(Math.max(1, count));
        return this;
    }

    /**
     * Installs a compressor with the given cutoff size. Only allowed before
     * the server has been started.
     *
     * @param cutoffSize the cutoff size passed to the {@link Compressor}
     * @return this server
     * @throws VncException if the server has already been started
     */
    public TcpServer setCompressCutoffSize(long cutoffSize) {
        if (this.started.get()) {
            throw new VncException("The compression cutoff size cannot be set anymore once the server has been started!");
        }
        this.compressor.set(new Compressor(cutoffSize));
        return this;
    }

    /**
     * @return the cutoff size of the currently installed compressor
     */
    public long getCompressCutoffSize() {
        return this.compressor.get().cutoffSize();
    }

    /**
     * Sets the maximum message size, clamped to the range
     * [{@link #MESSAGE_LIMIT_MIN}, {@link #MESSAGE_LIMIT_MAX}].
     *
     * @param maxSize the requested maximum message size
     * @return this server
     */
    public TcpServer setMaximumMessageSize(long maxSize) {
        this.maxMessageSize.set(Math.max(MESSAGE_LIMIT_MIN, Math.min(MESSAGE_LIMIT_MAX, maxSize)));
        return this;
    }

    /**
     * @return the current maximum message size
     */
    public long getMaximumMessageSize() {
        return this.maxMessageSize.get();
    }

    /**
     * @return the random UUID string generated for this server instance
     */
    public String getEndpointId() {
        return this.endpointId;
    }

    /**
     * @return the statistics object shared with all connections
     */
    public ServerStatistics getStatistics() {
        return this.statistics;
    }

    /** Clears the server statistics. */
    public void clearStatistics() {
        this.statistics.clear();
    }

    /**
     * Binds the server socket and starts accepting connections on the
     * managed thread pool.
     *
     * @param handler the function passed to every {@link TcpServerConnection}
     * @throws NullPointerException if {@code handler} is {@code null}
     * @throws VncException if the server has already been started or could
     *                      not be started
     */
    public void start(Function<IMessage, IMessage> handler) {
        Objects.requireNonNull(handler);

        if (!this.started.compareAndSet(false, true)) {
            throw new VncException("The TcpServer @ 127.0.0.1 on port " + this.port + " has already been started!");
        }

        // startServer() resets the 'started' flag itself when it fails
        ServerSocketChannel ch = this.startServer();
        try {
            ThreadPoolExecutor executor = this.mngdExecutor.getExecutor();
            ch.configureBlocking(true);
            executor.execute(() -> this.acceptConnections(ch, executor, handler));
        }
        catch (Exception ex) {
            this.safeClose(ch);
            this.started.set(false);
            this.server.set(null);
            throw new VncException("Closed TcpServer @ 127.0.0.1 on port " + this.port + "!", ex);
        }
    }

    /**
     * Stops the server: closes the server socket and shuts down the thread
     * pool. Has no effect if the server is not started.
     */
    @Override
    public void close() throws IOException {
        if (this.started.compareAndSet(true, false)) {
            this.safeClose(this.server.getAndSet(null));
            this.mngdExecutor.shutdownNow();
        }
    }

    /**
     * @return {@code true} if a server socket channel exists and is open
     */
    public boolean isRunning() {
        ServerSocketChannel ch = this.server.get();
        return ch != null && ch.isOpen();
    }

    /**
     * @return a handler that answers every request with the request itself
     */
    public static Function<IMessage, IMessage> echoHandler() {
        return req -> req;
    }

    /**
     * Registers a new bounded queue under the given name. An already
     * existing queue with that name is left untouched.
     *
     * @param queueName the queue name, must not be blank
     * @param capacity  the queue capacity, must be at least 1
     * @throws NullPointerException     if {@code queueName} is {@code null}
     * @throws IllegalArgumentException if the name is blank or the capacity
     *                                  is lower than 1
     */
    public void createQueue(String queueName, int capacity) {
        Objects.requireNonNull(queueName);
        if (StringUtil.isBlank(queueName)) {
            throw new IllegalArgumentException("A queue name must not be blank");
        }
        if (capacity < 1) {
            throw new IllegalArgumentException("A queue capacity must not be lower than 1");
        }
        // atomic "create if missing" - a separate containsKey/put pair would race
        this.p2pQueues.computeIfAbsent(queueName, name -> new LinkedBlockingQueue<>(capacity));
    }

    /**
     * Removes the queue registered under the given name, if any.
     *
     * @param queueName the queue name
     * @throws NullPointerException if {@code queueName} is {@code null}
     */
    public void removeQueue(String queueName) {
        Objects.requireNonNull(queueName);
        this.p2pQueues.remove(queueName);
    }

    /**
     * @param queueName the queue name
     * @return {@code true} if a queue is registered under the given name
     * @throws NullPointerException if {@code queueName} is {@code null}
     */
    public boolean existsQueue(String queueName) {
        Objects.requireNonNull(queueName);
        return this.p2pQueues.containsKey(queueName);
    }

    /**
     * Accept loop: runs while the server is flagged as started and wraps each
     * accepted channel in a {@link TcpServerConnection} executed on the pool.
     */
    private void acceptConnections(
            ServerSocketChannel ch,
            ThreadPoolExecutor executor,
            Function<IMessage, IMessage> handler
    ) {
        while (this.started.get()) {
            SocketChannel channel = null;
            try {
                channel = ch.accept();
                channel.configureBlocking(true);
                TcpServerConnection conn = new TcpServerConnection(
                        this,
                        channel,
                        handler,
                        this.maxMessageSize,
                        this.subscriptions,
                        this.publishQueueCapacity,
                        this.p2pQueues,
                        this.compressor.get(),
                        this.statistics,
                        () -> this.mngdExecutor.info());
                executor.execute(conn);
            }
            catch (IOException ignored) {
                // An I/O failure (e.g. the server channel got closed) ends the
                // accept loop. Do not leak a channel accepted just before.
                this.safeClose(channel);
                return;
            }
            catch (RuntimeException ex) {
                // Setting up a single connection failed (presumably e.g. the
                // pool rejected the task). Drop that client but keep the
                // server accepting instead of silently killing this loop.
                this.safeClose(channel);
            }
        }
    }

    /** Closes the resource, ignoring {@code null} and any close failure. */
    private void safeClose(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        }
        catch (Exception ignored) {
            // best effort: nothing sensible can be done if closing fails
        }
    }

    /**
     * Opens the server socket channel, binds it to {@code 127.0.0.1} on the
     * configured port and publishes it in {@link #server}. On failure the
     * channel is closed and the started/server state is reset.
     *
     * @return the bound server socket channel
     * @throws VncException if the channel cannot be opened or bound
     */
    private ServerSocketChannel startServer() {
        ServerSocketChannel srv = null;
        try {
            srv = ServerSocketChannel.open();
            srv.bind(new InetSocketAddress("127.0.0.1", this.port));
            if (this.timeout > 0) {
                srv.socket().setSoTimeout(this.timeout);
            }
            this.server.set(srv);
            return srv;
        }
        catch (Exception ex) {
            this.safeClose(srv);
            this.started.set(false);
            this.server.set(null);
            // only bind failures carry their detail message in the error text
            String detail = (ex instanceof BindException) ? " " + ex.getMessage() : "";
            throw new VncException("Failed to start TcpServer @ 127.0.0.1 on port " + this.port + "!" + detail, ex);
        }
    }
}

