/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.scout.rt.server.clientnotification;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.scout.rt.dataobject.id.NodeId;
import org.eclipse.scout.rt.platform.Bean;
import org.eclipse.scout.rt.platform.config.CONFIG;
import org.eclipse.scout.rt.platform.util.CollectionUtility;
import org.eclipse.scout.rt.platform.util.FinalValue;
import org.eclipse.scout.rt.platform.util.date.DateUtility;
import org.eclipse.scout.rt.server.clientnotification.ClientNotificationProperties;
import org.eclipse.scout.rt.shared.clientnotification.ClientNotificationMessage;
import org.eclipse.scout.rt.shared.clientnotification.IClientNotificationAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Bean
public class ClientNotificationNodeQueue {
    private static final Logger LOG = LoggerFactory.getLogger(ClientNotificationNodeQueue.class);
    private final FinalValue<NodeId> m_nodeId = new FinalValue();
    private final int m_capacity;
    private final BlockingDeque<ClientNotificationMessage> m_notifications;
    private final AtomicLong m_lastConsumeAccess;

    public ClientNotificationNodeQueue() {
        this((Integer)CONFIG.getPropertyValue(ClientNotificationProperties.NodeQueueCapacity.class));
    }

    public ClientNotificationNodeQueue(int capacity) {
        this.m_capacity = capacity;
        this.m_notifications = new LinkedBlockingDeque<ClientNotificationMessage>(capacity);
        this.m_lastConsumeAccess = new AtomicLong(System.currentTimeMillis());
    }

    public void setNodeId(NodeId nodeId) {
        this.m_nodeId.set((Object)nodeId);
    }

    public NodeId getNodeId() {
        return (NodeId)this.m_nodeId.get();
    }

    public int getCapacity() {
        return this.m_capacity;
    }

    public void put(ClientNotificationMessage notification) {
        this.put(CollectionUtility.arrayList((Object)notification));
    }

    public void put(Collection<? extends ClientNotificationMessage> notificationInput) {
        List<ClientNotificationMessage> notifications = this.getRelevantNotifications(notificationInput);
        this.putDroppingOld(notifications);
    }

    private void putDroppingOld(Collection<? extends ClientNotificationMessage> notifications) {
        ArrayList<ClientNotificationMessage> droppedNotifications = new ArrayList<ClientNotificationMessage>();
        for (ClientNotificationMessage clientNotificationMessage : notifications) {
            boolean inserted = this.m_notifications.offer(clientNotificationMessage);
            while (!inserted) {
                ClientNotificationMessage removed = this.m_notifications.poll();
                if (removed != null) {
                    droppedNotifications.add(removed);
                }
                inserted = this.m_notifications.offer(clientNotificationMessage);
            }
        }
        if (!droppedNotifications.isEmpty()) {
            Function<Stream, String> function = s -> s.map(m -> m.getNotification().getClass().getSimpleName() + " -> " + m.getAddress().prettyPrint()).collect(Collectors.groupingBy(Function.identity(), Collectors.counting())).entrySet().stream().sorted(Map.Entry.comparingByValue().reversed()).map(e -> (String)e.getKey() + " (" + String.valueOf(e.getValue()) + "x)").collect(Collectors.joining(", ", "[", "]"));
            LOG.error("Notification queue capacity reached. Added {}, removed oldest {} notification messages. [clientNodeId={}, lastConsumeAccess={}, newNotifications={}, droppedNotifications={}]", new Object[]{notifications.size(), droppedNotifications.size(), this.getNodeId(), this.getLastConsumeAccessFormatted(), function.apply(notifications.stream()), function.apply(droppedNotifications.stream())});
            if (LOG.isDebugEnabled()) {
                Function<Stream, String> detailInfoExtractor = s -> s.map(m -> m.toString()).collect(Collectors.joining("\n    ", "\n    ", ""));
                LOG.debug("Notification queue capacity reached. Details:\n  newNotifications={}\n  droppedNotifications={}", new Object[]{detailInfoExtractor.apply(notifications.stream()), detailInfoExtractor.apply(droppedNotifications.stream()), new Exception("stacktrace for further analysis")});
            }
        }
    }

    public long getLastConsumeAccess() {
        return this.m_lastConsumeAccess.get();
    }

    public String getLastConsumeAccessFormatted() {
        return DateUtility.format((Date)new Date(this.getLastConsumeAccess()), (String)"yyyy-MM-dd HH:mm:ss.SSS");
    }

    public List<ClientNotificationMessage> consume(int maxAmount, long maxWaitTime, TimeUnit unit) {
        this.m_lastConsumeAccess.set(System.currentTimeMillis());
        List<ClientNotificationMessage> result = this.getNotifications(maxAmount, maxWaitTime, unit);
        LOG.debug("consumed {} notifications. [clientNodeId={}]", (Object)result.size(), (Object)this.getNodeId());
        return result;
    }

    protected List<ClientNotificationMessage> getNotifications(int maxAmount, long maxWaitTime, TimeUnit unit) {
        LinkedList<ClientNotificationMessage> collected = new LinkedList<ClientNotificationMessage>();
        try {
            ClientNotificationMessage next = this.m_notifications.poll(maxWaitTime, unit);
            if (next != null) {
                collected.add(next);
            }
            int timeout = 234;
            while (next != null && collected.size() < maxAmount) {
                next = this.m_notifications.poll(timeout, TimeUnit.MILLISECONDS);
                if (next == null) continue;
                collected.add(next);
            }
        }
        catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for client notification messages", (Throwable)e);
        }
        return collected;
    }

    private List<ClientNotificationMessage> getRelevantNotifications(Collection<? extends ClientNotificationMessage> notificationInput) {
        return notificationInput.stream().filter(msg -> this.isRelevant(msg.getAddress())).collect(Collectors.toList());
    }

    public boolean isRelevant(IClientNotificationAddress address) {
        return address.isNotifyAllSessions() || address.isNotifyAllNodes() || CollectionUtility.hasElements((Collection)address.getSessionIds()) || CollectionUtility.hasElements((Collection)address.getUserIds());
    }
}

