/*
 * Decompiled with CFR 0.152.
 */
package com.erudika.para.server.queue;

import com.erudika.para.core.listeners.DestroyListener;
import com.erudika.para.core.queue.MockQueue;
import com.erudika.para.core.queue.Queue;
import com.erudika.para.core.utils.Para;
import com.erudika.para.server.queue.River;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory {@link Queue} implementation backed by a {@link ConcurrentLinkedQueue}.
 *
 * <p>Messages are plain strings. Blank messages are never stored, and an empty
 * queue is reported to callers as an empty string rather than {@code null}.
 *
 * <p>Polling is driven by a single, class-wide background task: the task handle is
 * static, so only the first queue for which polling is started is actually polled
 * until polling is stopped again.
 */
@Singleton
public class LocalQueue
implements Queue {
    private static final Logger logger = LoggerFactory.getLogger(LocalQueue.class);
    // Not referenced anywhere in this class; kept so class initialisation still
    // performs the same configuration read as before.
    private static final int SLEEP = Para.getConfig().queuePollingWaitSec();
    /** Handle of the class-wide polling task, or {@code null} when polling is not running. */
    private static Future<?> pollingTask;
    /** Upper bound on the number of messages returned by one pull cycle. */
    private static final int MAX_MESSAGES = 10;
    /** Value of the polling interval setting; only used in the start-up log message here. */
    private static final int POLLING_INTERVAL = Para.getConfig().queuePollingIntervalSec();
    private final ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
    private String name;

    /**
     * Creates a queue with the default name {@code "queue"}.
     */
    public LocalQueue() {
        this("queue");
    }

    /**
     * Creates a queue with the given name.
     *
     * @param name the queue name
     */
    public LocalQueue(String name) {
        this.name = name;
    }

    /**
     * Removes and returns the message at the head of the queue.
     *
     * @return the next message, or an empty string if the queue is empty
     */
    public String pull() {
        String s = this.q.poll();
        return StringUtils.isBlank(s) ? "" : s;
    }

    /**
     * Appends a message to the tail of the queue. Blank messages are ignored.
     *
     * @param task the message to enqueue
     */
    public void push(String task) {
        if (!StringUtils.isBlank(task)) {
            this.q.add(task);
        }
    }

    /**
     * Returns the name of this queue.
     *
     * @return the queue name
     */
    public String getName() {
        return this.name;
    }

    /**
     * Sets the name of this queue.
     *
     * @param name the new queue name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Starts the background polling task for this queue, unless one is already running.
     */
    public void startPolling() {
        LocalQueue.startPollingForMessages(this);
    }

    /**
     * Stops the class-wide background polling task, if one is running.
     */
    public void stopPolling() {
        LocalQueue.stopPollingForMessages();
    }

    /**
     * Submits a {@link River} task that drains messages from the given queue and
     * registers a destroy listener that stops the task again. Does nothing if a
     * polling task is already registered.
     *
     * @param queue the queue to pull messages from
     */
    static void startPollingForMessages(final Queue queue) {
        if (pollingTask == null) {
            logger.info("Starting local river (polling interval: {}s)", POLLING_INTERVAL);
            pollingTask = Para.getExecutorService().submit(new River(){

                @Override
                List<String> pullMessages() {
                    List<String> msgs = new ArrayList<String>(MAX_MESSAGES);
                    // Stop at the first blank result (queue drained) or once the
                    // batch holds exactly MAX_MESSAGES entries.
                    while (msgs.size() < MAX_MESSAGES) {
                        String msg = queue.pull();
                        if (StringUtils.isBlank(msg)) {
                            break;
                        }
                        msgs.add(msg);
                    }
                    return msgs;
                }
            });
            Para.addDestroyListener(new DestroyListener(){

                public void onDestroy() {
                    LocalQueue.stopPollingForMessages();
                }
            });
        }
    }

    /**
     * Cancels the polling task (interrupting it if it is running) and clears the
     * handle so that polling can be started again later. Does nothing if no task
     * is registered.
     */
    static void stopPollingForMessages() {
        if (pollingTask != null) {
            logger.info("Stopping local river...");
            pollingTask.cancel(true);
            // Clear the handle; otherwise startPollingForMessages() would treat the
            // cancelled task as still active and never start a new one.
            pollingTask = null;
        }
    }
}

