/*
 * Decompiled with CFR 0.152.
 */
package org.kaazing.gateway.service.cluster;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.MessageListener;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.kaazing.gateway.service.cluster.ClusterContext;
import org.kaazing.gateway.service.cluster.MemberId;
import org.kaazing.gateway.service.cluster.ReceiveListener;
import org.kaazing.gateway.service.cluster.SendListener;
import org.kaazing.gateway.util.scheduler.SchedulerProvider;

public class ClusterMessaging {
    private final String localTopicName;
    private final AtomicInteger nonce;
    private final Map<Class<?>, ReceiveListener<?>> receiveListeners;
    private final Map<Integer, SendListener> sendListeners;
    private final ScheduledExecutorService scheduler;
    private final ClusterContext clusterContext;
    private final HazelcastInstance cluster;
    private int syncTimeout = 10000;

    public ClusterMessaging(ClusterContext clusterContext, HazelcastInstance clusterInstance, SchedulerProvider schedulerProvider) {
        this.clusterContext = clusterContext;
        this.cluster = clusterInstance;
        this.localTopicName = ClusterMessaging.getLocalTopicName(clusterContext);
        this.nonce = new AtomicInteger(1);
        this.receiveListeners = new HashMap();
        this.sendListeners = new HashMap<Integer, SendListener>();
        this.scheduler = schedulerProvider.getScheduler("clusterMessaging", true);
        this.init();
    }

    private int nextId() {
        return this.nonce.incrementAndGet();
    }

    private static String getTopicName(MemberId member) {
        return member.getId() + ":com";
    }

    private <T> ITopic<T> getTopic(MemberId member) {
        String topicName = ClusterMessaging.getTopicName(member);
        ITopic topic = this.cluster.getTopic(topicName);
        return topic;
    }

    private static String getLocalTopicName(ClusterContext clusterContext) {
        MemberId localMember = clusterContext.getLocalMember();
        String topicName = ClusterMessaging.getTopicName(localMember);
        return topicName;
    }

    private void init() {
        this.addReceiveTopic(ClusterMessaging.getLocalTopicName(this.clusterContext));
    }

    public void addReceiveQueue(final String name) {
        new Thread(){

            @Override
            public void run() {
                IQueue queue = ClusterMessaging.this.clusterContext.getCollectionsFactory().getQueue(name);
                while (true) {
                    try {
                        while (true) {
                            Message msg = (Message)queue.take();
                            ClusterMessaging.this.receiveMessage(msg);
                        }
                    }
                    catch (Exception exception) {
                        continue;
                    }
                    break;
                }
            }
        }.start();
    }

    public void addReceiveTopic(String name) {
        ITopic topic = this.clusterContext.getCollectionsFactory().getTopic(name);
        topic.addMessageListener((MessageListener)new MessageListener<Message>(){

            public void onMessage(Message msg) {
                ClusterMessaging.this.receiveMessage(msg);
            }
        });
    }

    private void receiveMessage(Message msg) {
        Object payload = msg.getPayload();
        if (msg instanceof Request) {
            Request req = (Request)msg;
            String replyToName = req.getReplyTo();
            ITopic replyToTopic = this.cluster.getTopic(replyToName);
            ReceiveListener<?> receiveListener = this.receiveListeners.get(payload.getClass());
            if (receiveListener != null) {
                Object resPayload;
                try {
                    resPayload = this.receiveHelper(receiveListener, payload);
                }
                catch (Exception e) {
                    ErrorResponse res = new ErrorResponse(this.nextId(), req.getId());
                    res.setPayload(e);
                    replyToTopic.publish((Object)res);
                    return;
                }
                Response res = new Response(this.nextId(), req.getId());
                res.setPayload(resPayload);
                replyToTopic.publish((Object)res);
            }
        } else if (msg instanceof Response) {
            Response res = (Response)msg;
            SendListener sendListener = this.sendListeners.remove(res.getResponseTo());
            if (res instanceof ErrorResponse) {
                Exception exception = null;
                if (payload instanceof Exception) {
                    exception = (Exception)payload;
                }
                sendListener.onException(exception);
            } else {
                sendListener.onResponse(payload);
            }
        }
    }

