package com.almende.eve.transport.zmq;

import com.almende.eve.capabilities.handler.Handler;
import com.almende.eve.transport.AbstractTransport;
import com.almende.eve.transport.Receiver;
import com.almende.eve.transport.TransportService;
import com.almende.eve.transport.tokens.TokenRet;
import com.almende.eve.transport.tokens.TokenStore;
import com.almende.util.ObjectCache;
import com.almende.util.callback.AsyncCallback;
import com.almende.util.callback.AsyncCallbackQueue;
import com.almende.util.callback.SyncCallback;
import com.almende.util.jackson.JOM;
import com.almende.util.threads.ThreadPool;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;

/* loaded from: input_file:com/almende/eve/transport/zmq/ZmqTransport.class */
public class ZmqTransport extends AbstractTransport {
    private final String zmqUrl;
    private Thread listeningThread;
    private boolean doesAuthentication;
    private boolean doDisconnect;
    private final TokenStore tokenstore;
    private final List<String> protocols;
    private static final Logger LOG = Logger.getLogger(ZmqTransport.class.getCanonicalName());
    private static final AsyncCallbackQueue<String> CALLBACKS = new AsyncCallbackQueue<>();

    public ZmqTransport(ZmqTransportConfig zmqTransportConfig, Handler<Receiver> handler, TransportService transportService) {
        super(zmqTransportConfig.getAddress(), handler, transportService, zmqTransportConfig);
        this.doesAuthentication = false;
        this.doDisconnect = false;
        this.tokenstore = new TokenStore();
        this.protocols = Arrays.asList("zmq");
        this.zmqUrl = super.getAddress().toString().replaceFirst("^zmq:/?/?", "");
        this.doesAuthentication = zmqTransportConfig.getDoAuthentication();
    }

    public void sendAsync(final byte[] bArr, final String str, final URI uri, final byte[] bArr2, String str2) {
        final String uri2 = super.getAddress().toString();
        ThreadPool.getPool().execute(new Runnable() { // from class: com.almende.eve.transport.zmq.ZmqTransport.1
            @Override // java.lang.Runnable
            public void run() {
                String replaceFirst = uri.toString().replaceFirst("zmq:/?/?", "");
                ZMQ.Socket socket = ZMQ.getSocket(8);
                try {
                    socket.connect(replaceFirst);
                    socket.send(bArr, 2);
                    socket.send(uri2, 2);
                    socket.send(str, 2);
                    socket.send(bArr2, 0);
                } catch (Exception e) {
                    ZmqTransport.LOG.log(Level.WARNING, "Failed to send JSON through ZMQ", (Throwable) e);
                }
                socket.setTCPKeepAlive(-1L);
                socket.setLinger(-1L);
                socket.close();
            }
        });
    }

    public void send(URI uri, String str, String str2) throws IOException {
        if (sendLocal(uri, str)) {
            return;
        }
        sendAsync(ZMQ.NORMAL, this.tokenstore.create().toString(), uri, str.getBytes(), str2);
    }

    public void send(URI uri, byte[] bArr, String str) throws IOException {
        if (sendLocal(uri, bArr)) {
            return;
        }
        sendAsync(ZMQ.NORMAL, this.tokenstore.create().toString(), uri, bArr, str);
    }

    public void connect() throws IOException {
        if (this.listeningThread != null) {
            disconnect();
        }
        listen();
    }

    public void disconnect() {
        this.doDisconnect = true;
        this.listeningThread.interrupt();
    }

    public List<String> getProtocols() {
        return this.protocols;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ByteBuffer[] getRequest(ZMQ.Socket socket) {
        byte[] recv = socket.recv();
        ByteBuffer[] byteBufferArr = new ByteBuffer[4];
        if (recv != null) {
            byteBufferArr[0] = ByteBuffer.wrap(recv);
            byteBufferArr[1] = ByteBuffer.wrap(socket.recv());
            byteBufferArr[2] = ByteBuffer.wrap(socket.recv());
            byteBufferArr[3] = ByteBuffer.wrap(socket.recv());
        }
        return byteBufferArr;
    }

    public void listen() {
        this.listeningThread = new Thread(new Runnable() { // from class: com.almende.eve.transport.zmq.ZmqTransport.2
            @Override // java.lang.Runnable
            public void run() {
                ZMQ.Socket socket = ZMQ.getSocket(7);
                socket.bind(ZmqTransport.this.zmqUrl);
                while (true) {
                    try {
                        ByteBuffer[] request = ZmqTransport.this.getRequest(socket);
                        if (request[0] != null) {
                            ZmqTransport.this.handleMsg(request);
                        } else {
                            if (ZmqTransport.this.doDisconnect) {
                                socket.disconnect(ZmqTransport.this.zmqUrl);
                                ZmqTransport.this.doDisconnect = false;
                                return;
                            }
                            continue;
                        }
                    } catch (Exception e) {
                        ZmqTransport.LOG.log(Level.SEVERE, "Caught error:", (Throwable) e);
                    }
                }
            }
        });
        this.listeningThread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleMsg(ByteBuffer[] byteBufferArr) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, IOException, URISyntaxException {
        URI uri = new URI(new String(byteBufferArr[1].array()));
        TokenRet tokenRet = (TokenRet) JOM.getInstance().readValue(byteBufferArr[2].array(), TokenRet.class);
        String str = new String(byteBufferArr[3].array());
        String str2 = uri + ":" + tokenRet.getToken();
        if (Arrays.equals(byteBufferArr[0].array(), ZMQ.HANDSHAKE)) {
            String str3 = this.tokenstore.get(str);
            sendAsync(ZMQ.HANDSHAKE_RESPONSE, str3, uri, str3.getBytes(), null);
            return;
        }
        if (Arrays.equals(byteBufferArr[0].array(), ZMQ.HANDSHAKE_RESPONSE)) {
            AsyncCallback pull = CALLBACKS.pull(str2);
            if (pull != null) {
                pull.onSuccess(str);
                return;
            } else {
                LOG.warning("Received ZMQ.HANDSHAKE_RESPONSE for unknown handshake..." + uri + " : " + tokenRet);
                return;
            }
        }
        ObjectCache objectCache = ObjectCache.get("ZMQSessions");
        if (!objectCache.containsKey(str2) && this.doesAuthentication) {
            SyncCallback<String> syncCallback = new SyncCallback<String>() { // from class: com.almende.eve.transport.zmq.ZmqTransport.3
            };
            CALLBACKS.push(str2, "", syncCallback);
            sendAsync(ZMQ.HANDSHAKE, tokenRet.toString(), uri, tokenRet.getTime().getBytes(), null);
            String str4 = null;
            try {
                str4 = (String) syncCallback.get();
            } catch (Exception e) {
            }
            if (!tokenRet.getToken().equals(str4)) {
                LOG.warning("Failed to complete handshake!");
                return;
            }
            objectCache.put(str2, true);
        }
        if (str != null) {
            ((Receiver) super.getHandle().get()).receive(str, uri, (String) null);
        }
    }
}
