/*
 * Decompiled with CFR 0.152.
 */
package de.caluga.morphium.messaging;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;
import de.caluga.morphium.async.AsyncOperationCallback;
import de.caluga.morphium.async.AsyncOperationType;
import de.caluga.morphium.driver.MorphiumId;
import de.caluga.morphium.messaging.MessageListener;
import de.caluga.morphium.messaging.MessageRejectedException;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.query.MorphiumIterator;
import de.caluga.morphium.query.Query;
import de.caluga.morphium.replicaset.OplogListener;
import de.caluga.morphium.replicaset.OplogMonitor;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Messaging
extends Thread
implements ShutdownListener {
    private static Logger log = LoggerFactory.getLogger(Messaging.class);
    private Morphium morphium;
    private boolean running;
    private int pause = 5000;
    private String id;
    private boolean autoAnswer = false;
    private String hostname;
    private boolean processMultiple = false;
    private List<MessageListener> listeners;
    private Map<String, List<MessageListener>> listenerByName;
    private String queueName;
    private ThreadPoolExecutor threadPool;
    private boolean multithreadded = false;
    private int windowSize = 1000;
    private boolean useOplogMonitor = false;
    private OplogMonitor oplogMonitor;

    public Messaging(Morphium m, int pause, boolean processMultiple) {
        this(m, null, pause, processMultiple);
    }

    public Messaging(Morphium m, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, null, pause, processMultiple, multithreadded, windowSize);
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple) {
        this(m, queueName, pause, processMultiple, false, 1000);
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize) {
        this(m, queueName, pause, processMultiple, multithreadded, windowSize, m.isReplicaSet());
    }

    public Messaging(Morphium m, String queueName, int pause, boolean processMultiple, boolean multithreadded, int windowSize, boolean useOplogMonitor) {
        this.multithreadded = multithreadded;
        this.windowSize = windowSize;
        this.morphium = m;
        this.useOplogMonitor = useOplogMonitor;
        if (multithreadded) {
            this.threadPool = new ThreadPoolExecutor(this.morphium.getConfig().getThreadPoolMessagingCoreSize(), this.morphium.getConfig().getThreadPoolMessagingMaxSize(), this.morphium.getConfig().getThreadPoolMessagingKeepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            this.threadPool.setThreadFactory(new ThreadFactory(){
                private AtomicInteger num = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    Thread ret = new Thread(r, "messaging " + this.num);
                    this.num.set(this.num.get() + 1);
                    ret.setDaemon(true);
                    return ret;
                }
            });
        }
        this.morphium.addShutdownListener(this);
        this.queueName = queueName;
        this.running = true;
        this.pause = pause;
        this.processMultiple = processMultiple;
        this.id = UUID.randomUUID().toString();
        this.hostname = System.getenv("HOSTNAME");
        if (this.hostname == null) {
            try {
                this.hostname = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException unknownHostException) {
                // empty catch block
            }
        }
        if (this.hostname == null) {
            this.hostname = "unknown host";
        }
        m.ensureIndicesFor(Msg.class, queueName);
        this.listeners = new CopyOnWriteArrayList<MessageListener>();
        this.listenerByName = new HashMap<String, List<MessageListener>>();
    }

    public long getMessageCount() {
        return this.morphium.createQueryFor(Msg.class).setCollectionName(this.getCollectionName()).countAll();
    }

    public void removeMessage(Msg m) {
        this.morphium.delete(m, this.getCollectionName());
    }

    public List<Msg> findMessages(Query<Msg> q) {
        try {
            q = q.clone();
        }
        catch (CloneNotSupportedException cloneNotSupportedException) {
            // empty catch block
        }
        q.setCollectionName(this.getCollectionName());
        return q.asList();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug("Messaging " + this.id + " started");
        }
        if (this.useOplogMonitor) {
            log.info("init Messaging  using oplogmonitor");
            this.oplogMonitor = new OplogMonitor(this.morphium, this.getCollectionName(), false);
            this.oplogMonitor.addListener(new OplogListener(){

                @Override
                public void incomingData(Map<String, Object> data) {
                    Msg obj;
                    if (data.get("op").equals("i")) {
                        Msg obj2 = Messaging.this.morphium.getMapper().unmarshall(Msg.class, (Map)data.get("o"));
                        if (obj2.getSender().equals(Messaging.this.id) || obj2.getProcessedBy() != null && obj2.getProcessedBy().contains(Messaging.this.id) || obj2.getRecipient() != null && !obj2.getRecipient().equals(Messaging.this.id)) {
                            return;
                        }
                        if (!(!obj2.isExclusive() || obj2.getLockedBy() != null || obj2.getRecipient() != null && !obj2.getRecipient().equals(Messaging.this.id) || obj2.getProcessedBy() != null && obj2.getProcessedBy().contains(Messaging.this.id))) {
                            log.info("trying to lock exclusive message");
                            Messaging.this.lockAndProcess(obj2);
                        } else if (!obj2.isExclusive() || obj2.getRecipient() != null && obj2.getRecipient().equals(Messaging.this.id)) {
                            ArrayList<Msg> lst = new ArrayList<Msg>();
                            lst.add(obj2);
                            try {
                                Messaging.this.processMessages(lst);
                            }
                            catch (Exception e) {
                                log.error("Error during message processing ", (Throwable)e);
                            }
                        } else {
                            log.debug("Message is not for me");
                        }
                    } else if (data.get("op").equals("u") && (obj = Messaging.this.morphium.findById(Msg.class, new MorphiumId(((Map)data.get("o2")).get("_id").toString()))) != null && obj.isExclusive() && obj.getLockedBy() == null && (obj.getRecipient() == null || obj.getRecipient().equals(Messaging.this.id))) {
                        log.debug("Update of msg - trying to lock");
                        Messaging.this.lockAndProcess(obj);
                    }
                }
            });
            this.oplogMonitor.start();
        } else {
            HashMap<String, Object> values = new HashMap<String, Object>();
            while (this.running) {
                values.clear();
                try {
                    Query<Msg> q = this.morphium.createQueryFor(Msg.class);
                    q.setCollectionName(this.getCollectionName());
                    q.or(q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(null).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(null), q.q().f((Enum)Msg.Fields.sender).ne(this.id).f((Enum)Msg.Fields.lockedBy).eq(null).f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(this.id));
                    values.put("locked_by", this.id);
                    values.put("locked", System.currentTimeMillis());
                    this.morphium.set(q, values, false, this.processMultiple);
                    q = q.q();
                    q.or(q.q().f((Enum)Msg.Fields.lockedBy).eq(this.id), q.q().f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(this.id), q.q().f((Enum)Msg.Fields.lockedBy).eq("ALL").f((Enum)Msg.Fields.processedBy).ne(this.id).f((Enum)Msg.Fields.recipient).eq(null));
                    q.sort(Msg.Fields.timestamp);
                    MorphiumIterator<Msg> messages = q.asIterable(this.windowSize);
                    messages.setMultithreaddedAccess(this.multithreadded);
                    this.processMessages(messages);
                }
                catch (Throwable e) {
                    log.error("Unhandled exception " + e.getMessage(), e);
                }
                finally {
                    try {
                        Messaging.sleep(this.pause);
                    }
                    catch (InterruptedException q) {}
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("Messaging " + this.id + " stopped!");
            }
            if (!this.running) {
                this.listeners.clear();
                this.listenerByName.clear();
            }
        }
    }

    private void lockAndProcess(Msg obj) {
        Query<Msg> q = this.morphium.createQueryFor(Msg.class);
        q.f("_id").eq(obj.getMsgId());
        HashMap<String, Object> values = new HashMap<String, Object>();
        values.put("locked_by", this.id);
        values.put("locked", System.currentTimeMillis());
        this.morphium.set(q, values, false, false);
        try {
            Thread.sleep(10L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        obj = this.morphium.reread(obj);
        if (obj != null && obj.getLockedBy() != null && obj.getLockedBy().equals(this.id)) {
            ArrayList<Msg> lst = new ArrayList<Msg>();
            lst.add(obj);
            try {
                this.processMessages(lst);
            }
            catch (Exception e) {
                log.error("Error during message processing ", (Throwable)e);
            }
        }
    }

    private void processMessages(Iterable<Msg> messages) throws InterruptedException {
        ArrayList toStore = new ArrayList();
        ArrayList toExec = new ArrayList();
        ArrayList toRemove = new ArrayList();
        for (Msg m : messages) {
            if (m == null) continue;
            Runnable r = () -> {
                if (m.getProcessedBy() != null && m.getProcessedBy().contains(this.id)) {
                    log.error("Was already processed - ERROR?");
                    throw new RuntimeException("was already processed - error on mongo query result!");
                }
                Msg msg = this.morphium.reread(m, this.getCollectionName());
                if (msg == null) {
                    return;
                }
                if (msg.getProcessedBy() != null && msg.getProcessedBy().contains(this.id)) {
                    log.info("Was already processed - multithreadding?");
                    return;
                }
                if (!msg.getLockedBy().equals(this.id) && !msg.getLockedBy().equals("ALL")) {
                    return;
                }
                if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
                    log.info("Found outdated message - deleting it!");
                    this.morphium.delete(msg, this.getCollectionName());
                    return;
                }
                try {
                    Msg answer;
                    for (MessageListener l : this.listeners) {
                        answer = l.onMessage(this, msg);
                        if (this.autoAnswer && answer == null) {
                            answer = new Msg(msg.getName(), "received", "");
                        }
                        if (answer == null) continue;
                        msg.sendAnswer(this, answer);
                    }
                    if (this.listenerByName.get(msg.getName()) != null) {
                        for (MessageListener l : this.listenerByName.get(msg.getName())) {
                            answer = l.onMessage(this, msg);
                            if (this.autoAnswer && answer == null) {
                                answer = new Msg(msg.getName(), "received", "");
                            }
                            if (answer == null) continue;
                            msg.setDeleteAt(new Date(System.currentTimeMillis() + msg.getTtl()));
                            msg.sendAnswer(this, answer);
                        }
                    }
                }
                catch (MessageRejectedException mre) {
                    log.error("Message rejected by listener: " + mre.getMessage());
                    if (mre.isSendAnswer()) {
                        Msg answer = new Msg(msg.getName(), "message rejected by listener", mre.getMessage());
                        msg.setDeleteAt(new Date(System.currentTimeMillis() + msg.getTtl()));
                        msg.sendAnswer(this, answer);
                    }
                    if (mre.isContinueProcessing()) {
                        this.updateProcessedByAndReleaseLock(msg);
                        return;
                    }
                }
                catch (Throwable t) {
                    log.error("Processing failed", t);
                }
                if (msg.getLockedBy().equals("ALL")) {
                    toExec.add(() -> this.updateProcessedByAndReleaseLock(msg));
                } else {
                    toRemove.add(msg);
                }
            };
            this.queueOrRun(r);
        }
        if (this.multithreadded) {
            while (this.threadPool != null && this.threadPool.getActiveCount() > 0) {
                Thread.yield();
            }
        }
        this.morphium.storeList(toStore, this.getCollectionName());
        this.morphium.delete(toRemove, this.getCollectionName());
        toExec.forEach(this::queueOrRun);
        while (this.morphium.getWriteBufferCount() > 0) {
            Thread.sleep(100L);
        }
    }

    private void updateProcessedByAndReleaseLock(Msg msg) {
        Query<Msg> idq = this.morphium.createQueryFor(Msg.class);
        idq.setCollectionName(this.getCollectionName());
        idq.f((Enum)Msg.Fields.msgId).eq(msg.getMsgId());
        if (msg.getLockedBy().equals(this.id)) {
            this.morphium.set(idq, (Enum)Msg.Fields.lockedBy, null);
        }
        this.morphium.push(idq, Msg.Fields.processedBy, (Object)this.id);
    }

    private void queueOrRun(Runnable r) {
        if (this.multithreadded) {
            boolean queued = false;
            while (!queued) {
                try {
                    this.threadPool.execute(r);
                    queued = true;
                }
                catch (Throwable throwable) {}
            }
            while (this.threadPool.getActiveCount() > this.windowSize) {
                Thread.yield();
            }
        } else {
            r.run();
        }
    }

    public String getCollectionName() {
        if (this.queueName == null || this.queueName.isEmpty()) {
            return "msg";
        }
        return "mmsg_" + this.queueName;
    }

    public void addListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
            c.put(n, new ArrayList());
            this.listenerByName = c;
        }
        this.listenerByName.get(n).add(l);
    }

    public void removeListenerForMessageNamed(String n, MessageListener l) {
        if (this.listenerByName.get(n) == null) {
            return;
        }
        HashMap c = (HashMap)((HashMap)this.listenerByName).clone();
        ((List)c.get(n)).remove(l);
        this.listenerByName = c;
    }

    public String getSenderId() {
        return this.id;
    }

    public void setSenderId(String id) {
        this.id = id;
    }

    public int getPause() {
        return this.pause;
    }

    public void setPause(int pause) {
        this.pause = pause;
    }

    public boolean isRunning() {
        if (this.useOplogMonitor) {
            return this.oplogMonitor != null && this.oplogMonitor.isRunning();
        }
        return this.running;
    }

    @Deprecated
    public void setRunning(boolean running) {
        if (!running && this.oplogMonitor != null) {
            this.oplogMonitor.stop();
        }
        this.running = running;
    }

    public void terminate() {
        this.running = false;
        if (this.oplogMonitor != null) {
            this.oplogMonitor.stop();
        }
    }

    public void addMessageListener(MessageListener l) {
        this.listeners.add(l);
    }

    public void removeMessageListener(MessageListener l) {
        this.listeners.remove(l);
    }

    public void queueMessage(Msg m) {
        this.storeMsg(m, true);
    }

    public void storeMessage(Msg m) {
        this.storeMsg(m, false);
    }

    public long getNumberOfMessages() {
        Query<Msg> q = this.morphium.createQueryFor(Msg.class);
        q.setCollectionName(this.getCollectionName());
        return q.countAll();
    }

    private void storeMsg(Msg m, boolean async) {
        AsyncOperationCallback cb = null;
        if (async) {
            cb = new AsyncOperationCallback(){

                public void onOperationSucceeded(AsyncOperationType type, Query q, long duration, List result, Object entity, Object ... param) {
                }

                public void onOperationError(AsyncOperationType type, Query q, long duration, String error, Throwable t, Object entity, Object ... param) {
                }
            };
        }
        m.setDeleteAt(new Date(System.currentTimeMillis() + m.getTtl()));
        m.setSender(this.id);
        m.addProcessedId(this.id);
        m.setSenderHost(this.hostname);
        if (m.getTo() != null && !m.getTo().isEmpty()) {
            for (String recipient : m.getTo()) {
                Msg msg = m.getCopy();
                msg.setRecipient(recipient);
                this.morphium.storeNoCache(msg, this.getCollectionName(), cb);
            }
        } else {
            this.morphium.storeNoCache(m, this.getCollectionName(), cb);
        }
    }

    public boolean isAutoAnswer() {
        return this.autoAnswer;
    }

    public void setAutoAnswer(boolean autoAnswer) {
        this.autoAnswer = autoAnswer;
    }

    @Override
    public void onShutdown(Morphium m) {
        try {
            if (this.threadPool != null) {
                this.threadPool.shutdownNow();
                this.threadPool = null;
            }
            if (this.oplogMonitor != null) {
                this.oplogMonitor.stop();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }
}

