/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl;

import io.netty.buffer.ByteBuf;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.StreamingRemoteCache;
import org.infinispan.client.hotrod.impl.ClientStatistics;
import org.infinispan.client.hotrod.impl.InternalRemoteCache;
import org.infinispan.client.hotrod.impl.operations.CacheOperationsFactory;
import org.infinispan.client.hotrod.impl.operations.GetStreamEndOperation;
import org.infinispan.client.hotrod.impl.operations.GetStreamNextResponse;
import org.infinispan.client.hotrod.impl.operations.GetStreamStartResponse;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.operations.PutStreamResponse;
import org.infinispan.client.hotrod.impl.protocol.GetInputStream;
import org.infinispan.client.hotrod.impl.protocol.PutOutputStream;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.OperationDispatcher;
import org.infinispan.commons.util.concurrent.CompletableFutures;

/**
 * {@link StreamingRemoteCache} implementation that builds stream start/next/end
 * operations through a {@link CacheOperationsFactory} and submits them through an
 * {@link OperationDispatcher}.
 *
 * <p>Client statistics are recorded only when statistics are enabled in the
 * configuration of the owning cache container; otherwise no statistics object is
 * retained and all recording is skipped.
 *
 * @param <K> type of the cache keys
 */
public class StreamingRemoteCacheImpl<K> implements StreamingRemoteCache<K> {
    /**
     * Second argument given to {@code newGetStreamStartOperation}.
     * NOTE(review): presumably a chunk/batch size in bytes - confirm against the factory.
     */
    private static final int GET_STREAM_START_SIZE = 8192;

    /**
     * Lifespan / max-idle value used by the overloads that do not take one.
     * NOTE(review): presumably means "no explicit value, use the default" - confirm.
     */
    private static final long UNSET_DURATION = -1L;

    /** Value placed in the version argument by the {@code put} overloads. */
    private static final long VERSION_FOR_PUT = 0L;

    /** Value placed in the version argument by the {@code putIfAbsent} overloads. */
    private static final long VERSION_FOR_PUT_IF_ABSENT = -1L;

    private final CacheOperationsFactory factory;
    private final OperationDispatcher dispatcher;
    /** {@code null} when statistics are disabled in the container configuration. */
    private final ClientStatistics clientStatistics;

    /**
     * Creates a streaming view that reuses the operations factory and dispatcher of {@code cache}.
     *
     * @param cache the cache supplying the operations factory, dispatcher and statistics
     */
    public StreamingRemoteCacheImpl(InternalRemoteCache<K, ?> cache) {
        this.factory = cache.getOperationsFactory();
        this.dispatcher = cache.getDispatcher();
        if (cache.getRemoteCacheContainer().getConfiguration().statistics().enabled()) {
            this.clientStatistics = cache.clientStatistics();
        } else {
            this.clientStatistics = null;
        }
    }

    /**
     * Starts a streaming read for {@code key} and blocks until the start response arrives.
     *
     * <p>A {@code null} start response is recorded as {@code dataRead(false, ...)} and yields
     * {@code null}. A read is recorded as {@code dataRead(true, ...)} once a response reports
     * {@code complete()}: either the start response itself or a later "next" response.
     *
     * @param key the key to read
     * @return a stream over the value, or {@code null} when the start response is {@code null}
     */
    @Override
    public <T extends InputStream> T get(K key) {
        final long startTime = this.startTimestamp();
        HotRodOperation<GetStreamStartResponse> startOp =
                this.factory.newGetStreamStartOperation(key, GET_STREAM_START_SIZE);
        final GetStreamStartResponse gsr = this.dispatcher.await(this.dispatcher.execute(startOp));
        if (gsr == null) {
            this.recordRead(false, startTime);
            return null;
        }
        if (gsr.complete()) {
            this.recordRead(true, startTime);
        }
        // The cast is unchecked because T is chosen by the caller.
        @SuppressWarnings("unchecked")
        T stream = (T) new GetInputStream(
                () -> this.requestNext(gsr, startTime),
                gsr.metadata(),
                gsr.value(),
                gsr.complete(),
                () -> this.endStream(gsr));
        return stream;
    }

    /**
     * Starts an unconditional streaming write using {@link #UNSET_DURATION} for lifespan and max idle.
     *
     * @param key the key to write
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream put(K key) {
        return this.put(key, UNSET_DURATION, TimeUnit.SECONDS, UNSET_DURATION, TimeUnit.SECONDS);
    }

    /**
     * Starts an unconditional streaming write using {@link #UNSET_DURATION} for max idle.
     *
     * @param key      the key to write
     * @param lifespan lifespan forwarded to the start operation
     * @param unit     unit of {@code lifespan}
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream put(K key, long lifespan, TimeUnit unit) {
        return this.put(key, lifespan, unit, UNSET_DURATION, TimeUnit.SECONDS);
    }

    /**
     * Executes a put-stream start operation, waits for its response and wraps the resulting
     * server-side stream in an {@link OutputStream}.
     *
     * <p>Every chunk written is sent as a "next" operation on the channel reported by the start
     * response. When statistics are enabled, {@code dataStore} is recorded after the chunk flagged
     * as complete has been acknowledged.
     *
     * @param hro the start operation to execute
     * @return the stream accepting the value bytes
     */
    private OutputStream handlePutStreamOp(HotRodOperation<PutStreamResponse> hro) {
        final long startTime = this.startTimestamp();
        final PutStreamResponse psr = this.dispatcher.await(this.dispatcher.execute(hro));
        return new PutOutputStream((bb, complete) -> {
            HotRodOperation<Boolean> nextOp =
                    this.factory.newPutStreamNextOperation(psr.id(), (boolean) complete, (ByteBuf) bb, psr.channel());
            CompletionStage<Boolean> ack =
                    this.dispatcher.executeOnSingleAddress(nextOp, ChannelRecord.of(psr.channel()));
            if (this.clientStatistics == null || !complete.booleanValue()) {
                // Nothing to record: only discard the Boolean result.
                return ack.thenApply(CompletableFutures.toNullFunction());
            }
            return ack.thenAccept(ignored -> this.clientStatistics.dataStore(startTime, 1));
        }, psr.channel().alloc(), this.dispatcher);
    }

