/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.ribbon.evache;

import com.netflix.evcache.EVCache;
import com.netflix.evcache.EVCacheException;
import com.netflix.ribbon.CacheProvider;
import com.netflix.ribbon.evache.CacheFaultException;
import com.netflix.ribbon.evache.CacheMissException;
import com.netflix.ribbon.evache.EvCacheOptions;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public class EvCacheProvider<T>
implements CacheProvider<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EvCacheProvider.class);
    private static final long WATCH_INTERVAL = 1L;
    private static final FutureObserver FUTURE_OBSERVER = new FutureObserver();
    private final EvCacheOptions options;
    private final EVCache evCache;

    public EvCacheProvider(EvCacheOptions options) {
        this.options = options;
        EVCache.Builder builder = new EVCache.Builder();
        if (options.isEnableZoneFallback()) {
            builder.enableZoneFallback();
        }
        builder.setDefaultTTL(options.getTimeToLive());
        builder.setAppName(options.getAppName());
        builder.setCacheName(options.getCacheName());
        this.evCache = builder.build();
    }

    public Observable<T> get(final String key, Map<String, Object> requestProperties) {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<T>(){

            public void call(Subscriber<? super T> subscriber) {
                try {
                    Future getFuture = EvCacheProvider.this.options.getTranscoder() == null ? EvCacheProvider.this.evCache.getAsynchronous(key) : EvCacheProvider.this.evCache.getAsynchronous(key, EvCacheProvider.this.options.getTranscoder());
                    FUTURE_OBSERVER.watchFuture(getFuture, subscriber);
                }
                catch (EVCacheException e) {
                    subscriber.onError((Throwable)new CacheFaultException("EVCache exception when getting value for key " + key, e));
                }
            }
        });
    }

    static {
        FUTURE_OBSERVER.start();
    }

    static final class FutureObserver
    extends Thread {
        private final Map<Future, Subscriber> futureMap = new ConcurrentHashMap<Future, Subscriber>();

        FutureObserver() {
            super("EvCache-Future-Observer");
            this.setDaemon(true);
        }

        @Override
        public void run() {
            while (true) {
                for (Map.Entry<Future, Subscriber> f : this.futureMap.entrySet()) {
                    Future future = f.getKey();
                    Subscriber subscriber = f.getValue();
                    if (subscriber.isUnsubscribed()) {
                        future.cancel(true);
                        this.futureMap.remove(future);
                        continue;
                    }
                    if (!future.isDone()) continue;
                    try {
                        FutureObserver.handleCompletedFuture(future, subscriber);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    finally {
                        this.futureMap.remove(future);
                    }
                }
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }

        private static void handleCompletedFuture(Future future, Subscriber subscriber) throws InterruptedException {
            if (future.isCancelled()) {
                subscriber.onError((Throwable)new CacheFaultException("cache get request canceled"));
            } else {
                try {
                    Object value = future.get();
                    if (value == null) {
                        subscriber.onError((Throwable)new CacheMissException());
                    } else {
                        subscriber.onNext(value);
                        subscriber.onCompleted();
                    }
                }
                catch (ExecutionException e) {
                    subscriber.onError(e.getCause());
                }
            }
        }

        void watchFuture(Future future, Subscriber<?> subscriber) {
            this.futureMap.put(future, subscriber);
        }
    }
}

