/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl;

import io.netty.channel.Channel;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.infinispan.api.async.AsyncCacheEntryProcessor;
import org.infinispan.api.common.CacheEntry;
import org.infinispan.api.common.CacheEntryVersion;
import org.infinispan.api.common.CacheOptions;
import org.infinispan.api.common.CacheWriteOptions;
import org.infinispan.api.common.events.cache.CacheEntryEvent;
import org.infinispan.api.common.events.cache.CacheEntryEventType;
import org.infinispan.api.common.events.cache.CacheListenerOptions;
import org.infinispan.api.common.process.CacheEntryProcessorResult;
import org.infinispan.api.common.process.CacheProcessorOptions;
import org.infinispan.api.configuration.CacheConfiguration;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCacheContainer;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.ServerStatistics;
import org.infinispan.client.hotrod.StreamingRemoteCache;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.StatisticsConfiguration;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.RemoteCacheManagerNotStartedException;
import org.infinispan.client.hotrod.filter.Filters;
import org.infinispan.client.hotrod.impl.CacheEntryConversion;
import org.infinispan.client.hotrod.impl.CacheOptionsUtil;
import org.infinispan.client.hotrod.impl.ClientStatistics;
import org.infinispan.client.hotrod.impl.InternalRemoteCache;
import org.infinispan.client.hotrod.impl.PrivateHotRodFlag;
import org.infinispan.client.hotrod.impl.RemoteCacheEntrySet;
import org.infinispan.client.hotrod.impl.RemoteCacheKeySet;
import org.infinispan.client.hotrod.impl.RemoteCacheSupport;
import org.infinispan.client.hotrod.impl.RemoteCacheValuesCollection;
import org.infinispan.client.hotrod.impl.StreamingRemoteCacheImpl;
import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
import org.infinispan.client.hotrod.impl.cache.CacheEntryImpl;
import org.infinispan.client.hotrod.impl.cache.CacheEntryMetadataImpl;
import org.infinispan.client.hotrod.impl.cache.CacheEntryVersionImpl;
import org.infinispan.client.hotrod.impl.iteration.RemotePublisher;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.AdvancedHotRodOperation;
import org.infinispan.client.hotrod.impl.operations.CacheOperationsFactory;
import org.infinispan.client.hotrod.impl.operations.GetAllBulkOperation;
import org.infinispan.client.hotrod.impl.operations.GetWithMetadataOperation;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.operations.PingResponse;
import org.infinispan.client.hotrod.impl.operations.PutAllBulkOperation;
import org.infinispan.client.hotrod.impl.query.RemoteQueryFactory;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.OperationDispatcher;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.near.NearCacheService;
import org.infinispan.commons.api.query.ContinuousQuery;
import org.infinispan.commons.api.query.Query;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CloseableIteratorCollection;
import org.infinispan.commons.util.CloseableIteratorSet;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IteratorMapper;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

