/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.map.impl.querycache.publisher;

import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipAdapter;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.util.CollectionUtil;
import com.hazelcast.map.impl.querycache.QueryCacheContext;
import com.hazelcast.map.impl.querycache.QueryCacheScheduler;
import com.hazelcast.map.impl.querycache.accumulator.AccumulatorInfo;
import com.hazelcast.map.impl.querycache.accumulator.AccumulatorInfoSupplier;
import com.hazelcast.map.impl.querycache.accumulator.AccumulatorScannerTask;
import com.hazelcast.map.impl.querycache.accumulator.DefaultAccumulatorInfoSupplier;
import com.hazelcast.map.impl.querycache.publisher.AccumulatorSweeper;
import com.hazelcast.map.impl.querycache.publisher.MapListenerRegistry;
import com.hazelcast.map.impl.querycache.publisher.MapPublisherRegistry;
import com.hazelcast.map.impl.querycache.publisher.PartitionAccumulatorRegistry;
import com.hazelcast.map.impl.querycache.publisher.PublisherContext;
import com.hazelcast.map.impl.querycache.publisher.PublisherRegistry;
import com.hazelcast.spi.impl.NodeEngine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class DefaultPublisherContext
implements PublisherContext {
    private static final long SCAN_PERIOD_SECONDS = 5L;
    private static final long ORPHANED_QUERY_CACHE_REMOVAL_DELAY_SECONDS = TimeUnit.MINUTES.toSeconds(10L);
    private final QueryCacheContext context;
    private final NodeEngine nodeEngine;
    private final MapListenerRegistry mapListenerRegistry;
    private final MapPublisherRegistry mapPublisherRegistry;
    private final AccumulatorInfoSupplier accumulatorInfoSupplier;
    private final Function<String, UUID> listenerRegistrator;
    private final ConcurrentMap<UUID, ScheduledFuture> removalCandidateFutures;

    public DefaultPublisherContext(QueryCacheContext context, NodeEngine nodeEngine, Function<String, UUID> listenerRegistrator) {
        this.context = context;
        this.nodeEngine = nodeEngine;
        this.mapListenerRegistry = new MapListenerRegistry(context);
        this.mapPublisherRegistry = new MapPublisherRegistry(context);
        this.accumulatorInfoSupplier = new DefaultAccumulatorInfoSupplier();
        this.listenerRegistrator = listenerRegistrator;
        this.removalCandidateFutures = new ConcurrentHashMap<UUID, ScheduledFuture>();
        this.startBackgroundAccumulatorScanner();
        this.handleSubscriberAddRemove();
    }

    @Override
    public AccumulatorInfoSupplier getAccumulatorInfoSupplier() {
        return this.accumulatorInfoSupplier;
    }

    @Override
    public MapPublisherRegistry getMapPublisherRegistry() {
        return this.mapPublisherRegistry;
    }

    @Override
    public MapListenerRegistry getMapListenerRegistry() {
        return this.mapListenerRegistry;
    }

    @Override
    public QueryCacheContext getContext() {
        return this.context;
    }

    @Override
    public NodeEngine getNodeEngine() {
        return this.nodeEngine;
    }

    @Override
    public Function<String, UUID> getListenerRegistrator() {
        return this.listenerRegistrator;
    }

    @Override
    public void handleDisconnectedSubscriber(UUID uuid) {
        Collection<PartitionAccumulatorRegistry> removalCandidates = this.getRemovalCandidates(uuid);
        if (CollectionUtil.isEmpty(removalCandidates)) {
            return;
        }
        this.startRemovalTask(removalCandidates, uuid);
    }

    @Override
    public void handleConnectedSubscriber(UUID uuid) {
        this.cancelRemovalTask(uuid);
    }

    @Override
    public void flush() {
        AccumulatorSweeper.flushAllAccumulators(this);
    }

    private Collection<PartitionAccumulatorRegistry> getRemovalCandidates(UUID uuid) {
        ArrayList<PartitionAccumulatorRegistry> candidates = new ArrayList<PartitionAccumulatorRegistry>();
        MapPublisherRegistry mapPublisherRegistry = this.getMapPublisherRegistry();
        Map<String, PublisherRegistry> all = mapPublisherRegistry.getAll();
        for (PublisherRegistry publisherRegistry : all.values()) {
            Map<String, PartitionAccumulatorRegistry> partitionAccumulators = publisherRegistry.getAll();
            Set<Map.Entry<String, PartitionAccumulatorRegistry>> entries = partitionAccumulators.entrySet();
            for (Map.Entry<String, PartitionAccumulatorRegistry> entry : entries) {
                PartitionAccumulatorRegistry accumulatorRegistry = entry.getValue();
                if (!uuid.equals(accumulatorRegistry.getUuid())) continue;
                candidates.add(accumulatorRegistry);
            }
        }
        return candidates;
    }

    private PartitionAccumulatorRegistry removePartitionAccumulatorRegistry(PartitionAccumulatorRegistry registry) {
        AccumulatorInfo info = registry.getInfo();
        String mapName = info.getMapName();
        String cacheId = info.getCacheId();
        MapPublisherRegistry mapPublisherRegistry = this.getMapPublisherRegistry();
        PublisherRegistry publisherRegistry = mapPublisherRegistry.getOrNull(mapName);
        if (publisherRegistry == null) {
            return null;
        }
        return publisherRegistry.remove(cacheId);
    }

    private void startRemovalTask(Collection<PartitionAccumulatorRegistry> removalCandidates, UUID uuid) {
        QueryCacheScheduler queryCacheScheduler = this.context.getQueryCacheScheduler();
        ScheduledFuture<?> scheduledFuture = queryCacheScheduler.scheduleWithRepetition(() -> {
            for (PartitionAccumulatorRegistry registry : removalCandidates) {
                this.removePartitionAccumulatorRegistry(registry);
            }
        }, ORPHANED_QUERY_CACHE_REMOVAL_DELAY_SECONDS);
        ScheduledFuture<?> prevFuture = this.removalCandidateFutures.put(uuid, scheduledFuture);
        if (prevFuture != null) {
            prevFuture.cancel(false);
        }
    }

    private void cancelRemovalTask(UUID uuid) {
        this.removalCandidateFutures.remove(uuid);
    }

    private void startBackgroundAccumulatorScanner() {
        QueryCacheScheduler scheduler = this.context.getQueryCacheScheduler();
        scheduler.scheduleWithRepetition(new AccumulatorScannerTask(this.context), 5L);
    }

    private void handleSubscriberAddRemove() {
        ClusterService clusterService = this.nodeEngine.getClusterService();
        clusterService.addMembershipListener(new MembershipAdapter(){

            @Override
            public void memberRemoved(MembershipEvent membershipEvent) {
                Member member = membershipEvent.getMember();
                UUID uuid = member.getUuid();
                DefaultPublisherContext.this.handleDisconnectedSubscriber(uuid);
            }

            @Override
            public void memberAdded(MembershipEvent membershipEvent) {
                Member member = membershipEvent.getMember();
                UUID uuid = member.getUuid();
                DefaultPublisherContext.this.handleConnectedSubscriber(uuid);
            }
        });
    }
}

