/*
 * Decompiled with CFR 0.152.
 */
package com.github.jlangch.venice.util.ipc;

import com.github.jlangch.venice.InterruptedException;
import com.github.jlangch.venice.TimeoutException;
import com.github.jlangch.venice.VncException;
import com.github.jlangch.venice.impl.threadpool.ManagedCachedThreadPoolExecutor;
import com.github.jlangch.venice.impl.types.collections.VncMap;
import com.github.jlangch.venice.impl.util.CollectionUtil;
import com.github.jlangch.venice.impl.util.StringUtil;
import com.github.jlangch.venice.util.dh.DiffieHellmanKeys;
import com.github.jlangch.venice.util.ipc.IMessage;
import com.github.jlangch.venice.util.ipc.MessageType;
import com.github.jlangch.venice.util.ipc.ResponseStatus;
import com.github.jlangch.venice.util.ipc.impl.Message;
import com.github.jlangch.venice.util.ipc.impl.Protocol;
import com.github.jlangch.venice.util.ipc.impl.TcpSubscriptionListener;
import com.github.jlangch.venice.util.ipc.impl.Topics;
import com.github.jlangch.venice.util.ipc.impl.util.Compressor;
import com.github.jlangch.venice.util.ipc.impl.util.Encryptor;
import com.github.jlangch.venice.util.ipc.impl.util.IO;
import com.github.jlangch.venice.util.ipc.impl.util.Json;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Client side of the TCP based IPC facility.
 *
 * <p>A client connects to a server given by host and port and offers
 * request/response messaging ({@link #sendMessage(IMessage)}), oneway
 * messages ({@link #sendMessageOneway(IMessage)}), publish/subscribe
 * ({@link #publish(IMessage)}, {@link #subscribe(Set, Consumer)}) and
 * queue based messaging ({@link #offer(IMessage, String, long, TimeUnit)},
 * {@link #poll(String, long, TimeUnit)}).
 *
 * <p>If encryption is requested, {@link #open()} runs a Diffie-Hellman key
 * exchange with the server right after the connection has been established
 * and installs an AES encryptor based on the shared secret.
 *
 * <p>Once a client has successfully subscribed to topics it is in
 * <i>subscription mode</i> and refuses to send request messages on its own
 * channel. {@link #publish(IMessage)} is still possible; it is routed
 * through a short-lived clone of this client.
 */
public class TcpClient implements Cloneable, Closeable {

    /** Lower bound for the configurable maximum message size (2 KB). */
    public static final long MESSAGE_LIMIT_MIN = 2048L;

    /** Upper bound for the configurable maximum message size (200 MB). */
    public static final long MESSAGE_LIMIT_MAX = 200L * 1024L * 1024L;

    /** Topic of requests that are answered by the client itself, without contacting the server. */
    private static final String TOPIC_THREAD_POOL_STATISTICS = "client/thread-pool-statistics";

    private static final String CHARSET_UTF8 = "UTF-8";

    /** Timeout, in seconds, for publish and subscribe round trips. */
    private static final long DEFAULT_TIMEOUT_SECONDS = 5L;

    /** Timeout, in seconds, for the Diffie-Hellman key exchange round trip. */
    private static final long KEY_EXCHANGE_TIMEOUT_SECONDS = 2L;

    private final String host;
    private final int port;
    private final String endpointId;
    private final AtomicBoolean opened = new AtomicBoolean(false);
    private final AtomicReference<SocketChannel> channel = new AtomicReference<>();
    private final AtomicBoolean subscription = new AtomicBoolean(false);
    private final AtomicLong maxMessageSize = new AtomicLong(MESSAGE_LIMIT_MAX);
    private final AtomicLong messageSentCount = new AtomicLong(0L);
    private final AtomicLong messageReceiveCount = new AtomicLong(0L);
    private final AtomicReference<Compressor> compressor = new AtomicReference<>(Compressor.off());
    /** The encryption setting requested at construction time. */
    private final boolean encrypt;
    private final DiffieHellmanKeys dhKeys;
    /** The encryptor in use; switched from "off" to AES by the key exchange in {@link #open()}. */
    private final AtomicReference<Encryptor> encryptor = new AtomicReference<>(Encryptor.off());
    private final ManagedCachedThreadPoolExecutor mngdExecutor = new ManagedCachedThreadPoolExecutor("venice-tcpclient-pool", 10);

    /**
     * Creates an unencrypted client for a server on the local host (127.0.0.1).
     *
     * @param port the server's port
     */
    public TcpClient(int port) {
        this(null, port, false);
    }

    /**
     * Creates an unencrypted client.
     *
     * @param host the server's host; a blank or {@code null} host defaults to 127.0.0.1
     * @param port the server's port
     */
    public TcpClient(String host, int port) {
        this(host, port, false);
    }

    /**
     * Creates a client for a server on the local host (127.0.0.1).
     *
     * @param port    the server's port
     * @param encrypt if {@code true} a Diffie-Hellman key exchange is run on {@link #open()}
     */
    public TcpClient(int port, boolean encrypt) {
        this(null, port, encrypt);
    }

    /**
     * Creates a client. The client gets a random UUID as endpoint id and its
     * own Diffie-Hellman key pair.
     *
     * @param host    the server's host; a blank or {@code null} host defaults to 127.0.0.1
     * @param port    the server's port
     * @param encrypt if {@code true} a Diffie-Hellman key exchange is run on {@link #open()}
     */
    public TcpClient(String host, int port, boolean encrypt) {
        this.host = StringUtil.isBlank(host) ? "127.0.0.1" : host;
        this.port = port;
        this.encrypt = encrypt;
        this.endpointId = UUID.randomUUID().toString();
        this.dhKeys = DiffieHellmanKeys.create();
    }

    /**
     * Creates a new, not yet opened client with the same host, port,
     * encryption setting, compression cutoff size and maximum message size.
     *
     * <p>The copy is built through the constructor (not {@code super.clone()}),
     * hence it has its own endpoint id, key pair, counters and thread pool.
     *
     * @return the new {@code TcpClient}
     */
    @Override
    public Object clone() {
        // Use the configured flag rather than isEncrypted(): the latter only
        // becomes true after a successful open(), so cloning an encrypted but
        // not yet opened client would otherwise yield an unencrypted clone.
        final TcpClient copy = new TcpClient(this.host, this.port, this.encrypt);
        copy.setCompressCutoffSize(this.getCompressCutoffSize());
        copy.setMaximumMessageSize(this.getMaximumMessageSize());
        return copy;
    }

    /**
     * @return {@code true} if the encryptor currently in use is active. The
     *         encryptor is only replaced by the key exchange that
     *         {@link #open()} runs for clients created with encryption.
     */
    public boolean isEncrypted() {
        return this.encryptor.get().isActive();
    }

    /**
     * Sets the compression cutoff size. Must be called before the client is opened.
     *
     * @param cutoffSize the cutoff size handed to the {@code Compressor}
     * @return this client
     * @throws VncException if the client is currently open
     */
    public TcpClient setCompressCutoffSize(long cutoffSize) {
        if (this.opened.get()) {
            throw new VncException("The compression cutoff size cannot be set anymore once the client has been opened!");
        }
        this.compressor.set(new Compressor(cutoffSize));
        return this;
    }

    /**
     * @return the cutoff size of the compressor in use
     */
    public long getCompressCutoffSize() {
        return this.compressor.get().cutoffSize();
    }

    /**
     * Sets the maximum size of the data of a message this client accepts for
     * sending. The value is clamped to the range
     * {@link #MESSAGE_LIMIT_MIN} .. {@link #MESSAGE_LIMIT_MAX}.
     *
     * @param maxSize the requested maximum size in bytes
     * @return this client
     */
    public TcpClient setMaximumMessageSize(long maxSize) {
        final long clamped = Math.max(MESSAGE_LIMIT_MIN, Math.min(MESSAGE_LIMIT_MAX, maxSize));
        this.maxMessageSize.set(clamped);
        return this;
    }

    /**
     * @return the maximum message data size in bytes
     */
    public long getMaximumMessageSize() {
        return this.maxMessageSize.get();
    }

    /**
     * @return the number of messages this client has sent to the server
     */
    public long getMessageSendCount() {
        return this.messageSentCount.get();
    }

    /**
     * @return the number of messages this client has received in reply to sent messages
     */
    public long getMessageReceiveCount() {
        return this.messageReceiveCount.get();
    }

    /**
     * @return this client's endpoint id (a random UUID created at construction time)
     */
    public String getEndpointId() {
        return this.endpointId;
    }

    /**
     * Opens the connection to the server and, if the client was created with
     * encryption, runs the Diffie-Hellman key exchange.
     *
     * <p>If connecting or the key exchange fails, the channel is closed and
     * the client is left in the "not opened" state.
     *
     * @throws VncException if the client is already open, the connection
     *         cannot be established, or the key exchange fails
     */
    public void open() {
        if (!this.opened.compareAndSet(false, true)) {
            throw new VncException("This TcpClient is already open!");
        }

        SocketChannel ch = null;
        try {
            ch = SocketChannel.open(new InetSocketAddress(this.host, this.port));
            this.channel.set(ch);
        }
        catch (Exception ex) {
            this.rollbackOpen(ch);
            throw new VncException("Failed to open TcpClient for server " + this.host + "/" + this.port + "!", ex);
        }

        if (this.encrypt) {
            try {
                this.diffieHellmanKeyExchange();
            }
            catch (Exception ex) {
                this.rollbackOpen(ch);
                throw new VncException("Failed to open TcpClient for server " + this.host + "/" + this.port + "! Diffie-Hellman key exchange error!", ex);
            }
        }
    }

    /**
     * @return {@code true} if the client has a channel and that channel is open
     */
    public boolean isRunning() {
        final SocketChannel ch = this.channel.get();
        return ch != null && ch.isOpen();
    }

    /**
     * Closes the connection to the server. Closing a client that is not open
     * is a no-op.
     *
     * <p>The subscription mode is left as well: it is bound to the channel
     * being closed, and keeping the flag would make a re-opened client refuse
     * to send requests or to subscribe again.
     */
    @Override
    public void close() throws IOException {
        if (this.opened.compareAndSet(true, false)) {
            IO.safeClose(this.channel.getAndSet(null));
            this.subscription.set(false);
        }
    }

    /**
     * Sends a request message and waits (without timeout) for the response.
     *
     * @param msg the message, must not be {@code null}
     * @return the server's response
     * @throws VncException if the message is too large, the client is in
     *         subscription mode, or the client is not open
     */
    public IMessage sendMessage(IMessage msg) {
        Objects.requireNonNull(msg);
        final Message request = ((Message)msg).withType(MessageType.REQUEST, false);
        return this.send(request);
    }

    /**
     * Sends a oneway request message. No response is read from the server.
     *
     * @param msg the message, must not be {@code null}
     * @throws VncException if the message is too large, the client is in
     *         subscription mode, or the client is not open
     */
    public void sendMessageOneway(IMessage msg) {
        Objects.requireNonNull(msg);
        final Message request = ((Message)msg).withType(MessageType.REQUEST, true);
        this.send(request);
    }

    /**
     * Sends a request message and waits at most the given time for the response.
     *
     * @param msg     the message, must not be {@code null}
     * @param timeout the maximum time to wait for the response
     * @param unit    the unit of {@code timeout}, must not be {@code null}
     * @return the server's response
     * @throws TimeoutException if no response arrived in time
     * @throws VncException if the message is too large, the client is in
     *         subscription mode, or the client is not open
     */
    public IMessage sendMessage(IMessage msg, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(unit);
        final Message request = ((Message)msg).withType(MessageType.REQUEST, false);
        return this.send(request, timeout, unit);
    }

    /**
     * Subscribes to a single topic. See {@link #subscribe(Set, Consumer)}.
     *
     * @param topic   the topic, must not be {@code null}
     * @param handler the handler passed to the subscription listener, must not be {@code null}
     * @return the server's response to the subscribe request
     */
    public IMessage subscribe(String topic, Consumer<IMessage> handler) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(handler);
        return this.subscribe(CollectionUtil.toSet(topic), handler);
    }

    /**
     * Subscribes to a set of topics and puts this client into subscription
     * mode. On success a {@code TcpSubscriptionListener} is started on this
     * client's channel with the given handler.
     *
     * <p>If subscribing fails the client leaves subscription mode again.
     *
     * @param topics  the topics, must be neither {@code null} nor empty
     * @param handler the handler passed to the subscription listener, must not be {@code null}
     * @return the server's response to the subscribe request
     * @throws TimeoutException if the server did not answer within 5 seconds
     * @throws VncException if the topic set is empty, the client is not open,
     *         the client is already in subscription mode, or the server did
     *         not answer with status OK
     */
    public IMessage subscribe(Set<String> topics, Consumer<IMessage> handler) {
        Objects.requireNonNull(topics);
        Objects.requireNonNull(handler);
        if (topics.isEmpty()) {
            throw new VncException("A subscription topic set must not be empty!");
        }

        final SocketChannel ch = this.openChannelOrThrow();
        final Message subscribeMsg = TcpClient.createSubscribeRequestMessage(topics, this.endpointId);

        if (!this.subscription.compareAndSet(false, true)) {
            throw new VncException("The client is already in subscription mode!");
        }

        try {
            final Callable<IMessage> task = () -> {
                Protocol.sendMessage(ch, subscribeMsg, this.compressor.get(), this.encryptor.get());
                this.messageSentCount.incrementAndGet();
                final Message reply = Protocol.receiveMessage(ch, this.compressor.get(), this.encryptor.get());
                this.messageReceiveCount.incrementAndGet();
                return reply;
            };

            final IMessage response = this.deref(
                                        this.mngdExecutor.getExecutor().submit(task),
                                        DEFAULT_TIMEOUT_SECONDS,
                                        TimeUnit.SECONDS);

            // Guard against a missing response: report it like any other
            // failed subscription instead of failing with a NullPointerException.
            if (response == null || response.getResponseStatus() != ResponseStatus.OK) {
                throw new VncException("Failed to start subscription mode");
            }

            this.mngdExecutor.getExecutor().submit(new TcpSubscriptionListener(ch, handler, this.compressor.get(), this.encryptor.get()));
            return response;
        }
        catch (RuntimeException ex) {
            // subscribing failed -> leave subscription mode again
            this.subscription.set(false);
            throw ex;
        }
    }

    /**
     * Publishes a message. The server's response is awaited for at most 5 seconds.
     *
     * <p>If this client is in subscription mode its own channel cannot be
     * used for sending; the message is then sent through a temporary clone
     * of this client.
     *
     * @param msg the message, must not be {@code null}
     * @return the server's response
     * @throws TimeoutException if no response arrived in time
     * @throws VncException if the message is too large or the sending client is not open
     */
    public IMessage publish(IMessage msg) {
        Objects.requireNonNull(msg);
        this.validateMessageSize(msg);

        final Message publishMsg = ((Message)msg).withType(MessageType.PUBLISH, false);

        return this.subscription.get()
                ? this.sendThroughTemporaryClient(publishMsg, DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                : this.send(publishMsg, DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Offers a message to a named queue.
     *
     * @param msg       the message, must not be {@code null}
     * @param queueName the queue name, must not be {@code null}
     * @param timeout   the maximum time to wait for the response
     * @param unit      the unit of {@code timeout}, must not be {@code null}
     * @return the server's response
     * @throws TimeoutException if no response arrived in time
     * @throws VncException if the message is too large, the client is in
     *         subscription mode, or the client is not open
     */
    public IMessage offer(IMessage msg, String queueName, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(queueName);
        Objects.requireNonNull(unit);
        this.validateMessageSize(msg);
        final Message offerMsg = TcpClient.createQueueOfferRequestMessage((Message)msg, queueName);
        return this.send(offerMsg, timeout, unit);
    }

    /**
     * Polls a message from a named queue.
     *
     * @param queueName the queue name, must not be {@code null}
     * @param timeout   the maximum time to wait for the response
     * @param unit      the unit of {@code timeout}, must not be {@code null}
     * @return the server's response
     * @throws TimeoutException if no response arrived in time
     * @throws VncException if the client is in subscription mode or not open
     */
    public IMessage poll(String queueName, long timeout, TimeUnit unit) {
        Objects.requireNonNull(queueName);
        Objects.requireNonNull(unit);
        final Message pollMsg = TcpClient.createQueuePollRequestMessage(queueName);
        return this.send(pollMsg, timeout, unit);
    }

    /**
     * Sends a message through a short-lived clone of this client: clone,
     * open, send, close.
     *
     * @return the response obtained by the temporary client
     */
    private IMessage sendThroughTemporaryClient(IMessage msg, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(unit);

        final TcpClient client = (TcpClient)this.clone();
        try {
            client.open();
            return client.send(msg, timeout, unit);
        }
        finally {
            try {
                client.close();
            }
            catch (IOException ignored) {
                // Best effort: a failure to close the temporary client must
                // neither discard the response already received nor mask the
                // exception thrown by open()/send().
            }
        }
    }

    /**
     * Sends a message on the calling thread and, unless the message is
     * oneway, blocks until the response has been received.
     *
     * @return the response, or {@code null} for a oneway message
     */
    private IMessage send(IMessage msg) {
        Objects.requireNonNull(msg);
        this.validateMessageSize(msg);
        this.rejectIfInSubscriptionMode();

        final SocketChannel ch = this.openChannelOrThrow();

        if (this.isClientLocalMessage(msg)) {
            return this.handleClientLocalMessage(msg);
        }

        Protocol.sendMessage(ch, (Message)msg, this.compressor.get(), this.encryptor.get());
        this.messageSentCount.incrementAndGet();
        if (msg.isOneway()) {
            return null;
        }

        final Message response = Protocol.receiveMessage(ch, this.compressor.get(), this.encryptor.get());
        this.messageReceiveCount.incrementAndGet();
        return response;
    }

    /**
     * Sends a message on a pool thread and waits at most the given time for
     * the result. Client local messages are answered directly.
     */
    private IMessage send(IMessage msg, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(unit);
        this.validateMessageSize(msg);
        this.rejectIfInSubscriptionMode();

        if (this.isClientLocalMessage(msg)) {
            return this.handleClientLocalMessage(msg);
        }

        return this.deref(this.sendAsync(msg), timeout, unit);
    }

    /**
     * Validates the message and the client state, then submits the
     * send/receive round trip to the thread pool.
     */
    private Future<IMessage> sendAsync(IMessage msg) {
        Objects.requireNonNull(msg);
        this.validateMessageSize(msg);
        this.rejectIfInSubscriptionMode();

        final SocketChannel ch = this.openChannelOrThrow();

        if (this.isClientLocalMessage(msg)) {
            final Callable<IMessage> localTask = () -> this.handleClientLocalMessage(msg);
            return this.mngdExecutor.getExecutor().submit(localTask);
        }

        return this.sendAsyncRaw(msg, ch, this.compressor.get(), this.encryptor.get());
    }

    /**
     * Submits the send/receive round trip to the thread pool using the given
     * channel, compressor and encryptor. No validation is done here.
     */
    private Future<IMessage> sendAsyncRaw(IMessage msg, SocketChannel ch, Compressor compressor, Encryptor encryptor) {
        final Callable<IMessage> task = () -> {
            Protocol.sendMessage(ch, (Message)msg, compressor, encryptor);
            this.messageSentCount.incrementAndGet();
            if (msg.isOneway()) {
                return null;
            }
            final Message reply = Protocol.receiveMessage(ch, compressor, encryptor);
            this.messageReceiveCount.incrementAndGet();
            return reply;
        };
        return this.mngdExecutor.getExecutor().submit(task);
    }

    /**
     * Sends this client's public key to the server (uncompressed and
     * unencrypted), and on acknowledgement installs an AES encryptor built
     * from the shared secret derived from the server's public key.
     *
     * @throws VncException if the client is not open or the server does not
     *         acknowledge the key exchange
     */
    private void diffieHellmanKeyExchange() {
        final SocketChannel ch = this.openChannelOrThrow();

        final Message request = TcpClient.createDiffieHellmanRequestMessage(this.dhKeys.getPublicKeyBase64());
        final Message response = (Message)this.deref(
                                    this.sendAsyncRaw(request, ch, Compressor.off(), Encryptor.off()),
                                    KEY_EXCHANGE_TIMEOUT_SECONDS,
                                    TimeUnit.SECONDS);

        // A missing response is a failed exchange, not a NullPointerException.
        if (response == null) {
            throw new VncException("Failed to process Diffie-Hellman key exchange!");
        }
        if (response.getResponseStatus() == ResponseStatus.DIFFIE_HELLMAN_NAK) {
            // on NAK the response text carries the server's error text
            throw new VncException("Error: The server rejected the Diffie-Hellman key exchange! " + response.getText());
        }
        if (response.getResponseStatus() != ResponseStatus.DIFFIE_HELLMAN_ACK) {
            throw new VncException("Failed to process Diffie-Hellman key exchange!");
        }

        // on ACK the response text carries the server's public key
        final String serverPublicKey = response.getText();
        this.encryptor.set(Encryptor.aes(this.dhKeys.generateSharedSecret(serverPublicKey)));
    }

    /**
     * Answers a client local request.
     *
     * @return the response, or {@code null} if the request is not a client local one
     */
    private Message handleClientLocalMessage(IMessage request) {
        return this.isClientLocalMessage(request)
                ? this.getClientThreadPoolStatistics()
                : null;
    }

    /**
     * @return {@code true} if the request is answered by the client itself
     */
    private boolean isClientLocalMessage(IMessage request) {
        return TOPIC_THREAD_POOL_STATISTICS.equals(request.getTopic());
    }

    /**
     * Builds a JSON response message with this client's thread pool information.
     */
    private Message getClientThreadPoolStatistics() {
        final VncMap statistics = this.mngdExecutor.info();
        return new Message(
                    MessageType.RESPONSE,
                    ResponseStatus.OK,
                    false,
                    Topics.of(TOPIC_THREAD_POOL_STATISTICS),
                    "application/json",
                    CHARSET_UTF8,
                    TcpClient.toBytes(Json.writeJson(statistics, false), CHARSET_UTF8));
    }

    /**
     * @throws VncException if the message data exceeds the maximum message size
     */
    private void validateMessageSize(IMessage msg) {
        Objects.requireNonNull(msg);
        final int size = msg.getData().length;
        final long limit = this.maxMessageSize.get();
        if ((long)size > limit) {
            throw new VncException(String.format("The message (%dB) is too large! The limit is at %dB", size, limit));
        }
    }

    /**
     * Waits for the future's result and maps the checked exceptions of
     * {@link Future#get(long, TimeUnit)} to unchecked Venice exceptions.
     *
     * @throws TimeoutException if the result is not available in time
     * @throws InterruptedException if the waiting thread is interrupted; the
     *         thread's interrupt status is restored before throwing
     * @throws VncException if the task failed; a {@code VncException} thrown
     *         by the task is rethrown as is, any other cause is wrapped
     */
    private IMessage deref(Future<IMessage> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        }
        catch (java.util.concurrent.TimeoutException ex) {
            throw new TimeoutException("Timeout while waiting for IPC response.");
        }
        catch (ExecutionException ex) {
            final Throwable cause = ex.getCause();
            if (cause instanceof VncException) {
                throw (VncException)cause;
            }
            throw new VncException("Error in IPC call", cause);
        }
        catch (java.lang.InterruptedException ex) {
            // The checked exception is translated to an unchecked one; restore
            // the interrupt status so code further up the stack can still
            // observe the interruption.
            Thread.currentThread().interrupt();
            throw new InterruptedException("Interrupted while waiting for IPC response.");
        }
    }

    /**
     * Undoes a failed {@link #open()}: closes the channel (if any) and resets
     * the client to the "not opened" state.
     */
    private void rollbackOpen(SocketChannel ch) {
        IO.safeClose(ch);
        this.opened.set(false);
        this.channel.set(null);
    }

    /**
     * @return this client's channel
     * @throws VncException if there is no channel or the channel is not open
     */
    private SocketChannel openChannelOrThrow() {
        final SocketChannel ch = this.channel.get();
        if (ch == null || !ch.isOpen()) {
            throw new VncException("This TcpClient is not open!");
        }
        return ch;
    }

    /**
     * @throws VncException if this client is in subscription mode
     */
    private void rejectIfInSubscriptionMode() {
        if (this.subscription.get()) {
            throw new VncException("A client in subscription mode cannot send request messages!");
        }
    }

    private static Message createDiffieHellmanRequestMessage(String clientPublicKey) {
        return new Message(
                    MessageType.DIFFIE_HELLMAN_KEY_REQUEST,
                    ResponseStatus.NULL,
                    false,
                    Topics.of("dh"),
                    "text/plain",
                    CHARSET_UTF8,
                    TcpClient.toBytes(clientPublicKey, CHARSET_UTF8));
    }

    private static Message createSubscribeRequestMessage(Set<String> topics, String endpointId) {
        return new Message(
                    MessageType.SUBSCRIBE,
                    ResponseStatus.NULL,
                    false,
                    Topics.of(topics),
                    "text/plain",
                    CHARSET_UTF8,
                    TcpClient.toBytes(endpointId, CHARSET_UTF8));
    }

    private static Message createQueueOfferRequestMessage(Message msg, String queueName) {
        return new Message(
                    null,
                    MessageType.OFFER,
                    ResponseStatus.NULL,
                    false,
                    queueName,
                    -1L,
                    msg.getTopics(),
                    msg.getMimetype(),
                    msg.getCharset(),
                    msg.getData());
    }

    private static Message createQueuePollRequestMessage(String queueName) {
        return new Message(
                    null,
                    MessageType.POLL,
                    ResponseStatus.NULL,
                    false,
                    queueName,
                    -1L,
                    Topics.of("queue/poll"),
                    "application/octet-stream",
                    null,
                    new byte[0]);
    }

    private static byte[] toBytes(String s, String charset) {
        return s.getBytes(Charset.forName(charset));
    }
}