public class RemoteCacheImpl<K, V>
extends RemoteCacheSupport<K, V>
implements InternalRemoteCache<K, V> {
    /** Logger used for trace/debug output of this class. */
    private static final Log log = LogFactory.getLog(RemoteCacheImpl.class, Log.class);
    /** Cache name as passed to the constructor. */
    protected final String name;
    /** UTF-8 encoding of {@link #name}. */
    protected final byte[] nameBytes;
    /** Manager this cache belongs to; also consulted to check whether operations are allowed. */
    protected final RemoteCacheManager remoteCacheManager;
    /** Factory producing the Hot Rod operations executed by this cache. */
    protected final CacheOperationsFactory operationsFactory;
    /** Notifier obtained from the manager, used to look up and remove client listeners. */
    protected final ClientListenerNotifier clientListenerNotifier;
    /** Bit set of {@link Flag} values associated with this cache instance. */
    protected final int flagInt;
    /** Factory backing {@code query(String)} and {@code continuousQuery()}. */
    protected final RemoteQueryFactory queryFactory;
    /** Batch size used by the key and entry iterators; assigned by {@code init}. */
    protected int batchSize;
    /** Data format used to convert keys and values to and from bytes. */
    protected DataFormat dataFormat;
    /** Client-side statistics; also the object registered as the JMX MBean. */
    protected ClientStatistics clientStatistics;
    /** JMX name under which {@link #clientStatistics} was registered, or {@code null}. */
    protected ObjectName mbeanObjectName;

    /**
     * Creates a remote cache without a near cache service.
     *
     * <p>Delegates to the five-argument constructor, passing {@code null} as the near cache service.
     */
    public RemoteCacheImpl(RemoteCacheManager rcm, String name, TimeService timeService, Function<InternalRemoteCache<K, V>, CacheOperationsFactory> factoryFunction) {
        this(rcm, name, timeService, null, factoryFunction);
    }

    /**
     * Creates a remote cache bound to the given manager.
     *
     * @param rcm              owning manager; supplies the configuration and the listener notifier
     * @param name             cache name, also stored UTF-8 encoded
     * @param timeService      time source handed to the client statistics
     * @param nearCacheService near cache service handed to the client statistics, may be {@code null}
     * @param factoryFunction  builds the operations factory for this cache instance
     */
    public RemoteCacheImpl(RemoteCacheManager rcm, String name, TimeService timeService, NearCacheService<K, V> nearCacheService, Function<InternalRemoteCache<K, V>, CacheOperationsFactory> factoryFunction) {
        if (log.isTraceEnabled()) {
            log.tracef("Creating remote cache: %s", name);
        }
        this.name = name;
        this.nameBytes = name.getBytes(StandardCharsets.UTF_8);
        this.remoteCacheManager = rcm;
        this.dataFormat = DataFormat.builder().build();
        this.clientStatistics = new ClientStatistics(timeService, nearCacheService, rcm.getConfiguration().metricRegistry().withCache(name));
        // The factory receives this instance, so it is created only after the fields above are set.
        this.operationsFactory = factoryFunction.apply(this);
        this.clientListenerNotifier = rcm.getListenerNotifier();
        if (rcm.getConfiguration().forceReturnValues()) {
            this.flagInt = Flag.FORCE_RETURN_VALUE.getFlagInt();
        } else {
            this.flagInt = 0;
        }
        this.queryFactory = new RemoteQueryFactory(this);
    }

    /**
     * Copy constructor: creates a cache sharing the state of {@code other} but using the given flags.
     *
     * <p>Name, manager, data format, statistics, listener notifier, batch size and dispatcher are
     * taken from {@code other}; the operations factory and the query factory are created anew for
     * this instance.
     *
     * @param other   cache to copy from
     * @param flagInt flag bit set for the new instance
     */
    protected RemoteCacheImpl(RemoteCacheImpl<?, ?> other, int flagInt) {
        if (log.isTraceEnabled()) {
            log.tracef("Creating remote cache: %s with flags %d", other.name, flagInt);
        }
        this.name = other.name;
        this.nameBytes = other.nameBytes;
        this.remoteCacheManager = other.remoteCacheManager;
        this.dataFormat = other.dataFormat;
        this.clientStatistics = other.clientStatistics;
        // NOTE(review): the new factory is handed this instance before flagInt is assigned below;
        // presumably the factory reads the flags lazily - confirm before reordering.
        this.operationsFactory = other.operationsFactory.newFactoryFor(this);
        this.flagInt = flagInt;
        this.clientListenerNotifier = other.clientListenerNotifier;
        this.batchSize = other.batchSize;
        this.dispatcher = other.dispatcher;
        this.queryFactory = new RemoteQueryFactory(this);
    }

    /**
     * Initializes batch size and dispatcher and, when a JMX parent is given, registers the
     * statistics MBean.
     *
     * @param jmxParent parent object name whose domain is used for registration; {@code null} skips JMX
     */
    @Override
    public void init(Configuration configuration, OperationDispatcher dispatcher, ObjectName jmxParent) {
        init(configuration, dispatcher);
        if (jmxParent == null) {
            return;
        }
        registerMBean(jmxParent);
    }

    /** Returns the dispatcher used to execute operations for this cache. */
    @Override
    public OperationDispatcher getDispatcher() {
        return dispatcher;
    }

    /** Initializes this cache with the configured batch size and the given dispatcher. */
    @Override
    public void init(Configuration configuration, OperationDispatcher dispatcher) {
        int configuredBatchSize = configuration.batchSize();
        init(configuredBatchSize, dispatcher);
    }

    /** Stores the dispatcher and the iteration batch size. */
    private void init(int batchSize, OperationDispatcher dispatcher) {
        this.dispatcher = dispatcher;
        this.batchSize = batchSize;
    }

    /**
     * Registers the client statistics as a JMX MBean when JMX is enabled in the statistics
     * configuration. An empty cache name is published as {@code org.infinispan.default}.
     *
     * @param jmxParent object name whose domain is reused for the new MBean
     */
    private void registerMBean(ObjectName jmxParent) {
        StatisticsConfiguration statsConfig = getRemoteCacheContainer().getConfiguration().statistics();
        if (!statsConfig.jmxEnabled()) {
            return;
        }
        try {
            MBeanServer server = statsConfig.mbeanServerLookup().getMBeanServer();
            String cacheName;
            if (name.isEmpty()) {
                cacheName = "org.infinispan.default";
            } else {
                cacheName = name;
            }
            String objectNameText = String.format("%s:type=HotRodClient,name=%s,cache=%s", jmxParent.getDomain(), statsConfig.jmxName(), cacheName);
            mbeanObjectName = new ObjectName(objectNameText);
            server.registerMBean(clientStatistics, mbeanObjectName);
        }
        catch (Exception e) {
            throw Log.HOTROD.jmxRegistrationFailure(e);
        }
    }

    /**
     * Unregisters the statistics MBean if one was registered, then clears the stored object name.
     * A name that is no longer registered is only logged at debug level.
     */
    private synchronized void unregisterMBean() {
        if (mbeanObjectName == null) {
            return;
        }
        try {
            MBeanServer server = getRemoteCacheContainer().getConfiguration().statistics().mbeanServerLookup().getMBeanServer();
            if (!server.isRegistered(mbeanObjectName)) {
                Log.HOTROD.debugf("MBean not registered: %s", mbeanObjectName);
            } else {
                server.unregisterMBean(mbeanObjectName);
            }
            mbeanObjectName = null;
        }
        catch (Exception e) {
            throw Log.HOTROD.jmxUnregistrationFailure(e);
        }
    }

    /** Returns the manager that owns this cache. */
    @Override
    public RemoteCacheContainer getRemoteCacheContainer() {
        return remoteCacheManager;
    }

    /**
     * Removes {@code key} only if its version still matches {@code version}.
     *
     * @return future completing with whether the response code reports an update
     */
    @Override
    public CompletableFuture<Boolean> removeWithVersionAsync(K key, long version) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation removeIfUnmodified = operationsFactory.newRemoveIfUnmodifiedOperation(key, version);
        return dispatcher.execute(removeIfUnmodified)
                .thenApply(response -> response.getCode().isUpdated())
                .toCompletableFuture();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletableFuture<V> mergeAsync(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        throw new UnsupportedOperationException();
    }

    /**
     * Replaces the value of {@code key} only if its version still matches {@code version}.
     *
     * @return future completing with whether the response code reports an update
     */
    @Override
    public CompletableFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<VersionedOperationResponse<V>> replaceIfUnmodified = operationsFactory.newReplaceIfUnmodifiedOperation(key, newValue, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit, version);
        return dispatcher.execute(replaceIfUnmodified)
                .thenApply(response -> response.getCode().isUpdated())
                .toCompletableFuture();
    }

    /**
     * Iterator view over {@link #publishEntries(String, Object[], Set, int)}; {@code batchSize}
     * is passed both to the publisher and to the iterator adapter.
     */
    @Override
    public CloseableIterator<Map.Entry<Object, Object>> retrieveEntries(String filterConverterFactory, Object[] filterConverterParams, Set<Integer> segments, int batchSize) {
        Publisher entries = publishEntries(filterConverterFactory, filterConverterParams, segments, batchSize);
        return Closeables.iterator(entries, batchSize);
    }

    /**
     * Publishes cache entries, optionally passed through a server-side filter/converter factory.
     *
     * @param segments segments to iterate; a non-null empty set yields an empty publisher
     * @return a publisher over the requested entries
     */
    @Override
    public <E> Publisher<Map.Entry<K, E>> publishEntries(String filterConverterFactory, Object[] filterConverterParams, Set<Integer> segments, int batchSize) {
        assertRemoteCacheManagerIsStarted();
        boolean noSegmentsRequested = segments != null && segments.isEmpty();
        if (noSegmentsRequested) {
            return Flowable.empty();
        }
        return new RemotePublisher(operationsFactory, dispatcher, filterConverterFactory, filterConverterParams, segments, batchSize, false, dataFormat);
    }

    /** Iterator view over {@link #publishEntriesByQuery(Query, Set, int)}. */
    @Override
    public CloseableIterator<Map.Entry<Object, Object>> retrieveEntriesByQuery(Query<?> filterQuery, Set<Integer> segments, int batchSize) {
        Publisher matches = publishEntriesByQuery(filterQuery, segments, batchSize);
        return Closeables.iterator(matches, batchSize);
    }

    /**
     * Publishes the entries selected by {@code filterQuery}, using the
     * {@code iteration-filter-converter-factory} with parameters derived from the query.
     */
    @Override
    public <E> Publisher<Map.Entry<K, E>> publishEntriesByQuery(Query<?> filterQuery, Set<Integer> segments, int batchSize) {
        Object[] queryParams = Filters.makeFactoryParams(filterQuery);
        return publishEntries("iteration-filter-converter-factory", queryParams, segments, batchSize);
    }

    /** Iterator view over {@link #publishEntriesWithMetadata(Set, int)}. */
    @Override
    public CloseableIterator<Map.Entry<Object, MetadataValue<Object>>> retrieveEntriesWithMetadata(Set<Integer> segments, int batchSize) {
        Publisher<Map.Entry<K, MetadataValue<V>>> entriesWithMetadata = publishEntriesWithMetadata(segments, batchSize);
        return Closeables.iterator(entriesWithMetadata, batchSize);
    }

    /**
     * Publishes entries together with their metadata. No filter/converter factory is used and
     * the publisher is created with its metadata switch set to {@code true}.
     */
    @Override
    public Publisher<Map.Entry<K, MetadataValue<V>>> publishEntriesWithMetadata(Set<Integer> segments, int batchSize) {
        return new RemotePublisher(operationsFactory, dispatcher, null, null, segments, batchSize, true, dataFormat);
    }

    /**
     * Fetches the value of {@code key} together with its metadata.
     *
     * @return future completing with the {@code value()} of the operation result
     */
    @Override
    public CompletableFuture<MetadataValue<V>> getWithMetadataAsync(K key) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation getWithMetadata = operationsFactory.newGetWithMetadataOperation(key, null);
        return dispatcher.execute(getWithMetadata)
                .thenApply(GetWithMetadataOperation.GetWithMetadataResult::value)
                .toCompletableFuture();
    }

    /**
     * Fetches the value and metadata of {@code key}, optionally pinned to a channel.
     *
     * @param channel when non-null the operation is executed on that channel's address only;
     *                otherwise it is dispatched normally
     */
    @Override
    public CompletionStage<GetWithMetadataOperation.GetWithMetadataResult<V>> getWithMetadataAsync(K key, Channel channel) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation getWithMetadata = operationsFactory.newGetWithMetadataOperation(key, channel);
        if (channel == null) {
            return dispatcher.execute(getWithMetadata);
        }
        return dispatcher.executeOnSingleAddress(getWithMetadata, ChannelRecord.of(channel));
    }

    /**
     * Stores all entries of {@code map} with the given lifespan and max-idle settings, using a
     * bulk operation.
     */
    @Override
    public CompletableFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to putAll entries (%s) lifespan:%d (%s), maxIdle:%d (%s)", new Object[]{map, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit});
        }
        PutAllBulkOperation bulkPut = new PutAllBulkOperation(map, dataFormat, bytes -> operationsFactory.newPutAllBytesOperation((Map<byte[], byte[]>)bytes, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit));
        return dispatcher.executeBulk(name, bulkPut).toCompletableFuture();
    }

    /** Returns the size reported by the size operation, widened from {@code Integer} to {@code Long}. */
    @Override
    public CompletableFuture<Long> sizeAsync() {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<Integer> sizeOp = operationsFactory.newSizeOperation();
        CompletableFuture<Integer> size = dispatcher.execute(sizeOp).toCompletableFuture();
        return size.thenApply(Integer::longValue);
    }

    /** Returns {@code true} when {@code size()} is zero. */
    public boolean isEmpty() {
        return size() == 0;
    }

    /** Returns the client-side statistics collected for this cache. */
    @Override
    public ClientStatistics clientStatistics() {
        return clientStatistics;
    }

    /** Blocking variant of {@link #serverStatisticsAsync()}; waits through the dispatcher. */
    @Override
    public ServerStatistics serverStatistics() {
        CompletionStage<ServerStatistics> pending = serverStatisticsAsync();
        return dispatcher.await(pending);
    }

    /** Requests the server statistics by executing a stats operation. */
    @Override
    public CompletionStage<ServerStatistics> serverStatisticsAsync() {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<ServerStatistics> statsOp = operationsFactory.newStatsOperation();
        return dispatcher.execute(statsOp).toCompletableFuture();
    }

    /**
     * Stores {@code value} under {@code key} with the given lifespan and max-idle settings.
     *
     * @return future completing with the value extracted from the operation response
     */
    @Override
    public CompletableFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to add (K,V): (%s, %s) lifespan:%d, maxIdle:%d", new Object[]{key, value, lifespan, maxIdleTime});
        }
        HotRodOperation<MetadataValue<V>> putOp = operationsFactory.newPutKeyValueOperation(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
        return dispatcher.execute(putOp)
                .thenApply(CacheEntryConversion.extractValue())
                .toCompletableFuture();
    }

    /** Executes a clear operation for this cache. */
    public CompletableFuture<Void> clearAsync() {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<Void> clearOp = operationsFactory.newClearOperation();
        return dispatcher.execute(clearOp).toCompletableFuture();
    }

    /**
     * Computes a new mapping for {@code key} with an optimistic read-modify-write loop.
     *
     * <p>The current value and version are read, {@code remappingFunction} is applied, and the
     * result is written conditionally: versioned replace when both old and new values exist,
     * put-if-absent when there was no old value, versioned remove when the new value is
     * {@code null}. If the conditional write does not succeed the whole computation is retried.
     *
     * @return future completing with the value produced by {@code remappingFunction}
     */
    @Override
    public CompletableFuture<V> computeAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        CompletableFuture<MetadataValue<V>> currentEntry = getWithMetadataAsync(key);
        return currentEntry.thenCompose(current -> {
            V previous = null;
            long previousVersion = -1L;
            if (current != null) {
                previous = current.getValue();
                previousVersion = current.getVersion();
            }
            V computed = remappingFunction.apply(key, previous);
            CompletionStage<Boolean> applied;
            if (computed == null) {
                if (previous == null) {
                    // Nothing stored and nothing to store.
                    applied = CompletableFuture.completedFuture(Boolean.TRUE);
                } else {
                    applied = removeWithVersionAsync(key, previousVersion);
                }
            } else if (previous == null) {
                // A null return from put-if-absent means no other value was present.
                applied = putIfAbsentAsync(key, computed, lifespan, lifespanUnit, maxIdle, maxIdleUnit, Flag.FORCE_RETURN_VALUE).thenApply(Objects::isNull);
            } else {
                applied = replaceWithVersionAsync(key, computed, previousVersion, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
            }
            return applied.thenCompose(succeeded -> {
                if (!succeeded.booleanValue()) {
                    // Lost the race against a concurrent writer: start over.
                    return computeAsync(key, remappingFunction, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
                }
                return CompletableFuture.completedFuture(computed);
            });
        });
    }

    /**
     * Returns the value mapped to {@code key}, computing and storing one if none is present.
     *
     * <p>The existing value is read first. Only when it is absent is {@code mappingFunction}
     * invoked; a {@code null} result stores nothing. The computed value is written with
     * put-if-absent using {@link Flag#FORCE_RETURN_VALUE}, so that if another writer stored a
     * value in between, that stored value is returned rather than the one computed here.
     *
     * @param key             key whose mapping is requested
     * @param mappingFunction function producing the value to store when the key is absent
     * @return future completing with the existing value, the winning concurrent value, the newly
     *         stored value, or {@code null} when the function returned {@code null}
     */
    @Override
    public CompletableFuture<V> computeIfAbsentAsync(K key, Function<? super K, ? extends V> mappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        CompletableFuture<V> existingLookup = this.getAsync((Object)key);
        return existingLookup.thenCompose(existing -> {
            if (existing != null) {
                return CompletableFuture.completedFuture(existing);
            }
            V computed = mappingFunction.apply(key);
            if (computed == null) {
                return CompletableFutures.completedNull();
            }
            // Without FORCE_RETURN_VALUE a failed put-if-absent yields null, which is
            // indistinguishable from success and would report a value that was never stored.
            return this.putIfAbsentAsync(key, computed, lifespan, lifespanUnit, maxIdle, maxIdleUnit, Flag.FORCE_RETURN_VALUE)
                    .thenApply(raced -> raced == null ? computed : raced);
        });
    }

    /**
     * Recomputes the mapping of {@code key} only when a value is currently present.
     *
     * <p>The new value is written with a versioned replace, or a versioned remove when the
     * function returns {@code null}. If the conditional write does not succeed the computation
     * is retried.
     *
     * @return future completing with the computed value, or {@code null} when no value was present
     */
    @Override
    public CompletableFuture<V> computeIfPresentAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        CompletableFuture<MetadataValue<V>> currentEntry = getWithMetadataAsync(key);
        return currentEntry.thenCompose(current -> {
            boolean absent = current == null || current.getValue() == null;
            if (absent) {
                return CompletableFutures.completedNull();
            }
            V computed = remappingFunction.apply(key, current.getValue());
            CompletableFuture<Boolean> applied;
            if (computed == null) {
                applied = removeWithVersionAsync(key, current.getVersion());
            } else {
                applied = replaceWithVersionAsync(key, computed, current.getVersion(), lifespan, lifespanUnit, maxIdle, maxIdleUnit);
            }
            return applied.thenCompose(succeeded -> {
                if (!succeeded.booleanValue()) {
                    // Version changed underneath us: retry from the read.
                    return computeIfPresentAsync(key, remappingFunction, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
                }
                return CompletableFuture.completedFuture(computed);
            });
        });
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
        throw new UnsupportedOperationException();
    }

    /**
     * Executes a put-if-absent operation for {@code key}.
     *
     * @return future completing with the value extracted from the operation response
     */
    @Override
    public CompletableFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<MetadataValue<V>> putIfAbsent = operationsFactory.newPutIfAbsentOperation(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
        return dispatcher.execute(putIfAbsent)
                .thenApply(CacheEntryConversion.extractValue())
                .toCompletableFuture();
    }

    /**
     * Put-if-absent variant that forwards extra {@code flags} to the operations factory.
     *
     * @return future completing with the value extracted from the operation response
     */
    private CompletableFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag ... flags) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<MetadataValue<V>> putIfAbsent = operationsFactory.newPutIfAbsentOperation(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
        return dispatcher.execute(putIfAbsent)
                .thenApply(CacheEntryConversion.extractValue())
                .toCompletableFuture();
    }

    /**
     * Replaces the value of {@code key} with {@code newValue} only if it currently equals
     * {@code oldValue}.
     *
     * <p>The current value is read with its version and replaced with a versioned replace; when
     * that replace does not succeed the whole check is retried.
     *
     * @return future completing with {@code true} when replaced, {@code false} when the key is
     *         absent or holds a different value
     * @throws NullPointerException if {@code oldValue} or {@code newValue} is {@code null}
     */
    @Override
    public CompletableFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        Objects.requireNonNull(oldValue);
        Objects.requireNonNull(newValue);
        CompletableFuture<MetadataValue<V>> currentEntry = getWithMetadataAsync(key);
        return currentEntry.thenCompose(current -> {
            if (current == null || !oldValue.equals(current.getValue())) {
                return CompletableFuture.completedFuture(Boolean.FALSE);
            }
            return replaceWithVersionAsync(key, newValue, current.getVersion(), lifespan, lifespanUnit, maxIdle, maxIdleUnit).thenCompose(replaced -> {
                if (!replaced.booleanValue()) {
                    // Entry changed between read and write: re-evaluate from scratch.
                    return replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
                }
                return CompletableFuture.completedFuture(replaced);
            });
        }).toCompletableFuture();
    }

    /**
     * Executes a remove operation for {@code key}.
     *
     * @return future completing with the value extracted from the operation response
     */
    @Override
    public CompletableFuture<V> removeAsync(Object key) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation removeOp = operationsFactory.newRemoveOperation(key);
        return dispatcher.execute(removeOp)
                .thenApply(CacheEntryConversion.extractValue())
                .toCompletableFuture();
    }

    /**
     * Removes {@code key} only if it is currently mapped to {@code value}.
     *
     * <p>The current value is read with its version and removed with a versioned remove; when
     * that remove does not succeed the whole check is retried.
     *
     * @return future completing with {@code true} when removed, {@code false} when the key is
     *         absent or holds a different value
     * @throws NullPointerException if {@code value} is {@code null}
     */
    @Override
    public CompletableFuture<Boolean> removeAsync(Object key, Object value) {
        Objects.requireNonNull(value);
        CompletableFuture<MetadataValue<V>> currentEntry = getWithMetadataAsync(key);
        return currentEntry.thenCompose(current -> {
            boolean matches = current != null && value.equals(current.getValue());
            if (!matches) {
                return CompletableFuture.completedFuture(Boolean.FALSE);
            }
            return removeWithVersionAsync(key, current.getVersion()).thenCompose(removed -> {
                if (!removed.booleanValue()) {
                    // Entry changed between read and remove: re-evaluate from scratch.
                    return removeAsync(key, value);
                }
                return CompletableFuture.completedFuture(Boolean.TRUE);
            });
        }).toCompletableFuture();
    }

    /** Executes a replace operation for {@code key} with the given expiration settings. */
    @Override
    public CompletableFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<V> replaceOp = operationsFactory.newReplaceOperation(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
        return dispatcher.execute(replaceOp).toCompletableFuture();
    }

    /** Executes a contains-key operation for {@code key}. */
    @Override
    public CompletableFuture<Boolean> containsKeyAsync(K key) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<Boolean> containsOp = operationsFactory.newContainsKeyOperation(key);
        return dispatcher.execute(containsOp).toCompletableFuture();
    }

    /**
     * Checks whether {@code value} is contained in {@code values()}.
     *
     * @throws NullPointerException if {@code value} is {@code null}
     */
    public boolean containsValue(Object value) {
        Objects.requireNonNull(value);
        return values().contains(value);
    }

    /**
     * Fetches the entries for {@code keys} with a bulk operation.
     *
     * @return future completing with an unmodifiable view of the retrieved map
     */
    @Override
    public CompletableFuture<Map<K, V>> getAllAsync(Set<?> keys) {
        assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to getAll entries (%s)", keys);
        }
        GetAllBulkOperation bulkGet = new GetAllBulkOperation(keys, dataFormat, operationsFactory::newGetAllBytesOperation);
        return dispatcher.executeBulk(name, bulkGet)
                .thenApply(Collections::unmodifiableMap)
                .toCompletableFuture();
    }

    /** No-op apart from a debug log message. */
    public void start() {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debugf("Start called, nothing to do here(%s)", getName());
    }

    /**
     * Releases the per-cache monitoring resources: unregisters the statistics MBean and removes
     * this cache from the metric registry.
     *
     * <p>The metric registry cleanup runs in a {@code finally} block so it is not skipped when
     * JMX unregistration throws; the JMX failure still propagates to the caller.
     */
    public void stop() {
        try {
            this.unregisterMBean();
        } finally {
            this.remoteCacheManager.getConfiguration().metricRegistry().removeCache(this.name);
        }
    }

    /** Returns the cache name. */
    public String getName() {
        return name;
    }

    /** Returns the UTF-8 encoded cache name; the internal array is returned without copying. */
    @Override
    public byte[] getNameBytes() {
        return nameBytes;
    }

    /** Returns the implementation version recorded for this class's package. */
    public String getVersion() {
        Package implPackage = RemoteCacheImpl.class.getPackage();
        return implPackage.getImplementationVersion();
    }

    /** Creates a query for the given query string through the query factory. */
    public <T> Query<T> query(String query) {
        return queryFactory.create(query);
    }

    /** Creates a continuous query bound to this cache through the query factory. */
    public ContinuousQuery<K, V> continuousQuery() {
        return queryFactory.continuousQuery(this);
    }

    /** Returns a description containing the default protocol version. */
    @Override
    public String getProtocolVersion() {
        return "HotRod client, protocol version: " + ProtocolVersion.DEFAULT_PROTOCOL_VERSION;
    }

    /** Adds a client listener without filter or converter factory parameters. */
    @Override
    public void addClientListener(Object listener) {
        addClientListener(listener, null, null);
    }

    /** Registers {@code listener} with the given factory parameters and waits for completion. */
    @Override
    public void addClientListener(Object listener, Object[] filterFactoryParams, Object[] converterFactoryParams) {
        assertRemoteCacheManagerIsStarted();
        AddClientListenerOperation addListener = operationsFactory.newAddClientListenerOperation(listener, filterFactoryParams, converterFactoryParams);
        dispatcher.await(dispatcher.executeAddListener(addListener));
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Channel addNearCacheListener(Object listener, int bloomBits) {
        throw new UnsupportedOperationException("Adding a near cache listener to a RemoteCache is not supported!");
    }

    /**
     * Removes a previously added client listener and waits for the removal to complete.
     *
     * <p>Returns silently when the notifier knows no id for {@code listener} or no address for
     * that id.
     */
    @Override
    public void removeClientListener(Object listener) {
        this.assertRemoteCacheManagerIsStarted();
        byte[] listenerId = this.clientListenerNotifier.findListenerId(listener);
        if (listenerId == null) {
            return;
        }
        SocketAddress sa = this.clientListenerNotifier.findAddress(listenerId);
        if (sa == null) {
            return;
        }
        HotRodOperation<Void> op = this.operationsFactory.newRemoveClientListenerOperation(listener);
        // Attach the local cleanup to the operation's future before dispatching it.
        CompletionStage removalStage = op.asCompletableFuture().thenAccept(___ -> {
            this.clientListenerNotifier.removeClientListener(listenerId);
            this.dispatcher.removeListener(sa, listenerId);
        });
        // The removal is sent to the address the listener was registered on.
        this.dispatcher.executeOnSingleAddress(op, sa);
        this.dispatcher.await(removalStage);
    }

    /**
     * Returns a cache view whose flags are this instance's flags merged with {@code flags}.
     *
     * @return this instance when no flags are given or the merge changes nothing; otherwise a
     *         new instance carrying the merged flags
     */
    @Override
    public InternalRemoteCache<K, V> withFlags(Flag ... flags) {
        if (flags.length == 0) {
            return this;
        }
        int requested = 0;
        for (int i = 0; i < flags.length; i++) {
            requested |= flags[i].getFlagInt();
        }
        int merged = (int)EnumUtil.mergeBitSets((long)flagInt, (long)requested);
        return merged == flagInt ? this : newInstance(merged);
    }

    /** Returns a new instance of this cache with all flags cleared. */
    @Override
    public InternalRemoteCache<K, V> noFlags() {
        return newInstance(0);
    }

    /** Returns the flags of this instance decoded into a set of {@link Flag} values. */
    @Override
    public Set<Flag> flags() {
        return EnumUtil.enumSetOf((long)flagInt, Flag.class);
    }

    /** Returns the raw flag bit set of this instance. */
    @Override
    public int flagInt() {
        return flagInt;
    }

    /** Returns the factory used to create this cache's operations. */
    @Override
    public CacheOperationsFactory getOperationsFactory() {
        return operationsFactory;
    }

    /** Returns the client listener notifier obtained from the manager. */
    @Override
    public ClientListenerNotifier getListenerNotifier() {
        return clientListenerNotifier;
    }

    /**
     * Executes a get operation for {@code key}. When trace logging is enabled the returned
     * value is logged once the operation's future completes.
     */
    @Override
    public CompletableFuture<V> getAsync(Object key) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation getOp = operationsFactory.newGetOperation(key);
        if (log.isTraceEnabled()) {
            getOp.asCompletableFuture().thenAccept(result -> log.tracef("For key(%s) returning %s", key, result));
        }
        return dispatcher.execute(getOp).toCompletableFuture();
    }

    /** Executes a ping operation and returns its pending response. */
    @Override
    public CompletionStage<PingResponse> ping() {
        return dispatcher.execute(operationsFactory.newPingOperation()).toCompletableFuture();
    }

    /** Converts {@code o} to bytes using the key side of the current data format. */
    @Override
    public byte[] keyToBytes(Object o) {
        return dataFormat.keyToBytes(o);
    }

    /** Converts {@code o} to bytes using the value side of the current data format. */
    protected byte[] valueToBytes(Object o) {
        return dataFormat.valueToBytes(o);
    }

    /**
     * Guards operations against an unstarted manager.
     *
     * @throws RemoteCacheManagerNotStartedException if the owning manager is not started
     */
    protected void assertRemoteCacheManagerIsStarted() {
        if (remoteCacheManager.isStarted()) {
            return;
        }
        Log.HOTROD.unstartedRemoteCacheManager();
        throw new RemoteCacheManagerNotStartedException("Cannot perform operations on a cache associated with an unstarted RemoteCacheManager. Use RemoteCacheManager.start before using the remote cache.");
    }

    /** Returns a key set view of this cache for the given segments. */
    @Override
    public CloseableIteratorSet<K> keySet(IntSet segments) {
        return new RemoteCacheKeySet(this, segments);
    }

    /**
     * Iterates the keys of the given segments by retrieving entries through the
     * {@code ToEmptyBytesKeyValueFilterConverter} and mapping each entry to its key.
     */
    @Override
    public CloseableIterator<K> keyIterator(IntSet segments) {
        return new IteratorMapper(
                retrieveEntries("org.infinispan.server.hotrod.HotRodServer$ToEmptyBytesKeyValueFilterConverter", (Set<Integer>)segments, batchSize),
                e -> e.getKey());
    }

    /** Returns an entry set view of this cache for the given segments. */
    @Override
    public CloseableIteratorSet<Map.Entry<K, V>> entrySet(IntSet segments) {
        return new RemoteCacheEntrySet(this, segments);
    }

    /** Iterates the entries of the given segments without a filter, using the configured batch size. */
    @Override
    public CloseableIterator<Map.Entry<K, V>> entryIterator(IntSet segments) {
        return castEntryIterator(retrieveEntries(null, (Set<Integer>)segments, batchSize));
    }

    /** Returns {@code iterator} unchanged, re-typed to the requested entry type. */
    protected <K, V> CloseableIterator<Map.Entry<K, V>> castEntryIterator(CloseableIterator iterator) {
        return iterator;
    }

    /** Returns a values collection view of this cache for the given segments. */
    @Override
    public CloseableIteratorCollection<V> values(IntSet segments) {
        return new RemoteCacheValuesCollection(this, segments);
    }

    /** Executes the named task with the given parameters and no key. */
    @Override
    public <T> T execute(String taskName, Map<String, ?> params) {
        return execute(taskName, params, null);
    }

    /**
     * Executes the named task and waits for its result.
     *
     * @param params task parameters; each value is converted with {@link #keyToBytes(Object)};
     *               {@code null} is treated as no parameters
     * @param key    key passed to the operations factory together with the task
     */
    @Override
    public <T> T execute(String taskName, Map<String, ?> params, Object key) {
        assertRemoteCacheManagerIsStarted();
        HashMap<String, byte[]> encodedParams = new HashMap<String, byte[]>();
        if (params != null) {
            params.forEach((paramName, paramValue) -> encodedParams.put(paramName, keyToBytes(paramValue)));
        }
        HotRodOperation taskOp = operationsFactory.executeOperation(taskName, encodedParams, key);
        return dispatcher.await(dispatcher.execute(taskOp));
    }

    /** Returns the topology information the dispatcher holds for this cache name. */
    @Override
    public CacheTopologyInfo getCacheTopologyInfo() {
        return dispatcher.getCacheTopologyInfo(name);
    }

    /** Returns a streaming view of this cache; requires the manager to be started. */
    @Override
    public StreamingRemoteCache<K> streaming() {
        assertRemoteCacheManagerIsStarted();
        return new StreamingRemoteCacheImpl(this);
    }

    /**
     * Returns a new cache instance that uses {@code newDataFormat}; the format is initialized
     * with this cache's manager and name first.
     *
     * @throws NullPointerException if {@code newDataFormat} is {@code null}
     */
    @Override
    public <T, U> InternalRemoteCache<T, U> withDataFormat(DataFormat newDataFormat) {
        Objects.requireNonNull(newDataFormat, "Data Format must not be null");
        newDataFormat.initialize(remoteCacheManager, name);
        return newInstance(newDataFormat);
    }

    /**
     * Builds a copy of this cache that uses the given data format.
     *
     * <p>The copy keeps this cache's flags, has its data format replaced, and is then
     * initialized with this cache's batch size and dispatcher.
     *
     * @param dataFormat data format assigned to the copy
     * @return the initialized copy
     */
    protected <T, U> InternalRemoteCache<T, U> newInstance(DataFormat dataFormat) {
        RemoteCacheImpl<K, V> copy = new RemoteCacheImpl<K, V>(this, this.flagInt);
        // The format is set before init(...) runs, exactly as in the original ordering.
        copy.dataFormat = dataFormat;
        copy.init(this.batchSize, this.dispatcher);
        return copy;
    }

    /**
     * Builds a copy of this cache that carries the given flag bits.
     *
     * @param flags flag bits passed to the copy constructor
     * @return the new cache instance
     */
    protected <T, U> InternalRemoteCache<T, U> newInstance(int flags) {
        RemoteCacheImpl<K, V> flagged = new RemoteCacheImpl<K, V>(this, flags);
        return flagged;
    }

    /**
     * Initializes the current data format against this cache's manager and name.
     */
    @Override
    public void resolveStorage() {
        dataFormat.initialize(remoteCacheManager, name);
    }

    /**
     * Resolves storage and, when the given key media type differs from the current key
     * type, rebuilds the data format with a server-side format for the given types.
     *
     * <p>Nothing beyond the initial {@link #resolveStorage()} happens when {@code key} is
     * {@code null}, is {@code MediaType.APPLICATION_UNKNOWN}, or already matches the
     * current key type. After a rebuild, a log message is emitted if the marshaller
     * registry has no marshaller for {@code key}.
     *
     * @param key   key media type to compare against the current key type
     * @param value value media type used for the server-side format
     */
    @Override
    public void resolveStorage(MediaType key, MediaType value) {
        // Initialize first so the key-type comparison below sees a resolved format.
        resolveStorage();
        if (key == null || key == MediaType.APPLICATION_UNKNOWN || dataFormat.getKeyType().match(key)) {
            return;
        }
        DataFormat.Builder serverFormat = DataFormat.builder().from(dataFormat).keyType(key).valueType(value);
        dataFormat = DataFormat.builder().from(dataFormat).serverDataFormat(serverFormat).build();
        resolveStorage();
        if (remoteCacheManager.getMarshallerRegistry().getMarshaller(key) == null) {
            log.serverKeyTypeNotRecognized(key);
        }
    }

    /**
     * Returns the data format currently held by this cache instance.
     *
     * @return the value of the {@code dataFormat} field
     */
    @Override
    public DataFormat getDataFormat() {
        return this.dataFormat;
    }

    /**
     * Reports whether this cache is transactional.
     *
     * @return always {@code false} for this implementation
     */
    @Override
    public boolean isTransactional() {
        return false;
    }

    /**
     * Tells whether this cache's flag bits include {@code Flag.FORCE_RETURN_VALUE}.
     *
     * @return the result of {@code EnumUtil.containsAny} on this cache's flags and the
     *         {@code FORCE_RETURN_VALUE} flag bits
     */
    @Override
    public boolean hasForceReturnFlag() {
        long activeFlags = (long) flagInt;
        long forceReturnBits = (long) Flag.FORCE_RETURN_VALUE.getFlagInt();
        return EnumUtil.containsAny(activeFlags, forceReturnBits);
    }

    /**
     * No-op for this implementation.
     *
     * @return an already-completed stage holding {@code null}
     */
    @Override
    public CompletionStage<Void> updateBloomFilter() {
        CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
        return alreadyDone;
    }

    /**
     * Returns a short description of this cache: the literal {@code "RemoteCache "}
     * followed by the cache name.
     *
     * @return the description string
     */
    @Override
    public String toString() {
        return "RemoteCache " + this.name;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public CompletionStage<CacheConfiguration> configuration() {
        throw new UnsupportedOperationException();
    }

    /**
     * Asynchronously fetches the value stored under {@code key}.
     *
     * <p>Asserts that the remote cache manager is started, wraps a get operation together
     * with the supplied options, and dispatches it.
     *
     * @param key     key to look up
     * @param options options attached to the operation
     * @return the stage returned by the dispatcher, as a {@code CompletableFuture}
     */
    @Override
    public CompletionStage<V> get(K key, CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        return dispatcher.execute(new AdvancedHotRodOperation(operationsFactory.newGetOperation(key), options))
                .toCompletableFuture();
    }

    /**
     * Asynchronously fetches the entry stored under {@code key}, including metadata.
     *
     * <p>The get-with-metadata result is unwrapped via {@code GetWithMetadataResult::value}
     * and converted with {@code CacheEntryConversion.createCacheEntry(key)}.
     *
     * @param key     key to look up
     * @param options options attached to the operation
     * @return a stage producing the converted cache entry
     */
    @Override
    public CompletionStage<CacheEntry<K, V>> getEntry(K key, CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation metadataOp = operationsFactory.newGetWithMetadataOperation(key, null);
        AdvancedHotRodOperation wrapped = new AdvancedHotRodOperation(metadataOp, options);
        return dispatcher.execute(wrapped)
                .thenApply(GetWithMetadataOperation.GetWithMetadataResult::value)
                .thenApply(CacheEntryConversion.createCacheEntry(key));
    }

    /**
     * Stores {@code value} under {@code key} only if no mapping exists.
     *
     * <p>Lifespan and max-idle are resolved in milliseconds from {@code options}, with this
     * cache's defaults passed as fallbacks. The operation is dispatched with
     * {@code PrivateHotRodFlag.FORCE_RETURN_VALUE}, and the response is converted with
     * {@code CacheEntryConversion.createCacheEntry(key)}.
     *
     * @param key     key to store under
     * @param value   value to store
     * @param options write options supplying expiration settings
     * @return a stage producing the converted response entry
     */
    @Override
    public CompletionStage<CacheEntry<K, V>> putIfAbsent(K key, V value, CacheWriteOptions options) {
        assertRemoteCacheManagerIsStarted();
        long lifespanMillis = CacheOptionsUtil.lifespan(options, defaultLifespan, TimeUnit.MILLISECONDS);
        long maxIdleMillis = CacheOptionsUtil.maxIdle(options, defaultMaxIdleTime, TimeUnit.MILLISECONDS);
        HotRodOperation<MetadataValue<V>> putOp = operationsFactory.newPutIfAbsentOperation(key, value, lifespanMillis, TimeUnit.MILLISECONDS, maxIdleMillis, TimeUnit.MILLISECONDS);
        AdvancedHotRodOperation<MetadataValue<V>> wrapped = new AdvancedHotRodOperation<MetadataValue<V>>(putOp, (CacheOptions) options, PrivateHotRodFlag.FORCE_RETURN_VALUE.getFlagInt());
        return dispatcher.execute(wrapped).thenApply(CacheEntryConversion.createCacheEntry(key));
    }

    /**
     * Stores {@code value} under {@code key} only if no mapping exists, reporting success.
     *
     * <p>Delegates to {@code putIfAbsent}. The result is {@code true} when that call yields
     * no entry or an entry whose value is {@code null}.
     *
     * @param key     key to store under
     * @param value   value to store
     * @param options write options forwarded to {@code putIfAbsent}
     * @return a stage producing {@code true} when no previous value was reported
     */
    @Override
    public CompletionStage<Boolean> setIfAbsent(K key, V value, CacheWriteOptions options) {
        return putIfAbsent(key, value, options).thenApply(previous -> {
            if (previous == null) {
                return true;
            }
            return previous.value() == null;
        });
    }

    /**
     * Stores {@code value} under {@code key} and yields the entry built from the response.
     *
     * <p>Lifespan and max-idle are resolved in milliseconds from {@code options}, with this
     * cache's defaults passed as fallbacks. The operation is dispatched with
     * {@code PrivateHotRodFlag.FORCE_RETURN_VALUE}, and the response is converted with
     * {@code CacheEntryConversion.createCacheEntry(key)}.
     *
     * @param key     key to store under
     * @param value   value to store
     * @param options write options supplying expiration settings
     * @return a stage producing the converted response entry
     */
    @Override
    public CompletionStage<CacheEntry<K, V>> put(K key, V value, CacheWriteOptions options) {
        assertRemoteCacheManagerIsStarted();
        long lifespanMillis = CacheOptionsUtil.lifespan(options, defaultLifespan, TimeUnit.MILLISECONDS);
        long maxIdleMillis = CacheOptionsUtil.maxIdle(options, defaultMaxIdleTime, TimeUnit.MILLISECONDS);
        HotRodOperation<MetadataValue<V>> putOp = operationsFactory.newPutKeyValueOperation(key, value, lifespanMillis, TimeUnit.MILLISECONDS, maxIdleMillis, TimeUnit.MILLISECONDS);
        AdvancedHotRodOperation<MetadataValue<V>> wrapped = new AdvancedHotRodOperation<MetadataValue<V>>(putOp, (CacheOptions) options, PrivateHotRodFlag.FORCE_RETURN_VALUE.getFlagInt());
        return dispatcher.execute(wrapped).thenApply(CacheEntryConversion.createCacheEntry(key));
    }

    /**
     * Stores {@code value} under {@code key} without reporting the previous entry.
     *
     * <p>Uses the same put operation as {@code put}, but dispatches it without an extra
     * flag and maps the response through {@code CompletableFutures.toNullFunction()}.
     *
     * @param key     key to store under
     * @param value   value to store
     * @param options write options supplying expiration settings
     * @return a stage that completes once the dispatched operation completes
     */
    @Override
    public CompletionStage<Void> set(K key, V value, CacheWriteOptions options) {
        assertRemoteCacheManagerIsStarted();
        long lifespanMillis = CacheOptionsUtil.lifespan(options, defaultLifespan, TimeUnit.MILLISECONDS);
        long maxIdleMillis = CacheOptionsUtil.maxIdle(options, defaultMaxIdleTime, TimeUnit.MILLISECONDS);
        HotRodOperation<MetadataValue<V>> putOp = operationsFactory.newPutKeyValueOperation(key, value, lifespanMillis, TimeUnit.MILLISECONDS, maxIdleMillis, TimeUnit.MILLISECONDS);
        AdvancedHotRodOperation<MetadataValue<V>> wrapped = new AdvancedHotRodOperation<MetadataValue<V>>(putOp, (CacheOptions) options);
        return dispatcher.execute(wrapped).thenApply(CompletableFutures.toNullFunction());
    }

    /**
     * Replaces the value under {@code key} if the stored version matches {@code version}.
     *
     * @param key     key whose value should be replaced
     * @param value   replacement value
     * @param version expected version; must be a {@code CacheEntryVersionImpl}
     * @param options write options supplying expiration settings
     * @return a stage producing {@code true} when the response code reports an update
     * @throws NullPointerException     if {@code version} is {@code null}
     * @throws IllegalArgumentException if {@code version} is not a {@code CacheEntryVersionImpl}
     */
    @Override
    public CompletionStage<Boolean> replace(K key, V value, CacheEntryVersion version, CacheWriteOptions options) {
        assertRemoteCacheManagerIsStarted();
        // Null is rejected first so it surfaces as a NullPointerException, not the type error below.
        Objects.requireNonNull(version);
        if (!(version instanceof CacheEntryVersionImpl)) {
            throw new IllegalArgumentException("Only CacheEntryVersionImpl instances are supported!");
        }
        CacheEntryVersionImpl expected = (CacheEntryVersionImpl) version;
        long lifespanMillis = CacheOptionsUtil.lifespan(options, defaultLifespan, TimeUnit.MILLISECONDS);
        long maxIdleMillis = CacheOptionsUtil.maxIdle(options, defaultMaxIdleTime, TimeUnit.MILLISECONDS);
        HotRodOperation<VersionedOperationResponse<V>> replaceOp = operationsFactory.newReplaceIfUnmodifiedOperation(key, value, lifespanMillis, TimeUnit.MILLISECONDS, maxIdleMillis, TimeUnit.MILLISECONDS, expected.version());
        AdvancedHotRodOperation<VersionedOperationResponse<V>> wrapped = new AdvancedHotRodOperation<VersionedOperationResponse<V>>(replaceOp, (CacheOptions) options);
        return dispatcher.execute(wrapped).thenApply(response -> response.getCode().isUpdated());
    }

    /**
     * Replaces the value under {@code key} if the stored version matches {@code version};
     * otherwise yields the entry described by the response.
     *
     * @param key     key whose value should be replaced
     * @param value   replacement value
     * @param version expected version; must be a {@code CacheEntryVersionImpl}
     * @param options write options supplying expiration settings
     * @return a stage producing {@code null} when the response code reports an update, or
     *         an entry built from the response's value and metadata otherwise
     * @throws NullPointerException     if {@code version} is {@code null}
     * @throws IllegalArgumentException if {@code version} is not a {@code CacheEntryVersionImpl}
     */
    @Override
    public CompletionStage<CacheEntry<K, V>> getOrReplaceEntry(K key, V value, CacheEntryVersion version, CacheWriteOptions options) {
        assertRemoteCacheManagerIsStarted();
        // Null is rejected first so it surfaces as a NullPointerException, not the type error below.
        Objects.requireNonNull(version);
        if (!(version instanceof CacheEntryVersionImpl)) {
            throw new IllegalArgumentException("Only CacheEntryVersionImpl instances are supported!");
        }
        CacheEntryVersionImpl expected = (CacheEntryVersionImpl) version;
        long lifespanMillis = CacheOptionsUtil.lifespan(options, defaultLifespan, TimeUnit.MILLISECONDS);
        long maxIdleMillis = CacheOptionsUtil.maxIdle(options, defaultMaxIdleTime, TimeUnit.MILLISECONDS);
        HotRodOperation<VersionedOperationResponse<V>> replaceOp = operationsFactory.newReplaceIfUnmodifiedOperation(key, value, lifespanMillis, TimeUnit.MILLISECONDS, maxIdleMillis, TimeUnit.MILLISECONDS, expected.version());
        AdvancedHotRodOperation<VersionedOperationResponse<V>> wrapped = new AdvancedHotRodOperation<VersionedOperationResponse<V>>(replaceOp, (CacheOptions) options);
        return dispatcher.execute(wrapped).thenApply(response -> {
            if (!response.getCode().isUpdated()) {
                return new CacheEntryImpl(key, response.getValue(), new CacheEntryMetadataImpl(response.getMetadata()));
            }
            return null;
        });
    }

    /**
     * Removes the mapping for {@code key} and reports whether a value was present.
     *
     * <p>Delegates to {@code getAndRemove}. The result is {@code true} only when that call
     * yields an entry with a non-null value.
     *
     * @param key     key to remove
     * @param options options forwarded to {@code getAndRemove}
     * @return a stage producing {@code true} when a previous value was reported
     */
    @Override
    public CompletionStage<Boolean> remove(K key, CacheOptions options) {
        return getAndRemove(key, options).thenApply(previous -> {
            if (previous == null) {
                return false;
            }
            return previous.value() != null;
        });
    }

    /**
     * Removes the mapping for {@code key} if the stored version matches {@code version}.
     *
     * @param key     key to remove
     * @param version expected version; must be a {@code CacheEntryVersionImpl}
     * @param options options attached to the operation
     * @return a stage producing {@code true} when the response is non-null and carries a
     *         non-null value
     * @throws NullPointerException     if {@code version} is {@code null}
     * @throws IllegalArgumentException if {@code version} is not a {@code CacheEntryVersionImpl}
     */
    @Override
    public CompletionStage<Boolean> remove(K key, CacheEntryVersion version, CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        // Null is rejected first so it surfaces as a NullPointerException, not the type error below.
        Objects.requireNonNull(version);
        if (!(version instanceof CacheEntryVersionImpl)) {
            throw new IllegalArgumentException("Only CacheEntryVersionImpl instances are supported!");
        }
        CacheEntryVersionImpl expected = (CacheEntryVersionImpl) version;
        HotRodOperation removeOp = operationsFactory.newRemoveIfUnmodifiedOperation(key, expected.version());
        // NOTE(review): unlike replace(...), which inspects the response code, success here is
        // derived from the response carrying a non-null value -- confirm this is intended.
        return dispatcher.execute(new AdvancedHotRodOperation(removeOp, options)).thenApply(res -> {
            if (res == null) {
                return false;
            }
            return res.getValue() != null;
        });
    }

    /**
     * Removes the mapping for {@code key} and yields the entry built from the response.
     *
     * <p>The remove operation is dispatched with {@code PrivateHotRodFlag.FORCE_RETURN_VALUE},
     * and the response is converted with {@code CacheEntryConversion.createCacheEntry(key)}.
     *
     * @param key     key to remove
     * @param options options attached to the operation
     * @return a stage producing the converted response entry
     */
    @Override
    public CompletionStage<CacheEntry<K, V>> getAndRemove(K key, CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation removeOp = operationsFactory.newRemoveOperation(key);
        AdvancedHotRodOperation wrapped = new AdvancedHotRodOperation(removeOp, options, PrivateHotRodFlag.FORCE_RETURN_VALUE.getFlagInt());
        return dispatcher.execute(wrapped).thenApply(CacheEntryConversion.createCacheEntry(key));
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Flow.Publisher<K> keys(CacheOptions options) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Flow.Publisher<CacheEntry<K, V>> entries(CacheOptions options) {
        throw new UnsupportedOperationException();
    }

    /**
     * Stores all given mappings through a bulk operation.
     *
     * <p>Lifespan and max-idle are resolved in milliseconds from {@code options}, with this
     * cache's defaults passed as fallbacks, and applied to every put-all operation created
     * by the bulk operation. A trace message is logged when trace logging is enabled.
     *
     * @param entries mappings to store
     * @param options write options supplying expiration settings
     * @return the stage returned by the dispatcher's bulk execution
     */
    @Override
    public CompletionStage<Void> putAll(Map<K, V> entries, CacheWriteOptions options) {
        assertRemoteCacheManagerIsStarted();
        long lifespanMillis = CacheOptionsUtil.lifespan(options, defaultLifespan, TimeUnit.MILLISECONDS);
        long maxIdleMillis = CacheOptionsUtil.maxIdle(options, defaultMaxIdleTime, TimeUnit.MILLISECONDS);
        if (log.isTraceEnabled()) {
            log.tracef("About to putAll entries (%s) lifespan:%d, maxIdle:%d", entries, lifespanMillis, maxIdleMillis);
        }
        PutAllBulkOperation bulkPut = new PutAllBulkOperation(entries, dataFormat, serialized -> operationsFactory.newPutAllBytesOperation((Map<byte[], byte[]>) serialized, lifespanMillis, TimeUnit.MILLISECONDS, maxIdleMillis, TimeUnit.MILLISECONDS));
        return dispatcher.executeBulk(name, bulkPut);
    }

    /**
     * Stores all entries emitted by the given publisher.
     *
     * <p>The published entries are first collected into a map keyed by
     * {@code CacheEntry::key}, then written with the map-based {@code putAll}.
     *
     * @param entries publisher of entries to store
     * @param options write options forwarded to the map-based {@code putAll}
     * @return a stage completing with {@code null} once the write has completed
     */
    @Override
    public CompletionStage<Void> putAll(Flow.Publisher<CacheEntry<K, V>> entries, CacheWriteOptions options) {
        return Flowable.fromPublisher((Publisher) FlowAdapters.toPublisher(entries))
                .collect(Collectors.toMap(CacheEntry::key, CacheEntry::value))
                .concatMapCompletable(collected -> Completable.fromCompletionStage(this.putAll((Map<K, V>) collected, options)))
                .toCompletionStage(null);
    }

    /**
     * Fetches the entries for the given keys through a bulk operation.
     *
     * <p>The bulk request is started when this method is called. Its result map is wrapped
     * as unmodifiable and each mapping is published as a {@code CacheEntryImpl} with a
     * {@code null} third constructor argument. A trace message is logged when trace
     * logging is enabled.
     *
     * @param keys    keys to look up
     * @param options options attached to each get-all operation
     * @return a publisher of the retrieved entries
     */
    @Override
    public Flow.Publisher<CacheEntry<K, V>> getAll(Set<K> keys, CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to getAll entries (%s)", keys);
        }
        GetAllBulkOperation bulkGet = new GetAllBulkOperation(keys, dataFormat, serialized -> new AdvancedHotRodOperation(operationsFactory.newGetAllBytesOperation((Set<byte[]>) serialized), options));
        CompletionStage<Map> pending = dispatcher.executeBulk(name, bulkGet).thenApply(Collections::unmodifiableMap);
        Flowable entryFlow = Flowable.defer(() -> Flowable.fromCompletionStage((CompletionStage) pending))
                .concatMapIterable(Map::entrySet)
                .map(e -> new CacheEntryImpl(e.getKey(), e.getValue(), null));
        return FlowAdapters.toFlowPublisher((Publisher) entryFlow);
    }

    /**
     * Fetches the entries for the given keys.
     *
     * <p>The array is turned into a set with {@code Set.of} and passed to the set-based
     * {@code getAll}.
     *
     * @param options options forwarded to the set-based {@code getAll}
     * @param keys    keys to look up
     * @return a publisher of the retrieved entries
     */
    @Override
    public Flow.Publisher<CacheEntry<K, V>> getAll(CacheOptions options, K[] keys) {
        Set<K> keySet = Set.of(keys);
        return getAll(keySet, options);
    }

    /**
     * Removes the given keys, publishing those reported as removed.
     *
     * <p>Wraps the set in a {@code Flowable} and delegates to the {@code Flowable}-based helper.
     *
     * @param keys    keys to remove
     * @param options write options forwarded to the helper
     * @return a publisher of the keys reported as removed
     */
    @Override
    public Flow.Publisher<K> removeAll(Set<K> keys, CacheWriteOptions options) {
        Flowable<K> keyFlow = Flowable.fromIterable(keys);
        return removeAll(keyFlow, options);
    }

    /**
     * Removes the keys emitted by the given publisher, publishing those reported as removed.
     *
     * <p>Adapts the {@code Flow.Publisher} to a {@code Flowable} and delegates to the
     * {@code Flowable}-based helper.
     *
     * @param keys    publisher of keys to remove
     * @param options write options forwarded to the helper
     * @return a publisher of the keys reported as removed
     */
    @Override
    public Flow.Publisher<K> removeAll(Flow.Publisher<K> keys, CacheWriteOptions options) {
        Flowable<K> keyFlow = Flowable.fromPublisher((Publisher) FlowAdapters.toPublisher(keys));
        return removeAll(keyFlow, options);
    }

    /**
     * Removes each key emitted by {@code keys}, one at a time and in order, publishing
     * only the keys whose removal reported {@code true}.
     *
     * <p>This is a private helper shared by the public {@code removeAll} overloads. It
     * carries no {@code @Override}: a private method cannot override anything, so the
     * annotation is rejected by the compiler.
     *
     * @param keys    keys to remove
     * @param options write options forwarded to {@code remove(K, CacheOptions)}
     * @return a publisher of the keys reported as removed
     */
    private Flow.Publisher<K> removeAll(Flowable<K> keys, CacheWriteOptions options) {
        this.assertRemoteCacheManagerIsStarted();
        // A key is emitted only when its removal reported true; otherwise the Maybe is empty.
        Flowable flowable = keys.concatMapMaybe(k -> Single.fromCompletionStage(this.remove(k, (CacheOptions)options)).mapOptional(removed -> removed ? Optional.of(k) : Optional.empty()));
        return FlowAdapters.toFlowPublisher((Publisher)flowable);
    }

    /**
     * Removes the given keys, publishing the entries produced by each removal.
     *
     * <p>Wraps the set in a {@code Flowable} and delegates to the {@code Flowable}-based helper.
     *
     * @param keys    keys to remove
     * @param options write options forwarded to the helper
     * @return a publisher of the entries produced by the removals
     */
    @Override
    public Flow.Publisher<CacheEntry<K, V>> getAndRemoveAll(Set<K> keys, CacheWriteOptions options) {
        Flowable<K> keyFlow = Flowable.fromIterable(keys);
        return getAndRemoveAll(keyFlow, options);
    }

    /**
     * Removes the keys emitted by the given publisher, publishing the entries produced by
     * each removal.
     *
     * <p>Adapts the {@code Flow.Publisher} to a {@code Flowable} and delegates to the
     * {@code Flowable}-based helper.
     *
     * @param keys    publisher of keys to remove
     * @param options write options forwarded to the helper
     * @return a publisher of the entries produced by the removals
     */
    @Override
    public Flow.Publisher<CacheEntry<K, V>> getAndRemoveAll(Flow.Publisher<K> keys, CacheWriteOptions options) {
        Flowable<K> keyFlow = Flowable.fromPublisher((Publisher) FlowAdapters.toPublisher(keys));
        return getAndRemoveAll(keyFlow, options);
    }

    /**
     * Calls {@code getAndRemove} for each key emitted by {@code keys}, one at a time and
     * in order, publishing whatever each call yields.
     *
     * <p>This is a private helper shared by the public {@code getAndRemoveAll} overloads.
     * It carries no {@code @Override}: a private method cannot override anything, so the
     * annotation is rejected by the compiler.
     *
     * @param keys    keys to remove
     * @param options write options forwarded to {@code getAndRemove}
     * @return a publisher of the entries produced by the removals
     */
    private Flow.Publisher<CacheEntry<K, V>> getAndRemoveAll(Flowable<K> keys, CacheWriteOptions options) {
        this.assertRemoteCacheManagerIsStarted();
        Flowable flowable = keys.concatMapMaybe(k -> Maybe.fromCompletionStage(this.getAndRemove(k, (CacheOptions)options)));
        return FlowAdapters.toFlowPublisher((Publisher)flowable);
    }

    /**
     * Asynchronously asks for the size of the cache.
     *
     * <p>Dispatches a size operation and widens its {@code Integer} result to a {@code Long}.
     *
     * @param options options attached to the operation
     * @return a stage producing the reported size
     */
    @Override
    public CompletionStage<Long> estimateSize(CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<Integer> sizeOp = operationsFactory.newSizeOperation();
        AdvancedHotRodOperation<Integer> wrapped = new AdvancedHotRodOperation<Integer>(sizeOp, options);
        return dispatcher.execute(wrapped).thenApply(count -> count.longValue());
    }

    /**
     * Asynchronously clears the cache by dispatching a clear operation.
     *
     * @param options options attached to the operation
     * @return the stage returned by the dispatcher
     */
    @Override
    public CompletionStage<Void> clear(CacheOptions options) {
        assertRemoteCacheManagerIsStarted();
        HotRodOperation<Void> clearOp = operationsFactory.newClearOperation();
        AdvancedHotRodOperation<Void> wrapped = new AdvancedHotRodOperation<Void>(clearOp, options);
        return dispatcher.execute(wrapped);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Flow.Publisher<CacheEntryEvent<K, V>> listen(CacheListenerOptions options, CacheEntryEventType[] types) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T> Flow.Publisher<CacheEntryProcessorResult<K, T>> process(Set<K> keys, AsyncCacheEntryProcessor<K, V, T> task, CacheOptions options) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T> Flow.Publisher<CacheEntryProcessorResult<K, T>> processAll(AsyncCacheEntryProcessor<K, V, T> processor, CacheProcessorOptions options) {
        throw new UnsupportedOperationException();
    }
}

