/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCacheContainer;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.ServerStatistics;
import org.infinispan.client.hotrod.StreamingRemoteCache;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.StatisticsConfiguration;
import org.infinispan.client.hotrod.exceptions.RemoteCacheManagerNotStartedException;
import org.infinispan.client.hotrod.filter.Filters;
import org.infinispan.client.hotrod.impl.ClientStatistics;
import org.infinispan.client.hotrod.impl.InternalRemoteCache;
import org.infinispan.client.hotrod.impl.RemoteCacheEntrySet;
import org.infinispan.client.hotrod.impl.RemoteCacheKeySet;
import org.infinispan.client.hotrod.impl.RemoteCacheSupport;
import org.infinispan.client.hotrod.impl.RemoteCacheValuesCollection;
import org.infinispan.client.hotrod.impl.StreamingRemoteCacheImpl;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.impl.iteration.RemotePublisher;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.ClearOperation;
import org.infinispan.client.hotrod.impl.operations.ContainsKeyOperation;
import org.infinispan.client.hotrod.impl.operations.ExecuteOperation;
import org.infinispan.client.hotrod.impl.operations.GetAllParallelOperation;
import org.infinispan.client.hotrod.impl.operations.GetOperation;
import org.infinispan.client.hotrod.impl.operations.GetWithMetadataOperation;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.operations.PingResponse;
import org.infinispan.client.hotrod.impl.operations.PutAllParallelOperation;
import org.infinispan.client.hotrod.impl.operations.PutIfAbsentOperation;
import org.infinispan.client.hotrod.impl.operations.PutOperation;
import org.infinispan.client.hotrod.impl.operations.RemoveClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.RemoveIfUnmodifiedOperation;
import org.infinispan.client.hotrod.impl.operations.RemoveOperation;
import org.infinispan.client.hotrod.impl.operations.ReplaceIfUnmodifiedOperation;
import org.infinispan.client.hotrod.impl.operations.ReplaceOperation;
import org.infinispan.client.hotrod.impl.operations.RetryAwareCompletionStage;
import org.infinispan.client.hotrod.impl.operations.SizeOperation;
import org.infinispan.client.hotrod.impl.operations.StatsOperation;
import org.infinispan.client.hotrod.impl.query.RemoteQueryFactory;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.metrics.HotRodClientMetricsRegistry;
import org.infinispan.client.hotrod.near.NearCacheService;
import org.infinispan.commons.api.query.ContinuousQuery;
import org.infinispan.commons.api.query.Query;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.internal.InternalCacheNames;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CloseableIteratorCollection;
import org.infinispan.commons.util.CloseableIteratorSet;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.reactivestreams.Publisher;

