/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.proxy;

import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.config.subscription.impl.JRTConfigRequester;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ConfigCacheKey;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.ConfigSourceClient;
import com.yahoo.vespa.config.proxy.DelayedResponse;
import com.yahoo.vespa.config.proxy.DelayedResponseHandler;
import com.yahoo.vespa.config.proxy.DelayedResponses;
import com.yahoo.vespa.config.proxy.MemoryCache;
import com.yahoo.vespa.config.proxy.ProxyServer;
import com.yahoo.vespa.config.proxy.RpcServer;
import com.yahoo.vespa.config.proxy.Subscriber;
import com.yahoo.vespa.config.proxy.UpstreamConfigSubscriber;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A {@link ConfigSourceClient} that fetches config from the upstream config sources in a
 * {@link ConfigSourceSet} through a shared {@link JRTConfigRequester}.
 *
 * <p>At most one {@link Subscriber} is kept per {@link ConfigCacheKey}. Requests that cannot be
 * answered from the {@link MemoryCache} are parked in {@link DelayedResponses} and answered from
 * {@link #updateSubscribers(RawConfig)} when a matching config arrives.
 */
class RpcConfigSourceClient implements ConfigSourceClient {
    private static final Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());

    /** Factor applied to the default delays/timeouts when building {@link #timingValues}. */
    private static final double timingValuesRatio = 0.8;

    /** Timing values handed to the requester and to every upstream subscriber. */
    private static final TimingValues timingValues = createTimingValues();

    private final Supervisor supervisor = new Supervisor(new Transport());
    private final RpcServer rpcServer;
    private final ConfigSourceSet configSourceSet;
    private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>();
    /** Guards every access to {@link #activeSubscribers}. */
    private final Object activeSubscribersLock = new Object();
    private final MemoryCache memoryCache;
    private final DelayedResponses delayedResponses;
    private final ExecutorService exec;
    private final JRTConfigRequester requester;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
    private final ScheduledFuture<?> delayedResponseScheduler;

    /**
     * Creates the client, pings the config sources once, and schedules a
     * {@link DelayedResponseHandler} to run every second after an initial delay of 5 seconds.
     *
     * @param rpcServer       server used to send responses for delayed requests
     * @param configSourceSet upstream config sources; may be {@code null} or empty, in which case
     *                        only a warning is logged by the connection check
     * @param memoryCache     cache consulted by {@link #getConfig(RawConfig, JRTServerConfigRequest)}
     */
    RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
        this.rpcServer = rpcServer;
        this.configSourceSet = configSourceSet;
        this.memoryCache = memoryCache;
        this.delayedResponses = new DelayedResponses();
        checkConfigSources();
        this.exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-"));
        this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
        this.delayedResponseScheduler = scheduler.scheduleAtFixedRate(
                new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer), 5L, 1L, TimeUnit.SECONDS);
    }

    /** Builds the timing values: the defaults scaled by {@link #timingValuesRatio}, error timeout set to -1. */
    private static TimingValues createTimingValues() {
        TimingValues tv = new TimingValues();
        tv.setUnconfiguredDelay((long) (tv.getUnconfiguredDelay() * timingValuesRatio))
          .setConfiguredErrorDelay((long) (tv.getConfiguredErrorDelay() * timingValuesRatio))
          .setSubscribeTimeout((long) (tv.getSubscribeTimeout() * timingValuesRatio))
          .setConfiguredErrorTimeout(-1L);
        return tv;
    }

    /**
     * Pings the config sources in order and stops at the first one that answers. Only logs the
     * outcome; nothing is returned and no connection is kept.
     */
    private void checkConfigSources() {
        if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().isEmpty()) {
            log.log(LogLevel.WARNING, "No config sources defined, could not check connection");
            return;
        }
        for (String configSource : configSourceSet.getSources()) {
            Spec spec = new Spec(configSource);
            Target target = supervisor.connect(spec);
            try {
                // Use a fresh request per source rather than re-invoking one that may already
                // carry a result or an error from a previous attempt.
                target.invokeSync(new Request("ping"), 30.0);
                if (target.isValid()) {
                    log.log(LogLevel.DEBUG, () -> "Created connection to config source at " + spec.toString());
                    return;
                }
                log.log(LogLevel.INFO, "Could not connect to config source at " + spec.toString());
            } finally {
                // The target is only a probe and is never stored, so it must be closed on every
                // path, including the successful one and when invokeSync throws.
                target.close();
            }
        }
        log.log(LogLevel.INFO, "Could not connect to any config source in set " + configSourceSet.toString()
                + ", please make sure config server(s) are running. ");
    }

    /**
     * Registers {@code request} as a delayed response and tries to answer it from the cache.
     *
     * <p>An upstream subscription is started unless the cache holds a non-error config with
     * generation greater than 0 for this key.
     *
     * @param input   the config being asked for (key and def md5 identify the cache entry)
     * @param request the originating request
     * @return the cached config if it differs from what the request already has (according to
     *         {@link ProxyServer#configOrGenerationHasChanged}) and the delayed response could
     *         be withdrawn again; otherwise {@code null}, meaning the request stays parked
     */
    @Override
    public RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) {
        // Park the request first so an update arriving concurrently can find it.
        DelayedResponse delayedResponse = new DelayedResponse(request);
        delayedResponses.add(delayedResponse);

        ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5());
        RawConfig cachedConfig = memoryCache.get(configCacheKey);
        boolean needToGetConfig = true;
        RawConfig ret = null;
        if (cachedConfig != null) {
            log.log(LogLevel.DEBUG, () -> "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() + ",configmd5=" + cachedConfig.getConfigMd5());
            log.log(LogLevel.SPAM, () -> "input config=" + input + ",cached config=" + cachedConfig);
            if (ProxyServer.configOrGenerationHasChanged(cachedConfig, request)) {
                log.log(LogLevel.SPAM, () -> "Cached config is not equal to requested, will return it");
                // Only return the cached config if nobody else already took the delayed response.
                if (delayedResponses.remove(delayedResponse)) {
                    ret = cachedConfig;
                }
            }
            if (!cachedConfig.isError() && cachedConfig.getGeneration() > 0L) {
                needToGetConfig = false;
            }
        }
        if (needToGetConfig) {
            subscribeToConfig(input, configCacheKey);
        }
        return ret;
    }

    /**
     * Starts an upstream subscriber for {@code configCacheKey} unless one is already registered.
     * If subscribing throws a {@link ConfigurationRuntimeException} the subscriber is cancelled
     * and the failure is logged.
     */
    private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) {
        synchronized (activeSubscribersLock) {
            if (activeSubscribers.containsKey(configCacheKey)) {
                log.log(LogLevel.DEBUG, () -> "Already a subscriber running for: " + configCacheKey);
                return;
            }
            log.log(LogLevel.DEBUG, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
            UpstreamConfigSubscriber subscriber =
                    new UpstreamConfigSubscriber(input, this, configSourceSet, timingValues, requester, memoryCache);
            try {
                subscriber.subscribe();
                activeSubscribers.put(configCacheKey, subscriber);
                exec.execute(subscriber);
            } catch (ConfigurationRuntimeException e) {
                log.log(LogLevel.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
                subscriber.cancel();
            }
        }
    }

    /**
     * Shuts down the source connections, cancels the periodic delayed-response task and shuts
     * down its scheduler.
     */
    // NOTE(review): the Supervisor created by this class is not shut down here — confirm whether
    // its transport should be stopped as part of cancel().
    @Override
    public void cancel() {
        shutdownSourceConnections();
        delayedResponseScheduler.cancel(true);
        scheduler.shutdown();
    }

    /**
     * Cancels and forgets all active subscribers, then shuts down the subscriber executor and
     * closes the requester.
     */
    @Override
    public void shutdownSourceConnections() {
        synchronized (activeSubscribersLock) {
            for (Subscriber subscriber : activeSubscribers.values()) {
                subscriber.cancel();
            }
            activeSubscribers.clear();
        }
        exec.shutdown();
        requester.close();
    }

    /** Returns the address of the requester's current connection. */
    @Override
    public String getActiveSourceConnection() {
        return requester.getConnectionPool().getCurrent().getAddress();
    }

    /** Returns a new list with the configured sources; empty if no source set was given. */
    @Override
    public List<String> getSourceConnections() {
        List<String> ret = new ArrayList<>();
        if (configSourceSet != null) {
            ret.addAll(configSourceSet.getSources());
        }
        return ret;
    }

    /**
     * Answers every parked request that matches {@code config}.
     *
     * <p>A request matches when its config key equals the config's key and the config's
     * generation is either 0 or not lower than the generation the request asked for. Each
     * matching request is removed from the delayed responses and answered through the RPC server.
     *
     * @param config the new config to distribute
     */
    @Override
    public void updateSubscribers(RawConfig config) {
        log.log(LogLevel.DEBUG, () -> "Config updated for " + config.getKey() + "," + config.getGeneration());
        DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses();
        log.log(LogLevel.SPAM, () -> "Delayed response queue: " + responseDelayQueue);
        if (responseDelayQueue.isEmpty()) {
            log.log(LogLevel.DEBUG, () -> "There exists no matching element on delayed response queue for " + config.getKey());
            return;
        }
        log.log(LogLevel.DEBUG, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements");
        boolean found = false;
        // Iterate over a snapshot: entries are removed from the queue while looping.
        for (DelayedResponse response : responseDelayQueue.toArray(new DelayedResponse[0])) {
            JRTServerConfigRequest request = response.getRequest();
            if (!request.getConfigKey().equals((Object) config.getKey())) {
                continue;
            }
            boolean olderThanRequested = config.getGeneration() < request.getRequestGeneration()
                    && config.getGeneration() != 0L;
            if (olderThanRequested) {
                continue;
            }
            if (delayedResponses.remove(response)) {
                found = true;
                log.log(LogLevel.DEBUG, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
                rpcServer.returnOkResponse(request, config);
            } else {
                log.log(LogLevel.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed");
            }
        }
        if (!found) {
            log.log(LogLevel.DEBUG, () -> "Found no recipient for " + config.getKey() + " in delayed response queue");
        }
        log.log(LogLevel.DEBUG, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration());
    }

    /** Returns the delayed responses held by this client. */
    @Override
    public DelayedResponses delayedResponses() {
        return delayedResponses;
    }
}