    private <T> Object receiveHelper(ReceiveListener<T> receiveListener, Object msg) throws Exception {
        return receiveListener.onReceive(msg);
    }

    public void destroy() {
    }

    public Object send(Object msg, MemberId member) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final Object[] ptr = new Object[1];
        this.send(msg, new SendListener(){

            @Override
            public void onException(Exception e) {
                ptr[0] = e;
                latch.countDown();
            }

            @Override
            public void onResponse(Object msg) {
                ptr[0] = msg;
                latch.countDown();
            }
        }, member);
        latch.await();
        if (ptr[0] instanceof Exception) {
            throw new Exception((Exception)ptr[0]);
        }
        return ptr[0];
    }

    public Object send(Object msg, String name) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final Object[] ptr = new Object[1];
        this.send(msg, new SendListener(){

            @Override
            public void onException(Exception e) {
                ptr[0] = e;
                latch.countDown();
            }

            @Override
            public void onResponse(Object msg) {
                ptr[0] = msg;
                latch.countDown();
            }
        }, name);
        latch.await();
        if (ptr[0] instanceof Exception) {
            throw new Exception((Exception)ptr[0]);
        }
        return ptr[0];
    }

    public void send(Object msg, SendListener listener, MemberId member) {
        ITopic receipient = this.getTopic(member);
        Request req = this.createRequest(msg, listener);
        receipient.publish((Object)req);
    }

    public void send(Object msg, SendListener listener, String name) {
        IQueue receipient = this.clusterContext.getCollectionsFactory().getQueue(name);
        Request req = this.createRequest(msg, listener);
        receipient.add((Object)req);
    }

    private Request createRequest(Object msg, final SendListener listener) {
        final Request req = new Request(this.nextId());
        req.setPayload(msg);
        req.setReplyTo(this.localTopicName);
        final ScheduledFuture<?> timeoutFuture = this.scheduler.schedule(new Runnable(){

            @Override
            public void run() {
                SendListener sendListener = (SendListener)ClusterMessaging.this.sendListeners.remove(req.getId());
                if (sendListener != null) {
                    sendListener.onException(new Exception("Request timed out"));
                }
            }
        }, (long)this.syncTimeout, TimeUnit.MILLISECONDS);
        SendListener sendListener = new SendListener(){

            @Override
            public void onException(Exception e) {
                if (timeoutFuture.cancel(false)) {
                    listener.onException(e);
                }
            }

            @Override
            public void onResponse(Object msg) {
                if (timeoutFuture.cancel(false)) {
                    listener.onResponse(msg);
                }
            }
        };
        this.sendListeners.put(req.getId(), sendListener);
        return req;
    }

    public <T> void setReceiver(Class<T> type, ReceiveListener<T> receiveListener) {
        this.receiveListeners.put(type, receiveListener);
    }

    public <T> void removeReceiver(Class<T> type) {
        this.receiveListeners.remove(type);
    }

    public static class ErrorResponse
    extends Response {
        private static final long serialVersionUID = 1L;

        public ErrorResponse(int id, int responseTo) {
            super(id, responseTo);
        }
    }

    public static class Response
    extends Message {
        private static final long serialVersionUID = 1L;
        private int responseTo;

        public Response(int id, int responseTo) {
            super(id);
            this.responseTo = responseTo;
        }

        public int getResponseTo() {
            return this.responseTo;
        }
    }

    public static class Request
    extends Message {
        private static final long serialVersionUID = 1L;
        private String replyTo;

        public Request(int id) {
            super(id);
        }

        public void setReplyTo(String replyTo) {
            this.replyTo = replyTo;
        }

        public String getReplyTo() {
            return this.replyTo;
        }
    }

    public static class Message
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int id;
        private Object payload;

        public Message(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setPayload(Object payload) {
            this.payload = payload;
        }

        public Object getPayload() {
            return this.payload;
        }
    }
}

