/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.io.StreamCorruptedException;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import javax.security.auth.Subject;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.util.BloomFilter;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.MurmurHash3BloomFilter;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.NumericVersion;
import org.infinispan.context.Flag;
import org.infinispan.metadata.Metadata;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.security.actions.SecurityActions;
import org.infinispan.server.core.iteration.IterableIterationResult;
import org.infinispan.server.core.iteration.IterationState;
import org.infinispan.server.core.transport.ConnectionMetadata;
import org.infinispan.server.hotrod.BaseRequestProcessor;
import org.infinispan.server.hotrod.ClientListenerRegistry;
import org.infinispan.server.hotrod.HotRodHeader;
import org.infinispan.server.hotrod.HotRodOperation;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.HotRodVersion;
import org.infinispan.server.hotrod.OperationStatus;
import org.infinispan.server.hotrod.ProtocolFlag;
import org.infinispan.server.hotrod.logging.Log;
import org.infinispan.server.hotrod.streaming.GetStreamResponse;
import org.infinispan.server.hotrod.streaming.StreamingState;
import org.infinispan.stats.ClusterCacheStats;
import org.infinispan.stats.Stats;
import org.infinispan.telemetry.InfinispanSpan;
import org.infinispan.telemetry.InfinispanSpanAttributes;
import org.infinispan.telemetry.InfinispanSpanContext;
import org.infinispan.telemetry.InfinispanTelemetry;
import org.infinispan.telemetry.SafeAutoClosable;