    /**
     * Starts an unconditional streaming write.
     *
     * @param key          the key to write
     * @param lifespan     lifespan forwarded to the start operation
     * @param lifespanUnit unit of {@code lifespan}
     * @param maxIdle      max idle forwarded to the start operation
     * @param maxIdleUnit  unit of {@code maxIdle}
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream put(K key, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        return this.handlePutStreamOp(this.factory.newPutStreamStartOperation(
                key, VERSION_FOR_PUT, lifespan, lifespanUnit, maxIdle, maxIdleUnit));
    }

    /**
     * Starts a put-if-absent streaming write using {@link #UNSET_DURATION} for lifespan and max idle.
     *
     * @param key the key to write
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream putIfAbsent(K key) {
        return this.putIfAbsent(key, UNSET_DURATION, TimeUnit.SECONDS, UNSET_DURATION, TimeUnit.SECONDS);
    }

    /**
     * Starts a put-if-absent streaming write using {@link #UNSET_DURATION} for max idle.
     *
     * @param key      the key to write
     * @param lifespan lifespan forwarded to the start operation
     * @param unit     unit of {@code lifespan}
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream putIfAbsent(K key, long lifespan, TimeUnit unit) {
        return this.putIfAbsent(key, lifespan, unit, UNSET_DURATION, TimeUnit.SECONDS);
    }

    /**
     * Starts a put-if-absent streaming write.
     *
     * @param key          the key to write
     * @param lifespan     lifespan forwarded to the start operation
     * @param lifespanUnit unit of {@code lifespan}
     * @param maxIdle      max idle forwarded to the start operation
     * @param maxIdleUnit  unit of {@code maxIdle}
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream putIfAbsent(K key, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        return this.handlePutStreamOp(this.factory.newPutStreamStartOperation(
                key, VERSION_FOR_PUT_IF_ABSENT, lifespan, lifespanUnit, maxIdle, maxIdleUnit));
    }

    /**
     * Starts a versioned streaming replace using {@link #UNSET_DURATION} for lifespan and max idle.
     *
     * @param key     the key to write
     * @param version version forwarded to the start operation
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream replaceWithVersion(K key, long version) {
        return this.replaceWithVersion(key, version, UNSET_DURATION, TimeUnit.SECONDS, UNSET_DURATION, TimeUnit.SECONDS);
    }

    /**
     * Starts a versioned streaming replace using {@link #UNSET_DURATION} for max idle.
     *
     * @param key      the key to write
     * @param version  version forwarded to the start operation
     * @param lifespan lifespan forwarded to the start operation
     * @param unit     unit of {@code lifespan}
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream replaceWithVersion(K key, long version, long lifespan, TimeUnit unit) {
        return this.replaceWithVersion(key, version, lifespan, unit, UNSET_DURATION, TimeUnit.SECONDS);
    }

    /**
     * Starts a versioned streaming replace.
     *
     * @param key          the key to write
     * @param version      version forwarded to the start operation
     * @param lifespan     lifespan forwarded to the start operation
     * @param lifespanUnit unit of {@code lifespan}
     * @param maxIdle      max idle forwarded to the start operation
     * @param maxIdleUnit  unit of {@code maxIdle}
     * @return the stream accepting the value bytes
     */
    @Override
    public OutputStream replaceWithVersion(K key, long version, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
        return this.handlePutStreamOp(this.factory.newPutStreamStartOperation(
                key, version, lifespan, lifespanUnit, maxIdle, maxIdleUnit));
    }

    /** Returns the statistics clock reading, or {@code 0} when statistics are disabled. */
    private long startTimestamp() {
        if (this.clientStatistics == null) {
            return 0L;
        }
        return this.clientStatistics.time();
    }

    /** Records a single read with the given flag when statistics are enabled; otherwise does nothing. */
    private void recordRead(boolean found, long startTime) {
        if (this.clientStatistics != null) {
            this.clientStatistics.dataRead(found, startTime, 1);
        }
    }

    /**
     * Requests the next chunk of the stream opened by {@code start}, on the channel that
     * {@code start} reports.
     */
    private CompletionStage<GetStreamNextResponse> requestNext(GetStreamStartResponse start, long startTime) {
        HotRodOperation<GetStreamNextResponse> nextOp =
                this.factory.newGetStreamNextOperation(start.id(), start.channel());
        CompletionStage<GetStreamNextResponse> pending =
                this.dispatcher.executeOnSingleAddress(nextOp, ChannelRecord.of(start.channel()));
        if (this.clientStatistics == null) {
            return pending;
        }
        // The derived stage is discarded on purpose: it only hooks the statistics update,
        // while callers keep observing the original stage.
        pending.thenApply(next -> {
            if (next.complete()) {
                this.clientStatistics.dataRead(true, startTime, 1);
            }
            return next;
        });
        return pending;
    }

    /** Sends the end operation for the stream opened by {@code start}; the result is not awaited. */
    private void endStream(GetStreamStartResponse start) {
        GetStreamEndOperation endOp = this.factory.newGetStreamEndOperation(start.id());
        this.dispatcher.executeOnSingleAddress(endOp, ChannelRecord.of(start.channel()));
    }
}