public class RemoteCacheImpl<K, V>
extends RemoteCacheSupport<K, V>
implements InternalRemoteCache<K, V> {
    private static final Log log = LogFactory.getLog(RemoteCacheImpl.class, Log.class);
    private final String name;
    private final RemoteCacheManager remoteCacheManager;
    protected OperationsFactory operationsFactory;
    private int batchSize;
    private volatile boolean isObjectStorage;
    private DataFormat dataFormat;
    protected ClientStatistics clientStatistics;
    private ObjectName mbeanObjectName;
    private RemoteQueryFactory queryFactory;

    public RemoteCacheImpl(RemoteCacheManager rcm, String name, TimeService timeService) {
        this(rcm, name, timeService, null);
    }

    public RemoteCacheImpl(RemoteCacheManager rcm, String name, TimeService timeService, NearCacheService<K, V> nearCacheService) {
        if (log.isTraceEnabled()) {
            log.tracef("Creating remote cache: %s", name);
        }
        this.name = name;
        this.remoteCacheManager = rcm;
        this.dataFormat = DataFormat.builder().build();
        boolean statsEnabled = rcm.getConfiguration().statistics().enabled();
        HotRodClientMetricsRegistry metrics = statsEnabled ? rcm.getConfiguration().metricRegistry().withCache(name) : HotRodClientMetricsRegistry.DISABLED;
        this.clientStatistics = new ClientStatistics(statsEnabled, timeService, nearCacheService, metrics);
    }

    protected RemoteCacheImpl(RemoteCacheManager rcm, String name, ClientStatistics clientStatistics) {
        if (log.isTraceEnabled()) {
            log.tracef("Creating remote cache: %s", name);
        }
        this.name = name;
        this.remoteCacheManager = rcm;
        this.dataFormat = DataFormat.builder().build();
        this.clientStatistics = clientStatistics;
    }

    @Override
    public void init(OperationsFactory operationsFactory, Configuration configuration, ObjectName jmxParent) {
        this.init(operationsFactory, configuration);
        this.registerMBean(jmxParent);
    }

    @Override
    public void init(OperationsFactory operationsFactory, Configuration configuration) {
        this.init(operationsFactory, configuration.batchSize());
    }

    private void init(OperationsFactory operationsFactory, int batchSize) {
        this.operationsFactory = operationsFactory;
        this.batchSize = batchSize;
        try {
            this.queryFactory = new RemoteQueryFactory(this);
        }
        catch (Throwable e) {
            log.queryDisabled();
        }
    }

    private void registerMBean(ObjectName jmxParent) {
        StatisticsConfiguration configuration = this.getRemoteCacheContainer().getConfiguration().statistics();
        if (configuration.jmxEnabled()) {
            try {
                MBeanServer mbeanServer = configuration.mbeanServerLookup().getMBeanServer();
                String cacheName = this.name.isEmpty() ? "org.infinispan.default" : this.name;
                this.mbeanObjectName = new ObjectName(String.format("%s:type=HotRodClient,name=%s,cache=%s", jmxParent.getDomain(), configuration.jmxName(), cacheName));
                mbeanServer.registerMBean(this.clientStatistics, this.mbeanObjectName);
            }
            catch (Exception e) {
                throw Log.HOTROD.jmxRegistrationFailure(e);
            }
        }
    }

    private synchronized void unregisterMBean() {
        if (this.mbeanObjectName != null) {
            try {
                MBeanServer mBeanServer = this.getRemoteCacheContainer().getConfiguration().statistics().mbeanServerLookup().getMBeanServer();
                if (mBeanServer.isRegistered(this.mbeanObjectName)) {
                    mBeanServer.unregisterMBean(this.mbeanObjectName);
                } else {
                    Log.HOTROD.debugf("MBean not registered: %s", this.mbeanObjectName);
                }
                this.mbeanObjectName = null;
            }
            catch (Exception e) {
                throw Log.HOTROD.jmxUnregistrationFailure(e);
            }
        }
    }

    @Override
    public OperationsFactory getOperationsFactory() {
        return this.operationsFactory;
    }

    @Override
    public RemoteCacheContainer getRemoteCacheContainer() {
        return this.remoteCacheManager;
    }

    @Override
    public CompletableFuture<Boolean> removeWithVersionAsync(K key, long version) {
        this.assertRemoteCacheManagerIsStarted();
        RemoveIfUnmodifiedOperation op = this.operationsFactory.newRemoveIfUnmodifiedOperation(key, this.keyToBytes(key), version, this.dataFormat);
        return op.execute().thenApply(response -> response.getCode().isUpdated());
    }

    @Override
    public CompletableFuture<V> mergeAsync(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public CompletableFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
        this.assertRemoteCacheManagerIsStarted();
        ReplaceIfUnmodifiedOperation op = this.operationsFactory.newReplaceIfUnmodifiedOperation(key, this.keyToBytes(key), this.valueToBytes(newValue), lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit, version, this.dataFormat);
        return op.execute().thenApply(response -> response.getCode().isUpdated());
    }

    @Override
    public CloseableIterator<Map.Entry<Object, Object>> retrieveEntries(String filterConverterFactory, Object[] filterConverterParams, Set<Integer> segments, int batchSize) {
        Publisher remotePublisher = this.publishEntries(filterConverterFactory, filterConverterParams, segments, batchSize);
        return Closeables.iterator(remotePublisher, (int)batchSize);
    }

    @Override
    public <E> Publisher<Map.Entry<K, E>> publishEntries(String filterConverterFactory, Object[] filterConverterParams, Set<Integer> segments, int batchSize) {
        this.assertRemoteCacheManagerIsStarted();
        if (segments != null && segments.isEmpty()) {
            return Flowable.empty();
        }
        byte[][] params = this.marshallParams(filterConverterParams);
        return new RemotePublisher(this.operationsFactory, filterConverterFactory, params, segments, batchSize, false, this.dataFormat);
    }

    @Override
    public CloseableIterator<Map.Entry<Object, Object>> retrieveEntriesByQuery(Query<?> filterQuery, Set<Integer> segments, int batchSize) {
        Publisher remotePublisher = this.publishEntriesByQuery(filterQuery, segments, batchSize);
        return Closeables.iterator(remotePublisher, (int)batchSize);
    }

    @Override
    public <E> Publisher<Map.Entry<K, E>> publishEntriesByQuery(Query<?> filterQuery, Set<Integer> segments, int batchSize) {
        Object[] factoryParams = Filters.makeFactoryParams(filterQuery);
        return this.publishEntries("iteration-filter-converter-factory", factoryParams, segments, batchSize);
    }

    @Override
    public CloseableIterator<Map.Entry<Object, MetadataValue<Object>>> retrieveEntriesWithMetadata(Set<Integer> segments, int batchSize) {
        Publisher<Map.Entry<K, MetadataValue<V>>> remotePublisher = this.publishEntriesWithMetadata(segments, batchSize);
        return Closeables.iterator(remotePublisher, (int)batchSize);
    }

    @Override
    public Publisher<Map.Entry<K, MetadataValue<V>>> publishEntriesWithMetadata(Set<Integer> segments, int batchSize) {
        return new RemotePublisher(this.operationsFactory, null, null, segments, batchSize, true, this.dataFormat);
    }

    @Override
    public CompletableFuture<MetadataValue<V>> getWithMetadataAsync(K key) {
        this.assertRemoteCacheManagerIsStarted();
        GetWithMetadataOperation op = this.operationsFactory.newGetWithMetadataOperation(key, this.keyToBytes(key), this.dataFormat);
        return op.execute();
    }

    @Override
    public RetryAwareCompletionStage<MetadataValue<V>> getWithMetadataAsync(K key, SocketAddress preferredAddres) {
        this.assertRemoteCacheManagerIsStarted();
        GetWithMetadataOperation op = this.operationsFactory.newGetWithMetadataOperation(key, this.keyToBytes(key), this.dataFormat, preferredAddres);
        return op.internalExecute();
    }

    @Override
    public CompletableFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        this.assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to putAll entries (%s) lifespan:%d (%s), maxIdle:%d (%s)", new Object[]{map, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit});
        }
        HashMap<byte[], byte[]> byteMap = new HashMap<byte[], byte[]>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            byteMap.put(this.keyToBytes(entry.getKey()), this.valueToBytes(entry.getValue()));
        }
        PutAllParallelOperation op = this.operationsFactory.newPutAllOperation(byteMap, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, this.dataFormat);
        return op.execute();
    }

    @Override
    public CompletableFuture<Long> sizeAsync() {
        this.assertRemoteCacheManagerIsStarted();
        SizeOperation op = this.operationsFactory.newSizeOperation();
        return op.execute().thenApply(Integer::longValue);
    }

    public boolean isEmpty() {
        return this.size() == 0;
    }

    @Override
    public ClientStatistics clientStatistics() {
        return this.clientStatistics;
    }

    @Override
    public ServerStatistics serverStatistics() {
        return Util.await(this.serverStatisticsAsync());
    }

    @Override
    public CompletionStage<ServerStatistics> serverStatisticsAsync() {
        this.assertRemoteCacheManagerIsStarted();
        StatsOperation op = this.operationsFactory.newStatsOperation();
        return op.execute();
    }

    @Override
    public CompletableFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        this.assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to add (K,V): (%s, %s) lifespan:%d, maxIdle:%d", new Object[]{key, value, lifespan, maxIdleTime});
        }
        PutOperation op = this.operationsFactory.newPutKeyValueOperation(key, this.keyToBytes(key), this.valueToBytes(value), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, this.dataFormat);
        return op.execute();
    }

    public CompletableFuture<Void> clearAsync() {
        this.assertRemoteCacheManagerIsStarted();
        ClearOperation op = this.operationsFactory.newClearOperation();
        return op.execute();
    }

    @Override
    public CompletableFuture<V> computeAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        CompletableFuture<MetadataValue<V>> cf = this.getWithMetadataAsync(key);
        return cf.thenCompose(metadataValue -> {
            long version;
            Object oldValue;
            if (metadataValue != null) {
                oldValue = metadataValue.getValue();
                version = metadataValue.getVersion();
            } else {
                oldValue = null;
                version = -1L;
            }
            Object newValue = remappingFunction.apply((K)key, (V)oldValue);
            CompletionStage<Boolean> doneStage = newValue != null ? (oldValue != null ? this.replaceWithVersionAsync(key, newValue, version, lifespan, lifespanUnit, maxIdle, maxIdleUnit) : this.putIfAbsentAsync(key, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit, Flag.FORCE_RETURN_VALUE).thenApply(Objects::isNull)) : (oldValue != null ? this.removeWithVersionAsync(key, version) : CompletableFuture.completedFuture(Boolean.TRUE));
            return doneStage.thenCompose(done -> {
                if (done.booleanValue()) {
                    return CompletableFuture.completedFuture(newValue);
                }
                return this.computeAsync(key, remappingFunction, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
            });
        });
    }

    @Override
    public CompletableFuture<V> computeIfAbsentAsync(K key, Function<? super K, ? extends V> mappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        CompletableFuture<V> cf = this.getAsync((Object)key);
        return cf.thenCompose(oldValue -> {
            if (oldValue != null) {
                return CompletableFuture.completedFuture(oldValue);
            }
            Object newValue = mappingFunction.apply((K)key);
            if (newValue == null) {
                return CompletableFutures.completedNull();
            }
            return this.putIfAbsentAsync(key, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit).thenApply(v -> v == null ? newValue : v);
        });
    }

    @Override
    public CompletableFuture<V> computeIfPresentAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        CompletableFuture<MetadataValue<V>> cf = this.getWithMetadataAsync(key);
        return cf.thenCompose(metadata -> {
            if (metadata == null || metadata.getValue() == null) {
                return CompletableFutures.completedNull();
            }
            Object newValue = remappingFunction.apply((K)key, (V)metadata.getValue());
            CompletableFuture<Boolean> done = newValue == null ? this.removeWithVersionAsync(key, metadata.getVersion()) : this.replaceWithVersionAsync(key, newValue, metadata.getVersion(), lifespan, lifespanUnit, maxIdle, maxIdleUnit);
            return done.thenCompose(success -> {
                if (success.booleanValue()) {
                    return CompletableFuture.completedFuture(newValue);
                }
                return this.computeIfPresentAsync(key, remappingFunction, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
            });
        });
    }

    @Override
    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
        throw new UnsupportedOperationException();
    }

    @Override
    public CompletableFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        return this.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, null);
    }

    private CompletableFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag ... flags) {
        this.assertRemoteCacheManagerIsStarted();
        PutIfAbsentOperation op = this.operationsFactory.newPutIfAbsentOperation(key, this.keyToBytes(key), this.valueToBytes(value), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, this.dataFormat);
        op.header().addFlags(flags);
        return op.execute();
    }

    @Override
    public CompletableFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        Objects.requireNonNull(oldValue);
        Objects.requireNonNull(newValue);
        CompletableFuture<MetadataValue<V>> stage = this.getWithMetadataAsync(key);
        return stage.thenCompose(metadataValue -> {
            Object prevValue;
            if (metadataValue != null && oldValue.equals(prevValue = metadataValue.getValue())) {
                return this.replaceWithVersionAsync(key, newValue, metadataValue.getVersion(), lifespan, lifespanUnit, maxIdle, maxIdleUnit).thenCompose(replaced -> {
                    if (replaced.booleanValue()) {
                        return CompletableFuture.completedFuture(replaced);
                    }
                    return this.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
                });
            }
            return CompletableFuture.completedFuture(Boolean.FALSE);
        }).toCompletableFuture();
    }

    @Override
    public CompletableFuture<V> removeAsync(Object key) {
        this.assertRemoteCacheManagerIsStarted();
        RemoveOperation removeOperation = this.operationsFactory.newRemoveOperation(key, this.keyToBytes(key), this.dataFormat);
        return removeOperation.execute();
    }

    @Override
    public CompletableFuture<Boolean> removeAsync(Object key, Object value) {
        Objects.requireNonNull(value);
        CompletableFuture<MetadataValue<V>> stage = this.getWithMetadataAsync(key);
        return stage.thenCompose(metadataValue -> {
            if (metadataValue == null || !value.equals(metadataValue.getValue())) {
                return CompletableFuture.completedFuture(Boolean.FALSE);
            }
            return this.removeWithVersionAsync(key, metadataValue.getVersion()).thenCompose(removed -> {
                if (removed.booleanValue()) {
                    return CompletableFuture.completedFuture(Boolean.TRUE);
                }
                return this.removeAsync(key, value);
            });
        }).toCompletableFuture();
    }

    @Override
    public CompletableFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
        this.assertRemoteCacheManagerIsStarted();
        ReplaceOperation op = this.operationsFactory.newReplaceOperation(key, this.keyToBytes(key), this.valueToBytes(value), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, this.dataFormat);
        return op.execute();
    }

    @Override
    public CompletableFuture<Boolean> containsKeyAsync(K key) {
        this.assertRemoteCacheManagerIsStarted();
        ContainsKeyOperation op = this.operationsFactory.newContainsKeyOperation(key, this.keyToBytes(key), this.dataFormat);
        return op.execute();
    }

    public boolean containsValue(Object value) {
        Objects.requireNonNull(value);
        return this.values().contains(value);
    }

    @Override
    public CompletableFuture<Map<K, V>> getAllAsync(Set<?> keys) {
        this.assertRemoteCacheManagerIsStarted();
        if (log.isTraceEnabled()) {
            log.tracef("About to getAll entries (%s)", keys);
        }
        HashSet<byte[]> byteKeys = new HashSet<byte[]>(keys.size());
        for (Object key : keys) {
            byteKeys.add(this.keyToBytes(key));
        }
        GetAllParallelOperation op = this.operationsFactory.newGetAllOperation(byteKeys, this.dataFormat);
        return op.execute().thenApply(Collections::unmodifiableMap);
    }

    public void start() {
        if (log.isDebugEnabled()) {
            log.debugf("Start called, nothing to do here(%s)", this.getName());
        }
    }

    public void stop() {
        this.unregisterMBean();
        this.remoteCacheManager.getConfiguration().metricRegistry().removeCache(this.name);
    }

    public String getName() {
        return this.name;
    }

    public String getVersion() {
        return RemoteCacheImpl.class.getPackage().getImplementationVersion();
    }

    public <T> Query<T> query(String query) {
        if (this.queryFactory == null) {
            throw log.queryNotSupported();
        }
        return this.queryFactory.create(query);
    }

    public ContinuousQuery<K, V> continuousQuery() {
        if (this.queryFactory == null) {
            throw log.queryNotSupported();
        }
        return this.queryFactory.continuousQuery(this);
    }

    @Override
    public String getProtocolVersion() {
        return "HotRod client, protocol version: " + String.valueOf((Object)ProtocolVersion.DEFAULT_PROTOCOL_VERSION);
    }

    @Override
    public void addClientListener(Object listener) {
        this.assertRemoteCacheManagerIsStarted();
        AddClientListenerOperation op = this.operationsFactory.newAddClientListenerOperation(listener, this.dataFormat);
        Util.await(op.execute());
    }

    @Override
    public void addClientListener(Object listener, Object[] filterFactoryParams, Object[] converterFactoryParams) {
        this.assertRemoteCacheManagerIsStarted();
        byte[][] marshalledFilterParams = this.marshallParams(filterFactoryParams);
        byte[][] marshalledConverterParams = this.marshallParams(converterFactoryParams);
        AddClientListenerOperation op = this.operationsFactory.newAddClientListenerOperation(listener, marshalledFilterParams, marshalledConverterParams, this.dataFormat);
        Util.await(op.execute());
    }

    @Override
    public SocketAddress addNearCacheListener(Object listener, int bloomBits) {
        throw new UnsupportedOperationException("Adding a near cache listener to a RemoteCache is not supported!");
    }

    private byte[][] marshallParams(Object[] params) {
        if (params == null) {
            return org.infinispan.commons.util.Util.EMPTY_BYTE_ARRAY_ARRAY;
        }
        byte[][] marshalledParams = new byte[params.length][];
        for (int i = 0; i < marshalledParams.length; ++i) {
            byte[] bytes = this.keyToBytes(params[i]);
            marshalledParams[i] = bytes;
        }
        return marshalledParams;
    }

    @Override
    public void removeClientListener(Object listener) {
        this.assertRemoteCacheManagerIsStarted();
        RemoveClientListenerOperation op = this.operationsFactory.newRemoveClientListenerOperation(listener);
        Util.await(op.execute());
    }

    @Override
    public InternalRemoteCache<K, V> withFlags(Flag ... flags) {
        if (flags.length == 0) {
            return this;
        }
        int existingFlags = this.operationsFactory.flags();
        int newFlags = 0;
        for (Flag flag : flags) {
            newFlags |= flag.getFlagInt();
        }
        int resultingFlags = (int)EnumUtil.mergeBitSets((long)existingFlags, (long)newFlags);
        if (resultingFlags == existingFlags) {
            return this;
        }
        RemoteCacheImpl instance = this.newInstance(resultingFlags);
        instance.dataFormat = this.dataFormat;
        return instance;
    }

    @Override
    public InternalRemoteCache<K, V> noFlags() {
        if (this.operationsFactory.flags() == 0) {
            return this;
        }
        RemoteCacheImpl instance = this.newInstance(0);
        instance.dataFormat = this.dataFormat;
        return instance;
    }

    @Override
    public CompletableFuture<V> getAsync(Object key) {
        this.assertRemoteCacheManagerIsStarted();
        byte[] keyBytes = this.keyToBytes(key);
        GetOperation gco = this.operationsFactory.newGetKeyOperation(key, keyBytes, this.dataFormat);
        CompletableFuture result = gco.execute();
        if (log.isTraceEnabled()) {
            result.thenAccept(value -> log.tracef("For key(%s) returning %s", key, value));
        }
        return result;
    }

    @Override
    public CompletionStage<PingResponse> ping() {
        return this.operationsFactory.newFaultTolerantPingOperation().execute();
    }

    @Override
    public byte[] keyToBytes(Object o) {
        return this.dataFormat.keyToBytes(o);
    }

    protected byte[] valueToBytes(Object o) {
        return this.dataFormat.valueToBytes(o);
    }

    protected void assertRemoteCacheManagerIsStarted() {
        if (!this.remoteCacheManager.isStarted()) {
            String message = "Cannot perform operations on a cache associated with an unstarted RemoteCacheManager. Use RemoteCacheManager.start before using the remote cache.";
            Log.HOTROD.unstartedRemoteCacheManager();
            throw new RemoteCacheManagerNotStartedException(message);
        }
    }

    @Override
    public CloseableIteratorSet<K> keySet(IntSet segments) {
        return new RemoteCacheKeySet(this, segments);
    }

    @Override
    public CloseableIterator<K> keyIterator(IntSet segments) {
        return this.operationsFactory.getCodec().keyIterator(this, this.operationsFactory, segments, this.batchSize);
    }

    @Override
    public CloseableIteratorSet<Map.Entry<K, V>> entrySet(IntSet segments) {
        return new RemoteCacheEntrySet(this, segments);
    }

    @Override
    public CloseableIterator<Map.Entry<K, V>> entryIterator(IntSet segments) {
        return this.operationsFactory.getCodec().entryIterator(this, segments, this.batchSize);
    }

    @Override
    public CloseableIteratorCollection<V> values(IntSet segments) {
        return new RemoteCacheValuesCollection(this, segments);
    }

    @Override
    public <T> T execute(String taskName, Map<String, ?> params) {
        return this.execute(taskName, params, null);
    }

    @Override
    public <T> T execute(String taskName, Map<String, ?> params, Object key) {
        this.assertRemoteCacheManagerIsStarted();
        HashMap<String, byte[]> marshalledParams = new HashMap<String, byte[]>();
        if (params != null) {
            for (Map.Entry<String, ?> entry : params.entrySet()) {
                marshalledParams.put(entry.getKey(), this.keyToBytes(entry.getValue()));
            }
        }
        Object keyHint = null;
        if (key != null) {
            keyHint = this.isObjectStorage ? key : (Object)this.keyToBytes(key);
        }
        ExecuteOperation op = this.operationsFactory.newExecuteOperation(taskName, marshalledParams, keyHint, this.dataFormat);
        return Util.await(op.execute());
    }

    @Override
    public CacheTopologyInfo getCacheTopologyInfo() {
        return this.operationsFactory.getCacheTopologyInfo();
    }

    @Override
    public StreamingRemoteCache<K> streaming() {
        this.assertRemoteCacheManagerIsStarted();
        return new StreamingRemoteCacheImpl(this);
    }

    @Override
    public <T, U> InternalRemoteCache<T, U> withDataFormat(DataFormat newDataFormat) {
        Objects.requireNonNull(newDataFormat, "Data Format must not be null").initialize(this.remoteCacheManager, this.name, this.isObjectStorage);
        RemoteCacheImpl<T, U> instance = this.newInstance();
        instance.dataFormat = newDataFormat;
        return instance;
    }

    private <T, U> RemoteCacheImpl<T, U> newInstance() {
        RemoteCacheImpl<K, V> copy = new RemoteCacheImpl<K, V>(this.remoteCacheManager, this.name, this.clientStatistics);
        copy.init(this.operationsFactory, this.batchSize);
        return copy;
    }

    private <T, U> RemoteCacheImpl<T, U> newInstance(int flags) {
        RemoteCacheImpl<K, V> copy = new RemoteCacheImpl<K, V>(this.remoteCacheManager, this.name, this.clientStatistics);
        OperationsFactory newOperationsFactory = this.remoteCacheManager.createOperationFactory(this.name, this.remoteCacheManager.getConfiguration().forceReturnValues(), this.clientStatistics, flags);
        copy.init(newOperationsFactory, this.batchSize);
        return copy;
    }

    @Override
    public void resolveStorage(boolean objectStorage) {
        this.isObjectStorage = objectStorage;
        this.dataFormat.initialize(this.remoteCacheManager, this.name, this.isObjectStorage);
    }

    @Override
    public void resolveStorage(MediaType key, MediaType value, boolean objectStorage) {
        this.resolveStorage(objectStorage);
        if (key != null && key != MediaType.APPLICATION_UNKNOWN && !this.dataFormat.getKeyType().match(key)) {
            DataFormat.Builder server = DataFormat.builder().from(this.dataFormat).keyType(key).valueType(value);
            this.dataFormat = DataFormat.builder().from(this.dataFormat).serverDataFormat(server).build();
            this.resolveStorage(objectStorage);
            if (!InternalCacheNames.GLOBAL_STATE_INTERNAL_CACHES.contains(this.name) && this.remoteCacheManager.getMarshallerRegistry().getMarshaller(key) == null) {
                log.serverKeyTypeNotRecognized(key);
            }
        }
    }

    @Override
    public DataFormat getDataFormat() {
        return this.dataFormat;
    }

    @Override
    public boolean isTransactional() {
        return false;
    }

    @Override
    public boolean isObjectStorage() {
        return this.isObjectStorage;
    }

    @Override
    public boolean hasForceReturnFlag() {
        return this.operationsFactory.hasFlag(Flag.FORCE_RETURN_VALUE);
    }

    @Override
    public CompletionStage<Void> updateBloomFilter() {
        return CompletableFuture.completedFuture(null);
    }

    public String toString() {
        return "RemoteCache " + this.name;
    }
}

