/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp.commands.tx;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.EventType;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.infinispan.server.resp.AclCategory;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.server.resp.commands.TransactionResp3Command;
import org.infinispan.server.resp.filter.EventListenerKeysFilter;
import org.infinispan.server.resp.meta.ClientMetadata;
import org.infinispan.server.resp.serialization.ResponseWriter;
import org.infinispan.server.resp.tx.RespTransactionHandler;

/**
 * RESP {@code WATCH} command.
 *
 * <p>Outside of a transaction context, the command installs a clustered cache listener that is
 * filtered on the keys passed as arguments and remembers that listener on the client's channel
 * (under {@link #WATCHER_KEY}). The listener only records whether any create/modify/expire/remove
 * event was observed for the watched keys. When invoked through a {@link RespTransactionHandler}
 * the command is rejected with an error.
 */
public class WATCH extends RespCommand implements Resp3Command, TransactionResp3Command {
    /** Channel attribute holding every listener registered by {@code WATCH} for that channel. */
    static final AttributeKey<List<TxKeysListener>> WATCHER_KEY = AttributeKey.newInstance("watchers");

    /**
     * Creates the command descriptor with the FAST and TRANSACTION ACL categories.
     */
    public WATCH() {
        // NOTE(review): the numeric arguments are presumably the arity and key-position hints
        // expected by RespCommand -- confirm against that constructor.
        super(-2, 1, -1, 1, AclCategory.FAST.mask() | AclCategory.TRANSACTION.mask());
    }

    /**
     * Starts watching the given keys.
     *
     * <p>A new {@link TxKeysListener} is added asynchronously to the handler's cache, restricted
     * by an {@link EventListenerKeysFilter} built from {@code arguments}. Once the listener is
     * installed it is registered on the channel and the client metadata counters are updated.
     *
     * @param handler   handler providing the cache, the server metadata and the response stage
     * @param ctx       channel context of the client issuing the command
     * @param arguments the keys to watch
     * @return the stage produced by {@code handler.stageToReturn}, writing {@code OK} on success
     */
    @Override
    public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
        AdvancedCache<byte[], byte[]> cache = handler.cache();
        TxKeysListener listener = new TxKeysListener(arguments.size());
        EventListenerKeysFilter filter = new EventListenerKeysFilter(arguments.stream());
        // Pass the filter and converter with their generic types instead of raw-type casts so the
        // call is checked by the compiler and yields a properly typed CompletionStage<Void>.
        CompletionStage<Void> cs = cache.addListenerAsync(listener, filter, new TxEventConverterEmpty())
                .thenAccept(ignore -> this.register(ctx, listener))
                .thenAccept(ignore -> {
                    ClientMetadata metadata = handler.respServer().metadataRepository().client();
                    metadata.incrementWatchingClients();
                    metadata.recordWatchedKeys(arguments.size());
                });
        return handler.stageToReturn(cs, ctx, ResponseWriter.OK);
    }

    /**
     * Rejects {@code WATCH} when it is issued through the transaction handler.
     *
     * @param handler   transaction handler used to write the error
     * @param ctx       channel context of the client issuing the command
     * @param arguments the command arguments (unused)
     * @return the handler's own stage, after the error has been written
     */
    @Override
    public CompletionStage<RespRequestHandler> perform(RespTransactionHandler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
        handler.writer().customError("WATCH inside MULTI is not allowed");
        return handler.myStage();
    }

    /**
     * Remembers {@code listener} on the channel of {@code ctx}.
     *
     * <p>The list stored under {@link #WATCHER_KEY} is created on first use.
     *
     * @param ctx      channel context whose channel owns the watcher list
     * @param listener the listener to append to the channel's watcher list
     */
    public void register(ChannelHandlerContext ctx, TxKeysListener listener) {
        // Work against the List interface: the attribute is declared as List<TxKeysListener>, so
        // down-casting to ArrayList is unnecessary and would fail for any other List implementation.
        List<TxKeysListener> watchers = ctx.channel().attr(WATCHER_KEY).get();
        if (watchers == null) {
            watchers = new ArrayList<>();
            ctx.channel().attr(WATCHER_KEY).set(watchers);
        }
        watchers.add(listener);
    }

    /**
     * Clustered cache listener that records whether any event was received.
     *
     * <p>It keeps no information about the event itself, only a flag that flips to {@code true}
     * on the first created, modified, expired or removed notification.
     */
    @Listener(clustered=true)
    public static class TxKeysListener {
        private final AtomicBoolean hasEvent = new AtomicBoolean(false);
        private final int numberOfKeys;

        /**
         * @param numberOfKeys number of keys this listener was created for
         */
        public TxKeysListener(int numberOfKeys) {
            this.numberOfKeys = numberOfKeys;
        }

        /**
         * Marks this listener as having observed an event.
         *
         * @param ignore the received event; its content is not inspected
         * @return an already completed stage
         */
        @CacheEntryCreated
        @CacheEntryModified
        @CacheEntryExpired
        @CacheEntryRemoved
        public CompletionStage<Void> onEvent(CacheEntryEvent<Object, Object> ignore) {
            this.hasEvent.set(true);
            return CompletableFutures.completedNull();
        }

        /**
         * @return {@code true} once at least one event has been delivered to this listener
         */
        public boolean hasSeenEvents() {
            return this.hasEvent.get();
        }

        /**
         * @return the number of keys given when this listener was created
         */
        public int getNumberOfKeys() {
            return this.numberOfKeys;
        }
    }

    /**
     * Event converter that discards every event payload by always converting to {@code null}.
     *
     * <p>{@link TxKeysListener} never reads the event content, so nothing needs to be carried.
     */
    @ProtoTypeId(value=6110)
    public static class TxEventConverterEmpty implements CacheEventConverter<Object, Object, Object> {
        /**
         * @return always {@code null}, regardless of the arguments
         */
        @Override
        public Object convert(Object key, Object oldValue, Metadata oldMetadata, Object newValue, Metadata newMetadata, EventType eventType) {
            return null;
        }
    }
}

