/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.evcache.pool;

import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.evcache.EVCacheLatch;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.operation.EVCacheLatchImpl;
import com.netflix.evcache.operation.EVCacheOperationFuture;
import com.netflix.evcache.pool.ChunkTranscoder;
import com.netflix.evcache.pool.EVCacheClient;
import com.netflix.evcache.pool.EVCacheClientPool;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import net.spy.memcached.CachedData;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EVCacheClientUtil {
    private static Logger log = LoggerFactory.getLogger(EVCacheClientUtil.class);
    private final ChunkTranscoder ct = new ChunkTranscoder();
    private final String _appName;
    private final DynamicBooleanProperty fixup;
    private final DynamicIntProperty fixupPoolSize;
    private final EVCacheClientPool _pool;
    private ThreadPoolExecutor threadPool = null;

    public EVCacheClientUtil(EVCacheClientPool pool) {
        this._pool = pool;
        this._appName = pool.getAppName();
        this.fixup = EVCacheConfig.getInstance().getDynamicBooleanProperty(this._appName + ".addOperation.fixup", Boolean.FALSE);
        this.fixupPoolSize = EVCacheConfig.getInstance().getDynamicIntProperty(this._appName + ".addOperation.fixup.poolsize", 10);
        RejectedExecutionHandler block = new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                ArrayList<Tag> tags = new ArrayList<Tag>(2);
                tags.add((Tag)new BasicTag("ipc.server.app", EVCacheClientUtil.this._appName));
                tags.add((Tag)new BasicTag("ipc.result", "rejected"));
                EVCacheMetricsFactory.getInstance().increment("internal-evc.client.addCall.fixUp", tags);
            }
        };
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10000);
        class SimpleThreadFactory
        implements ThreadFactory {
            private final AtomicInteger counter = new AtomicInteger();

            SimpleThreadFactory() {
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "EVCacheClientUtil-AddFixUp-" + this.counter.getAndIncrement());
                t.setDaemon(true);
                return t;
            }
        }
        this.threadPool = new ThreadPoolExecutor(this.fixupPoolSize.get(), this.fixupPoolSize.get() * 2, 30L, TimeUnit.SECONDS, queue, new SimpleThreadFactory(), block);
        this.threadPool.prestartAllCoreThreads();
    }

    public EVCacheLatch add(final String canonicalKey, CachedData cd, final int timeToLive, EVCacheLatch.Policy policy) throws Exception {
        if (cd == null) {
            return null;
        }
        EVCacheClient[] clients = this._pool.getEVCacheClientForWrite();
        EVCacheLatchImpl latch = new EVCacheLatchImpl(policy, clients.length - this._pool.getWriteOnlyEVCacheClients().length, this._appName){

            @Override
            public void onComplete(OperationFuture<?> operationFuture) throws Exception {
                super.onComplete(operationFuture);
                if (this.getPendingFutureCount() == 0 && EVCacheClientUtil.this.fixup.get()) {
                    RemoteRequest req = new RemoteRequest(this, canonicalKey, timeToLive);
                    EVCacheClientUtil.this.threadPool.submit(req);
                }
            }
        };
        for (EVCacheClient client : clients) {
            Future<Boolean> future = client.add(canonicalKey, timeToLive, cd, this.ct, latch);
            if (!log.isDebugEnabled()) continue;
            log.debug("ADD : Op Submitted : APP " + this._appName + ", key " + canonicalKey + "; future : " + future);
        }
        return latch;
    }

    class RemoteRequest
    implements Runnable {
        private EVCacheLatchImpl latch;
        private String canonicalKey;
        private int timeToLive;

        public RemoteRequest(EVCacheLatchImpl latch, String canonicalKey, int timeToLive) {
            this.latch = latch;
            this.canonicalKey = canonicalKey;
            this.timeToLive = timeToLive;
        }

        @Override
        public void run() {
            List<Future<Boolean>> futures = this.latch.getAllFutures();
            int successCount = 0;
            int failCount = 0;
            for (int i = 0; i < futures.size(); ++i) {
                Future<Boolean> future = futures.get(i);
                if (!(future instanceof EVCacheOperationFuture)) continue;
                EVCacheOperationFuture f = (EVCacheOperationFuture)((Object)future);
                if (f.getStatus().getStatusCode() == StatusCode.SUCCESS) {
                    ++successCount;
                    if (!log.isDebugEnabled()) continue;
                    log.debug("ADD : Success : APP " + EVCacheClientUtil.this._appName + ", key " + this.canonicalKey + ", ServerGroup : " + f.getServerGroup().getName());
                    continue;
                }
                ++failCount;
                if (!log.isDebugEnabled()) continue;
                log.debug("ADD : Fail : APP " + EVCacheClientUtil.this._appName + ", key : " + this.canonicalKey + ", ServerGroup : " + f.getServerGroup().getName());
            }
            if (log.isDebugEnabled()) {
                log.debug("ADD : Status: APP " + EVCacheClientUtil.this._appName + ", key : " + this.canonicalKey + ", failCount : " + failCount + "; successCount : " + successCount);
            }
            if (successCount > 0 && failCount > 0) {
                EVCacheClient client;
                EVCacheOperationFuture f;
                Future<Boolean> evFuture;
                int i;
                CachedData readData = null;
                for (i = 0; i < futures.size(); ++i) {
                    evFuture = futures.get(i);
                    if (!(evFuture instanceof EVCacheOperationFuture) || (f = (EVCacheOperationFuture)((Object)evFuture)).getStatus().getStatusCode() != StatusCode.ERR_EXISTS || (client = EVCacheClientUtil.this._pool.getEVCacheClient(f.getServerGroup())) == null) continue;
                    try {
                        readData = client.get(this.canonicalKey, EVCacheClientUtil.this.ct, false, false);
                    }
                    catch (Exception e) {
                        log.error("Error reading the data", (Throwable)e);
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Add : Read existing data for: APP " + EVCacheClientUtil.this._appName + ", key " + this.canonicalKey + "; ServerGroup : " + client.getServerGroupName());
                    }
                    if (readData != null) break;
                }
                if (readData != null) {
                    for (i = 0; i < futures.size(); ++i) {
                        evFuture = futures.get(i);
                        if (!(evFuture instanceof OperationFuture) || (f = (EVCacheOperationFuture)((Object)evFuture)).getStatus().getStatusCode() != StatusCode.SUCCESS || (client = EVCacheClientUtil.this._pool.getEVCacheClient(f.getServerGroup())) == null) continue;
                        try {
                            client.set(this.canonicalKey, readData, this.timeToLive, (EVCacheLatch)null);
                            if (log.isDebugEnabled()) {
                                log.debug("Add: Fixup for : APP " + EVCacheClientUtil.this._appName + ", key " + this.canonicalKey + "; ServerGroup : " + client.getServerGroupName());
                            }
                            EVCacheMetricsFactory.getInstance().increment("internal-evc.client.addCall.fixUp", client.getTagList());
                            continue;
                        }
                        catch (Exception e) {
                            if (!log.isDebugEnabled()) continue;
                            log.debug("Add: Fixup Error : APP " + EVCacheClientUtil.this._appName + ", key " + this.canonicalKey + "; ServerGroup : " + client.getServerGroupName(), (Throwable)e);
                        }
                    }
                }
            }
        }
    }
}