class CacheRequestProcessor
extends BaseRequestProcessor {
    private static final Log log = (Log)LogFactory.getLog(CacheRequestProcessor.class, Log.class);
    private final ClientListenerRegistry listenerRegistry;
    private final InfinispanTelemetry telemetryService;
    private final ConcurrentMap<String, BloomFilter<byte[]>> bloomFilters = new ConcurrentHashMap<String, BloomFilter<byte[]>>();

    CacheRequestProcessor(Channel channel, Executor executor, HotRodServer server, InfinispanTelemetry telemetryService) {
        super(channel, executor, server);
        this.listenerRegistry = server.getClientListenerRegistry();
        this.telemetryService = telemetryService;
    }

    void ping(HotRodHeader header, Subject subject) {
        if (!header.cacheName.isEmpty() || this.server.hasDefaultCache()) {
            this.server.ensureCacheInitialized(header).whenComplete((__, t) -> {
                if (t != null) {
                    this.writeException(header, (Throwable)t);
                } else {
                    this.pingResults(header);
                }
            });
        } else {
            this.pingResults(header);
        }
    }

    void pingResults(HotRodHeader header) {
        ConnectionMetadata metadata = ConnectionMetadata.getInstance((Channel)this.channel);
        metadata.protocolVersion(HotRodVersion.forVersion(header.version).toString());
        this.writeResponse(header, header.encoder().pingResponse(header, this.server, this.channel, OperationStatus.Success));
    }

    void stats(HotRodHeader header, Subject subject) {
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(header), header, subject);
        this.executor.execute(() -> this.blockingStats(header, cache));
    }

    private void blockingStats(HotRodHeader header, AdvancedCache<byte[], byte[]> cache) {
        try {
            Stats stats = cache.getStats();
            ClusterCacheStats clusterCacheStats = (ClusterCacheStats)SecurityActions.getCacheComponentRegistry(cache).getComponent(ClusterCacheStats.class);
            ByteBuf buf = header.encoder().statsResponse(header, this.server, this.channel, stats, this.server.getTransport(), clusterCacheStats);
            this.writeResponse(header, buf);
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    void get(HotRodHeader header, Subject subject, byte[] key) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            this.getInternal(header, cache, key, span);
        }
    }

    void updateBloomFilter(HotRodHeader header, Subject subject, byte[] bloomArray) {
        try {
            BloomFilter filter = (BloomFilter)this.bloomFilters.get(header.cacheName);
            if (filter != null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Updating bloom filter %s found for cache %s", filter, header.cacheName);
                }
                filter.setBits(IntSets.from((byte[])bloomArray));
                if (log.isTraceEnabled()) {
                    log.tracef("Updated bloom filter %s for cache %s", filter, header.cacheName);
                }
                this.writeSuccess(header);
            } else {
                if (log.isTraceEnabled()) {
                    log.tracef("There was no bloom filter for cache %s from client", header.cacheName);
                }
                this.writeNotExecuted(header);
            }
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    private void getInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, InfinispanSpan<CacheEntry<?, ?>> span) {
        CompletableFuture get = cache.getCacheEntryAsync((Object)key);
        if (get.isDone() && !get.isCompletedExceptionally()) {
            this.handleGet(header, (CacheEntry<byte[], byte[]>)((CacheEntry)get.join()), null, span);
        } else {
            get.whenComplete((result, throwable) -> this.handleGet(header, (CacheEntry<byte[], byte[]>)result, (Throwable)throwable, span));
        }
    }

    void addToFilter(String cacheName, byte[] key) {
        BloomFilter bloomFilter = (BloomFilter)this.bloomFilters.get(cacheName);
        if (bloomFilter != null) {
            if (log.isTraceEnabled()) {
                log.tracef("Added key %s to bloom filter for cache %s", Util.toStr((Object)key), cacheName);
            }
            bloomFilter.addToFilter((Object)key);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleGet(HotRodHeader header, CacheEntry<byte[], byte[]> result, Throwable throwable, InfinispanSpan<CacheEntry<?, ?>> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else if (result == null) {
                this.writeNotExist(header);
            } else {
                try {
                    switch (header.op) {
                        case GET: {
                            this.writeResponse(header, header.encoder().valueResponse(header, this.server, this.channel, OperationStatus.Success, (byte[])result.getValue()));
                            break;
                        }
                        case GET_WITH_VERSION: {
                            NumericVersion numericVersion = (NumericVersion)result.getMetadata().version();
                            long version = numericVersion != null ? numericVersion.getVersion() : 0L;
                            this.writeResponse(header, header.encoder().valueWithVersionResponse(header, this.server, this.channel, (byte[])result.getValue(), version));
                            break;
                        }
                        default: {
                            throw new IllegalStateException();
                        }
                    }
                }
                catch (Throwable t) {
                    this.writeException(header, t);
                    span.recordException(t);
                }
            }
        }
        finally {
            span.complete();
        }
    }

    void getWithMetadata(HotRodHeader header, Subject subject, byte[] key, int offset) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        this.getWithMetadataInternal(header, cache, key, offset);
    }

    private void getWithMetadataInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, int offset) {
        this.addToFilter(header.cacheName, key);
        CompletableFuture get = cache.getCacheEntryAsync((Object)key);
        if (get.isDone() && !get.isCompletedExceptionally()) {
            this.handleGetWithMetadata(header, offset, (CacheEntry<byte[], byte[]>)((CacheEntry)get.join()), null);
        } else {
            get.whenComplete((ce, throwable) -> this.handleGetWithMetadata(header, offset, (CacheEntry<byte[], byte[]>)ce, (Throwable)throwable));
        }
    }

    private void handleGetWithMetadata(HotRodHeader header, int offset, CacheEntry<byte[], byte[]> entry, Throwable throwable) {
        if (throwable != null) {
            this.writeException(header, throwable);
            return;
        }
        if (entry == null) {
            this.writeNotExist(header);
        } else if (header.op == HotRodOperation.GET_WITH_METADATA) {
            assert (offset == 0);
            this.writeResponse(header, header.encoder().getWithMetadataResponse(header, this.server, this.channel, entry));
        } else {
            this.writeResponse(header, header.encoder().getStreamResponse(header, this.server, this.channel, offset, entry));
        }
    }

    void containsKey(HotRodHeader header, Subject subject, byte[] key) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        this.containsKeyInternal(header, cache, key);
    }

    private void containsKeyInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key) {
        CompletableFuture contains = cache.containsKeyAsync((Object)key);
        if (contains.isDone() && !contains.isCompletedExceptionally()) {
            this.handleContainsKey(header, (Boolean)contains.join(), null);
        } else {
            contains.whenComplete((result, throwable) -> this.handleContainsKey(header, (Boolean)result, (Throwable)throwable));
        }
    }

    private void handleContainsKey(HotRodHeader header, Boolean result, Throwable throwable) {
        if (throwable != null) {
            this.writeException(header, throwable);
        } else if (result.booleanValue()) {
            this.writeSuccess(header);
        } else {
            this.writeNotExist(header);
        }
    }

    void put(HotRodHeader header, Subject subject, byte[] key, byte[] value, Metadata.Builder metadata) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            metadata.version((EntryVersion)cacheInfo.versionGenerator.generateNew());
            this.putInternal(header, cache, key, value, metadata.build(), span);
        }
    }

    private void putInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, byte[] value, Metadata metadata, InfinispanSpan<CacheEntry<?, ?>> span) {
        CompletionStage cs = header.hasFlag(ProtocolFlag.ForceReturnPreviousValue) ? cache.putAsyncEntry((Object)key, (Object)value, metadata) : cache.withFlags(Flag.IGNORE_RETURN_VALUES).putAsync((Object)key, (Object)value, metadata).thenApply(CompletableFutures.toNullFunction());
        cs.whenComplete((ce, throwable) -> this.handlePut(header, (CacheEntry<byte[], byte[]>)ce, (Throwable)throwable, span));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handlePut(HotRodHeader header, CacheEntry<byte[], byte[]> ce, Throwable throwable, InfinispanSpan<CacheEntry<?, ?>> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else {
                this.writeSuccess(header, ce);
            }
        }
        finally {
            span.complete();
        }
    }

    void replaceIfUnmodified(HotRodHeader header, Subject subject, byte[] key, long version, byte[] value, Metadata.Builder metadata) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            metadata.version((EntryVersion)cacheInfo.versionGenerator.generateNew());
            this.replaceIfUnmodifiedInternal(header, cache, key, version, value, metadata.build(), span);
        }
    }

    private void replaceIfUnmodifiedInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, long version, byte[] value, Metadata metadata, InfinispanSpan<ConditionalResponse> span) {
        ((CompletableFuture)cache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).getCacheEntryAsync((Object)key).thenCompose(entry -> this.replaceIfUnmodifiedAfterGet(cache, (CacheEntry<byte[], byte[]>)entry, version, value, metadata))).whenComplete((response, throwable) -> this.handleConditionalResponse(header, (ConditionalResponse)response, (Throwable)throwable, span));
    }

    private CompletionStage<ConditionalResponse> replaceIfUnmodifiedAfterGet(AdvancedCache<byte[], byte[]> cache, CacheEntry<byte[], byte[]> entry, long version, byte[] value, Metadata metadata) {
        if (entry == null) {
            return CompletableFuture.completedFuture(new ConditionalResponse(false, null));
        }
        NumericVersion streamVersion = new NumericVersion(version);
        if (!streamVersion.equals((Object)entry.getMetadata().version())) {
            return CompletableFuture.completedFuture(new ConditionalResponse(false, entry));
        }
        return cache.replaceAsync((Object)((byte[])entry.getKey()), (Object)((byte[])entry.getValue()), (Object)value, metadata).thenApply(replaced -> new ConditionalResponse((boolean)replaced, entry));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleConditionalResponse(HotRodHeader header, ConditionalResponse response, Throwable throwable, InfinispanSpan<ConditionalResponse> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else if (response.result) {
                assert (response.entry != null);
                this.writeSuccess(header, response.entry);
            } else if (response.entry == null) {
                this.writeNotExist(header);
            } else {
                this.writeNotExecuted(header, response.entry);
            }
        }
        finally {
            span.complete();
        }
    }

    void replace(HotRodHeader header, Subject subject, byte[] key, byte[] value, Metadata.Builder metadata) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            metadata.version((EntryVersion)cacheInfo.versionGenerator.generateNew());
            this.replaceInternal(header, cache, key, value, metadata.build(), span);
        }
    }

    private void replaceInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, byte[] value, Metadata metadata, InfinispanSpan<CacheEntry<?, ?>> span) {
        ((CompletableFuture)cache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).getAsync((Object)key).thenCompose(prev -> this.replaceAfterGet(cache, prev != null, key, value, metadata))).whenComplete((cacheEntry, throwable) -> this.handleReplaceIfExists(header, (CacheEntry<byte[], byte[]>)cacheEntry, (Throwable)throwable, span));
    }

    private CompletionStage<CacheEntry<byte[], byte[]>> replaceAfterGet(AdvancedCache<byte[], byte[]> cache, boolean exists, byte[] key, byte[] value, Metadata metadata) {
        return exists ? cache.replaceAsyncEntry((Object)key, (Object)value, metadata) : CompletableFutures.completedNull();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleReplaceIfExists(HotRodHeader header, CacheEntry<byte[], byte[]> cacheEntry, Throwable throwable, InfinispanSpan<CacheEntry<?, ?>> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else if (cacheEntry == null) {
                this.writeNotExecuted(header);
            } else {
                this.writeSuccess(header, cacheEntry);
            }
        }
        finally {
            span.complete();
        }
    }

    void putIfAbsent(HotRodHeader header, Subject subject, byte[] key, byte[] value, Metadata.Builder metadata) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            metadata.version((EntryVersion)cacheInfo.versionGenerator.generateNew());
            this.putIfAbsentInternal(header, cache, key, value, metadata.build(), span);
        }
    }

    private void putIfAbsentInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, byte[] value, Metadata metadata, InfinispanSpan<CacheEntry<?, ?>> span) {
        cache.putIfAbsentAsyncEntry((Object)key, (Object)value, metadata).whenComplete((prev, throwable) -> this.handlePutIfAbsent(header, (CacheEntry<byte[], byte[]>)prev, (Throwable)throwable, span));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handlePutIfAbsent(HotRodHeader header, CacheEntry<byte[], byte[]> result, Throwable throwable, InfinispanSpan<CacheEntry<?, ?>> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else if (result == null) {
                this.writeSuccess(header);
            } else {
                this.writeNotExecuted(header, result);
            }
        }
        finally {
            span.complete();
        }
    }

    void remove(HotRodHeader header, Subject subject, byte[] key) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            this.removeInternal(header, cache, key, span);
        }
    }

    private void removeInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, InfinispanSpan<CacheEntry<?, ?>> span) {
        cache.removeAsyncEntry((Object)key).whenComplete((ce, throwable) -> this.handleRemove(header, (CacheEntry<byte[], byte[]>)ce, (Throwable)throwable, span));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleRemove(HotRodHeader header, CacheEntry<byte[], byte[]> ce, Throwable throwable, InfinispanSpan<CacheEntry<?, ?>> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else if (ce != null) {
                this.writeSuccess(header, ce);
            } else {
                this.writeNotExist(header);
            }
        }
        finally {
            span.complete();
        }
    }

    void removeIfUnmodified(HotRodHeader header, Subject subject, byte[] key, long version) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            this.removeIfUnmodifiedInternal(header, cache, key, version, span);
        }
    }

    private void removeIfUnmodifiedInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, long version, InfinispanSpan<ConditionalResponse> span) {
        ((CompletableFuture)cache.getCacheEntryAsync((Object)key).thenCompose(cacheEntry -> this.removeIfUnmodifiedAfterGet(cache, (CacheEntry<byte[], byte[]>)cacheEntry, version))).whenComplete((response, throwable) -> this.handleConditionalResponse(header, (ConditionalResponse)response, (Throwable)throwable, span));
    }

    private CompletionStage<ConditionalResponse> removeIfUnmodifiedAfterGet(AdvancedCache<byte[], byte[]> cache, CacheEntry<byte[], byte[]> entry, long version) {
        if (entry == null) {
            return CompletableFuture.completedFuture(new ConditionalResponse(false, null));
        }
        NumericVersion streamVersion = new NumericVersion(version);
        if (!streamVersion.equals((Object)entry.getMetadata().version())) {
            return CompletableFuture.completedFuture(new ConditionalResponse(false, entry));
        }
        return cache.removeAsync(entry.getKey(), entry.getValue()).thenApply(removed -> new ConditionalResponse((boolean)removed, entry));
    }

    void clear(HotRodHeader header, Subject subject) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            this.clearInternal(header, cache, span);
        }
    }

    private void clearInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, InfinispanSpan<Void> span) {
        cache.clearAsync().whenComplete((unused, throwable) -> this.handleGenericResponse(header, (Throwable)throwable, span));
    }

    void putAll(HotRodHeader header, Subject subject, Map<byte[], byte[]> entries, Metadata.Builder metadata) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            metadata.version((EntryVersion)cacheInfo.versionGenerator.generateNew());
            this.putAllInternal(header, cache, entries, metadata.build(), span);
        }
    }

    private void putAllInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, Map<byte[], byte[]> entries, Metadata metadata, InfinispanSpan<Void> span) {
        cache.putAllAsync(entries, metadata).whenComplete((nil, throwable) -> this.handleGenericResponse(header, (Throwable)throwable, span));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleGenericResponse(HotRodHeader header, Throwable throwable, InfinispanSpan<Void> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else {
                this.writeSuccess(header);
            }
        }
        finally {
            span.complete();
        }
    }

    void getAll(HotRodHeader header, Subject subject, Set<?> keys) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        this.getAllInternal(header, cache, keys);
    }

    private void getAllInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, Set<?> keys) {
        cache.getAllAsync(keys).whenComplete((map, throwable) -> this.handleGetAll(header, (Map<byte[], byte[]>)map, (Throwable)throwable));
    }

    private void handleGetAll(HotRodHeader header, Map<byte[], byte[]> map, Throwable throwable) {
        if (throwable != null) {
            this.writeException(header, throwable);
        } else {
            this.writeResponse(header, header.encoder().getAllResponse(header, this.server, this.channel, map));
        }
    }

    void size(HotRodHeader header, Subject subject) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            this.sizeInternal(header, cache, span);
        }
    }

    private void sizeInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, InfinispanSpan<Long> span) {
        cache.sizeAsync().whenComplete((size, throwable) -> this.handleSize(header, (Long)size, (Throwable)throwable, span));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleSize(HotRodHeader header, Long size, Throwable throwable, InfinispanSpan<Long> span) {
        try {
            if (throwable != null) {
                this.writeException(header, throwable);
                span.recordException(throwable);
            } else {
                this.writeResponse(header, header.encoder().unsignedLongResponse(header, this.server, this.channel, size));
            }
        }
        finally {
            span.complete();
        }
    }

    void bulkGet(HotRodHeader header, Subject subject, int size) {
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(header), header, subject);
        this.executor.execute(() -> this.bulkGetInternal(header, cache, size));
    }

    private void bulkGetInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, int size) {
        try {
            if (log.isTraceEnabled()) {
                log.tracef("About to create bulk response count = %d", size);
            }
            this.writeResponse(header, header.encoder().bulkGetResponse(header, this.server, this.channel, size, (CacheSet<Map.Entry<byte[], byte[]>>)cache.entrySet()));
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    void bulkGetKeys(HotRodHeader header, Subject subject, int scope) {
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(header), header, subject);
        this.executor.execute(() -> this.bulkGetKeysInternal(header, cache, scope));
    }

    private void bulkGetKeysInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, int scope) {
        try {
            if (log.isTraceEnabled()) {
                log.tracef("About to create bulk get keys response scope = %d", scope);
            }
            this.writeResponse(header, header.encoder().bulkGetKeysResponse(header, this.server, this.channel, (CloseableIterator<byte[]>)cache.keySet().iterator()));
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    void query(HotRodHeader header, Subject subject, byte[] queryBytes) {
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(header), header, subject);
        this.executor.execute(() -> this.queryInternal(header, cache, queryBytes));
    }

    private void queryInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] queryBytes) {
        try {
            byte[] queryResult = this.server.query(cache, queryBytes);
            this.writeResponse(header, header.encoder().valueResponse(header, this.server, this.channel, OperationStatus.Success, queryResult));
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    void addClientListener(HotRodHeader header, Subject subject, byte[] listenerId, boolean includeCurrentState, String filterFactory, List<byte[]> filterParams, String converterFactory, List<byte[]> converterParams, boolean useRawData, int listenerInterests, int bloomBits) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            BloomFilter bloomFilter = null;
            if (bloomBits > 0) {
                bloomFilter = MurmurHash3BloomFilter.createConcurrentFilter((int)bloomBits);
                if (log.isTraceEnabled()) {
                    log.tracef("Installing bloom filter for listener %s on cache %s", Util.toStr((Object)listenerId), header.cacheName);
                }
                BloomFilter<byte[]> priorFilter = this.bloomFilters.putIfAbsent(header.cacheName, (BloomFilter<byte[]>)bloomFilter);
                assert (priorFilter == null);
            }
            CompletionStage<Void> stage = this.listenerRegistry.addClientListener(this.channel, header, listenerId, cache, includeCurrentState, filterFactory, filterParams, converterFactory, converterParams, useRawData, listenerInterests, (BloomFilter<byte[]>)bloomFilter);
            stage.whenComplete((ignore, cause) -> {
                try {
                    if (cause != null) {
                        log.trace("Failed to add listener", (Throwable)cause);
                        if (cause instanceof CompletionException) {
                            this.writeException(header, cause.getCause());
                        } else {
                            this.writeException(header, (Throwable)cause);
                        }
                        span.recordException(cause);
                    } else {
                        this.writeSuccess(header);
                    }
                }
                finally {
                    span.complete();
                }
            });
        }
    }

    void removeClientListener(HotRodHeader header, Subject subject, byte[] listenerId) {
        HotRodServer.ExtendedCacheInfo cacheInfo = this.server.getCacheInfo(header);
        AdvancedCache<byte[], byte[]> cache = this.server.cache(cacheInfo, header, subject);
        InfinispanSpan span = this.requestStart(header, cacheInfo.getInfinispanSpanAttributes());
        try (SafeAutoClosable ignored = span.makeCurrent();){
            this.removeClientListenerInternal(header, cache, listenerId, span);
        }
    }

    private void removeClientListenerInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] listenerId, InfinispanSpan<Boolean> span) {
        this.server.getClientListenerRegistry().removeClientListener(listenerId, (Cache)cache).whenComplete((success, throwable) -> {
            try {
                if (throwable != null) {
                    this.writeException(header, (Throwable)throwable);
                    span.recordException(throwable);
                } else if (success == Boolean.TRUE) {
                    this.writeSuccess(header);
                } else {
                    this.writeNotExecuted(header);
                }
            }
            finally {
                span.complete();
            }
        });
    }

    void iterationStart(HotRodHeader header, Subject subject, byte[] segmentMask, String filterConverterFactory, List<byte[]> filterConverterParams, int batch, boolean includeMetadata) {
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(header), header, subject);
        this.executor.execute(() -> {
            try {
                IterationState iterationState = this.server.getIterationManager().start(cache, segmentMask != null ? BitSet.valueOf(segmentMask) : null, filterConverterFactory, filterConverterParams, header.getValueMediaType(), batch, includeMetadata, DeliveryGuarantee.EXACTLY_ONCE, null);
                iterationState.getReaper().registerChannel(this.channel);
                this.writeResponse(header, header.encoder().iterationStartResponse(header, this.server, this.channel, iterationState.getId()));
            }
            catch (Throwable t) {
                this.writeException(header, t);
            }
        });
    }

    void iterationNext(HotRodHeader header, Subject subject, String iterationId) {
        this.executor.execute(() -> {
            try {
                IterableIterationResult iterationResult = this.server.getIterationManager().next(iterationId, -1);
                this.writeResponse(header, header.encoder().iterationNextResponse(header, this.server, this.channel, iterationResult));
            }
            catch (Throwable t) {
                this.writeException(header, t);
            }
        });
    }

    void iterationEnd(HotRodHeader header, Subject subject, String iterationId) {
        this.executor.execute(() -> {
            try {
                IterationState removed = this.server.getIterationManager().close(iterationId);
                this.writeResponse(header, header.encoder().emptyResponse(header, this.server, this.channel, removed != null ? OperationStatus.Success : OperationStatus.InvalidIteration));
            }
            catch (Throwable t) {
                this.writeException(header, t);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void putStream(HotRodHeader header, Subject subject, byte[] key, ByteBuf buf, long version, Metadata.Builder metadata) {
        try {
            byte[] value = new byte[buf.readableBytes()];
            buf.readBytes(value);
            this.putStream(header, subject, key, value, version, metadata);
        }
        finally {
            buf.release();
        }
    }

    private void putStream(HotRodHeader header, Subject subject, byte[] key, byte[] value, long version, Metadata.Builder metadata) {
        if (version == 0L) {
            this.put(header, subject, key, value, metadata);
        } else if (version < 0L) {
            this.putIfAbsent(header, subject, key, value, metadata);
        } else {
            this.replaceIfUnmodified(header, subject, key, version, value, metadata);
        }
    }

    private <T> InfinispanSpan<T> requestStart(HotRodHeader header, InfinispanSpanAttributes spanAttributes) {
        return this.telemetryService.startTraceRequest(header.op.name(), spanAttributes, (InfinispanSpanContext)header);
    }

    public void putStreamStart(HotRodHeader header, Subject subject, byte[] key, Metadata.Builder metadata, long version) {
        try {
            int id = this.server.getStreamingManager().startPutStream(key, this.channel, metadata, version);
            this.writeResponse(header, header.encoder().putStreamStartResponse(header, this.server, this.channel, id));
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    public void putStreamNext(HotRodHeader header, Subject subject, int streamId, boolean lastChunk, ByteBuf chunk) {
        log.tracef("Received chunk for streamId %s", streamId);
        try {
            StreamingState state = this.server.getStreamingManager().nextPutStream(streamId, lastChunk, chunk);
            if (state == null) {
                this.writeException(header, new StreamCorruptedException("Iteration " + streamId + " is not present on the server"));
            } else if (lastChunk) {
                this.putStream(header, subject, state.getKey(), state.valueForPut(), state.versionForPut(), state.metadataForPut());
            } else {
                this.writeSuccess(header);
            }
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    public void putStreamEnd(HotRodHeader header, Subject subject, int streamId) {
        try {
            this.server.getStreamingManager().closePutStream(streamId);
            this.writeSuccess(header);
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    public void getStreamStart(HotRodHeader header, Subject subject, byte[] key, int chunkSize) {
        AdvancedCache<byte[], byte[]> cache = this.server.cache(this.server.getCacheInfo(header), header, subject);
        try {
            cache.getAdvancedCache().getCacheEntryAsync((Object)key).whenComplete((entry, t) -> {
                if (t != null) {
                    this.writeException(header, (Throwable)t);
                    return;
                }
                if (entry == null) {
                    this.writeNotExist(header);
                    return;
                }
                GetStreamResponse gsr = this.server.getStreamingManager().startGetStream(key, (byte[])entry.getValue(), this.channel, chunkSize);
                this.writeResponse(header, header.encoder().getStreamStartResponse(header, this.server, this.channel, (CacheEntry<?, ?>)entry, gsr));
            });
        }
        catch (Throwable t2) {
            this.writeException(header, t2);
        }
    }

    public void getStreamNext(HotRodHeader header, Subject subject, int streamId) {
        try {
            GetStreamResponse gsr = this.server.getStreamingManager().nextGetStream(streamId);
            if (gsr == null) {
                this.writeException(header, new StreamCorruptedException("StreamId " + streamId + " is not present on the server"));
                return;
            }
            this.writeResponse(header, header.encoder().getStreamStartResponse(header, this.server, this.channel, null, gsr));
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    public void getStreamEnd(HotRodHeader header, Subject subject, int streamId) {
        try {
            this.server.getStreamingManager().closeGetStream(streamId);
            this.writeSuccess(header);
        }
        catch (Throwable t) {
            this.writeException(header, t);
        }
    }

    private static class ConditionalResponse {
        final boolean result;
        final CacheEntry<byte[], byte[]> entry;

        ConditionalResponse(boolean result, CacheEntry<byte[], byte[]> entry) {
            this.result = result;
            this.entry = entry;
        }
    }
}

