/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.cache;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.atmosphere.cache.BroadcastMessage;
import org.atmosphere.cache.BroadcasterCacheInspector;
import org.atmosphere.cache.CacheMessage;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.BroadcasterCacheListener;
import org.atmosphere.util.ExecutorsFactory;
import org.atmosphere.util.UUIDProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UUIDBroadcasterCache
implements BroadcasterCache {
    private static final Logger logger = LoggerFactory.getLogger(UUIDBroadcasterCache.class);
    private final Map<String, ConcurrentLinkedQueue<CacheMessage>> messages = new ConcurrentHashMap<String, ConcurrentLinkedQueue<CacheMessage>>();
    private final Map<String, Long> activeClients = new ConcurrentHashMap<String, Long>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    protected final List<BroadcasterCacheInspector> inspectors = new LinkedList<BroadcasterCacheInspector>();
    private ScheduledFuture<?> scheduledFuture;
    protected ScheduledExecutorService taskScheduler;
    private long clientIdleTime = TimeUnit.SECONDS.toMillis(60L);
    private long invalidateCacheInterval = TimeUnit.SECONDS.toMillis(30L);
    private int maxPerClient = 1000;
    private long messageTTL = TimeUnit.SECONDS.toMillis(300L);
    private int maxTotal = 100000;
    private boolean shared = true;
    protected final List<Object> emptyList = List.of();
    protected final List<BroadcasterCacheListener> listeners = new LinkedList<BroadcasterCacheListener>();
    private UUIDProvider uuidProvider;
    private final AtomicLong evictions = new AtomicLong();
    private final AtomicLong cacheHits = new AtomicLong();
    private final AtomicLong cacheMisses = new AtomicLong();

    @Override
    public void configure(AtmosphereConfig config) {
        Object o = config.properties().get("shared");
        if (o != null) {
            this.shared = Boolean.parseBoolean(o.toString());
        }
        this.taskScheduler = this.shared ? ExecutorsFactory.getScheduler(config) : Executors.newSingleThreadScheduledExecutor();
        this.clientIdleTime = TimeUnit.SECONDS.toMillis(Long.parseLong(config.getInitParameter("org.atmosphere.cache.UUIDBroadcasterCache.clientIdleTime", "60")));
        this.invalidateCacheInterval = TimeUnit.SECONDS.toMillis(Long.parseLong(config.getInitParameter("org.atmosphere.cache.UUIDBroadcasterCache.invalidateCacheInterval", "30")));
        this.maxPerClient = Integer.parseInt(config.getInitParameter("org.atmosphere.cache.UUIDBroadcasterCache.maxPerClient", "1000"));
        this.messageTTL = TimeUnit.SECONDS.toMillis(Long.parseLong(config.getInitParameter("org.atmosphere.cache.UUIDBroadcasterCache.messageTTL", "300")));
        this.maxTotal = Integer.parseInt(config.getInitParameter("org.atmosphere.cache.UUIDBroadcasterCache.maxTotal", "100000"));
        this.uuidProvider = config.uuidProvider();
    }

    @Override
    public void start() {
        this.scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(this::invalidateExpiredEntries, 0L, this.invalidateCacheInterval, TimeUnit.MILLISECONDS);
    }

    @Override
    public void stop() {
        this.cleanup();
        if (this.taskScheduler != null) {
            this.taskScheduler.shutdown();
        }
    }

    @Override
    public void cleanup() {
        this.messages.clear();
        this.activeClients.clear();
        this.inspectors.clear();
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduledFuture = null;
        }
    }

    @Override
    public CacheMessage addToCache(String broadcasterId, String uuid, BroadcastMessage message) {
        if (logger.isTraceEnabled()) {
            logger.trace("Adding for AtmosphereResource {} cached messages {}", (Object)uuid, message.message());
            logger.trace("Active clients {}", this.activeClients());
        }
        String messageId = this.uuidProvider.generateUuid();
        boolean cache = this.inspect(message);
        CacheMessage cacheMessage = new CacheMessage(messageId, message.message(), uuid);
        if (cache) {
            if (uuid.equals("null")) {
                for (Map.Entry<String, Long> entry : this.activeClients.entrySet()) {
                    this.addMessageIfNotExists(broadcasterId, entry.getKey(), cacheMessage);
                }
            } else {
                this.cacheCandidate(broadcasterId, uuid);
                this.addMessageIfNotExists(broadcasterId, uuid, cacheMessage);
            }
        }
        return cacheMessage;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Object> retrieveFromCache(String broadcasterId, String uuid) {
        try {
            this.readWriteLock.writeLock().lock();
            this.cacheCandidate(broadcasterId, uuid);
            ConcurrentLinkedQueue<CacheMessage> clientQueue = this.messages.remove(uuid);
            if (clientQueue != null) {
                this.cacheHits.incrementAndGet();
                if (logger.isTraceEnabled()) {
                    logger.trace("Retrieved for AtmosphereResource {} cached messages {}", (Object)uuid, (Object)clientQueue.size());
                    logger.trace("Available cached message {}", this.messages);
                }
                List<Object> list = clientQueue.parallelStream().map(CacheMessage::getMessage).toList();
                return list;
            }
            this.cacheMisses.incrementAndGet();
            List<Object> list = List.of();
            return list;
        }
        finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    @Override
    public BroadcasterCache clearCache(String broadcasterId, String uuid, CacheMessage message) {
        ConcurrentLinkedQueue<CacheMessage> clientQueue = this.messages.get(uuid);
        if (clientQueue != null && !clientQueue.isEmpty()) {
            logger.trace("Removing for AtmosphereResource {} cached message {}", (Object)uuid, message.getMessage());
            this.notifyRemoveCache(broadcasterId, new CacheMessage(message.getId(), message.getCreateTime(), message.getMessage(), uuid));
            clientQueue.remove(message);
        }
        return this;
    }

    @Override
    public BroadcasterCache inspector(BroadcasterCacheInspector b) {
        this.inspectors.add(b);
        return this;
    }

    @Override
    public BroadcasterCache addBroadcasterCacheListener(BroadcasterCacheListener l) {
        this.listeners.add(l);
        return this;
    }

    @Override
    public BroadcasterCache removeBroadcasterCacheListener(BroadcasterCacheListener l) {
        this.listeners.remove(l);
        return this;
    }

    protected String uuid(AtmosphereResource r) {
        return r.uuid();
    }

    private void addMessageIfNotExists(String broadcasterId, String clientId, CacheMessage message) {
        if (!this.hasMessage(clientId, message.getId())) {
            this.addMessage(broadcasterId, clientId, message);
        } else {
            logger.debug("Duplicate message {} for client {}", (Object)message, (Object)clientId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addMessage(String broadcasterId, String clientId, CacheMessage message) {
        try {
            this.readWriteLock.readLock().lock();
            ConcurrentLinkedQueue<CacheMessage> clientQueue = this.messages.get(clientId);
            if (clientQueue == null) {
                clientQueue = new ConcurrentLinkedQueue();
                if (this.activeClients.get(clientId) != null) {
                    this.messages.put(clientId, clientQueue);
                } else {
                    logger.debug("Client {} is no longer active. Not caching message {}}", (Object)clientId, (Object)message);
                    return;
                }
            }
            this.notifyAddCache(broadcasterId, message);
            clientQueue.offer(message);
            while (clientQueue.size() > this.maxPerClient) {
                CacheMessage evicted = clientQueue.poll();
                if (evicted == null) continue;
                this.evictions.incrementAndGet();
                logger.trace("Evicted oldest message for client {} (max-per-client={})", (Object)clientId, (Object)this.maxPerClient);
            }
        }
        finally {
            this.readWriteLock.readLock().unlock();
        }
    }

    private void notifyAddCache(String broadcasterId, CacheMessage message) {
        for (BroadcasterCacheListener l : this.listeners) {
            try {
                l.onAddCache(broadcasterId, message);
            }
            catch (Exception ex) {
                logger.warn("Listener exception", (Throwable)ex);
            }
        }
    }

    private void notifyRemoveCache(String broadcasterId, CacheMessage message) {
        for (BroadcasterCacheListener l : this.listeners) {
            try {
                l.onRemoveCache(broadcasterId, message);
            }
            catch (Exception ex) {
                logger.warn("Listener exception", (Throwable)ex);
            }
        }
    }

    private boolean hasMessage(String clientId, String messageId) {
        ConcurrentLinkedQueue<CacheMessage> clientQueue = this.messages.get(clientId);
        return clientQueue != null && clientQueue.parallelStream().anyMatch(m -> Objects.equals(m.getId(), messageId));
    }

    public Map<String, ConcurrentLinkedQueue<CacheMessage>> messages() {
        return this.messages;
    }

    public Map<String, Long> activeClients() {
        return this.activeClients;
    }

    protected boolean inspect(BroadcastMessage m) {
        for (BroadcasterCacheInspector b : this.inspectors) {
            if (b.inspect(m)) continue;
            return false;
        }
        return true;
    }

    public void setInvalidateCacheInterval(long invalidateCacheInterval) {
        this.invalidateCacheInterval = invalidateCacheInterval;
        this.scheduledFuture.cancel(true);
        this.start();
    }

    public void setClientIdleTime(long clientIdleTime) {
        this.clientIdleTime = clientIdleTime;
    }

    public void setMaxPerClient(int maxPerClient) {
        this.maxPerClient = maxPerClient;
    }

    public void setMessageTTL(long messageTTLMillis) {
        this.messageTTL = messageTTLMillis;
    }

    public void setMaxTotal(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    protected void invalidateExpiredEntries() {
        long now = System.currentTimeMillis();
        HashSet<String> inactiveClients = new HashSet<String>();
        for (Map.Entry<String, Long> entry : this.activeClients.entrySet()) {
            if (now - entry.getValue() <= this.clientIdleTime) continue;
            logger.trace("Invalidate client {}", (Object)entry.getKey());
            inactiveClients.add(entry.getKey());
        }
        for (String clientId : inactiveClients) {
            this.activeClients.remove(clientId);
            this.messages.remove(clientId);
        }
        for (String msg : this.messages().keySet()) {
            if (this.activeClients().containsKey(msg)) continue;
            this.messages().remove(msg);
        }
        long nowNanos = System.nanoTime();
        long ttlNanos = TimeUnit.MILLISECONDS.toNanos(this.messageTTL);
        for (ConcurrentLinkedQueue<CacheMessage> queue : this.messages.values()) {
            queue.removeIf(m -> {
                if (nowNanos - m.getCreateTime() > ttlNanos) {
                    this.evictions.incrementAndGet();
                    return true;
                }
                return false;
            });
        }
        int total = this.messages.values().stream().mapToInt(ConcurrentLinkedQueue::size).sum();
        while (total > this.maxTotal) {
            String oldestClient = null;
            long oldestTime = Long.MAX_VALUE;
            for (Map.Entry<String, ConcurrentLinkedQueue<CacheMessage>> entry : this.messages.entrySet()) {
                CacheMessage head = entry.getValue().peek();
                if (head == null || head.getCreateTime() >= oldestTime) continue;
                oldestTime = head.getCreateTime();
                oldestClient = entry.getKey();
            }
            if (oldestClient == null) break;
            ConcurrentLinkedQueue<CacheMessage> queue = this.messages.get(oldestClient);
            if (queue == null) continue;
            queue.poll();
            this.evictions.incrementAndGet();
            --total;
            if (!queue.isEmpty()) continue;
            this.messages.remove(oldestClient);
        }
    }

    @Override
    public BroadcasterCache excludeFromCache(String broadcasterId, AtmosphereResource r) {
        this.activeClients.remove(r.uuid());
        return this;
    }

    @Override
    public BroadcasterCache cacheCandidate(String broadcasterId, String uuid) {
        this.activeClients.put(uuid, System.currentTimeMillis());
        return this;
    }

    public String toString() {
        return this.getClass().getName();
    }

    public List<BroadcasterCacheListener> listeners() {
        return this.listeners;
    }

    public List<BroadcasterCacheInspector> inspectors() {
        return this.inspectors;
    }

    public int totalSize() {
        return this.messages.values().stream().mapToInt(ConcurrentLinkedQueue::size).sum();
    }

    public long evictionCount() {
        return this.evictions.get();
    }

    public long hitCount() {
        return this.cacheHits.get();
    }

    public long missCount() {
        return this.cacheMisses.get();
    }
}

