/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.proxy;

import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.config.subscription.impl.JRTConfigRequester;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.vespa.config.ConfigCacheKey;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.ConfigSourceClient;
import com.yahoo.vespa.config.proxy.DelayedResponse;
import com.yahoo.vespa.config.proxy.DelayedResponseHandler;
import com.yahoo.vespa.config.proxy.DelayedResponses;
import com.yahoo.vespa.config.proxy.MemoryCache;
import com.yahoo.vespa.config.proxy.ProxyServer;
import com.yahoo.vespa.config.proxy.RpcServer;
import com.yahoo.vespa.config.proxy.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link ConfigSourceClient} that gets config from the sources in a {@link ConfigSourceSet}
 * through a shared {@link JRTConfigRequester}.
 *
 * <p>The instance is itself the periodic task (see {@link #run()}) that polls every active
 * {@link Subscriber} for a new config generation, stores new config in the {@link MemoryCache}
 * and answers matching requests parked in {@link DelayedResponses}. A second periodic task,
 * a {@link DelayedResponseHandler}, is scheduled by the constructor on its own executor.
 */
class RpcConfigSourceClient implements ConfigSourceClient, Runnable {

    private static final Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());

    /** Factor applied to the default delays/timeouts of {@link TimingValues} for this client. */
    private static final double timingValuesRatio = 0.8;

    /** Timing values shared by the requester and all subscribers created by this class. */
    private static final TimingValues timingValues;

    static {
        TimingValues tv = new TimingValues();
        tv.setUnconfiguredDelay((long) (tv.getUnconfiguredDelay() * timingValuesRatio))
          .setConfiguredErrorDelay((long) (tv.getConfiguredErrorDelay() * timingValuesRatio))
          .setSubscribeTimeout((long) (tv.getSubscribeTimeout() * timingValuesRatio))
          .setConfiguredErrorTimeout(-1L);
        timingValues = tv;
    }

    /** Only used by {@link #checkConfigSources()} to ping the config sources. */
    private final Supervisor supervisor = new Supervisor(new Transport("config-source-client"));
    private final RpcServer rpcServer;
    private final ConfigSourceSet configSourceSet;
    /** Subscribers by cache key; written by {@link #getConfig} callers and read by the polling task. */
    private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
    private final MemoryCache memoryCache;
    private final DelayedResponses delayedResponses;
    private final ScheduledExecutorService nextConfigScheduler =
            Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config"));
    private final ScheduledFuture<?> nextConfigFuture;
    private final JRTConfigRequester requester;
    private final ScheduledExecutorService delayedResponsesScheduler =
            Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
    private final ScheduledFuture<?> delayedResponsesFuture;

    /**
     * Pings the config sources, creates the shared requester and starts the two periodic tasks:
     * this object every 10 ms (no initial delay) and a {@link DelayedResponseHandler} every
     * second (after 5 seconds).
     *
     * @param rpcServer       server used to send responses for delayed requests
     * @param configSourceSet the config sources to get config from
     * @param memoryCache     cache that is read by {@link #getConfig} and updated with new config
     */
    RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
        this.rpcServer = rpcServer;
        this.configSourceSet = configSourceSet;
        this.memoryCache = memoryCache;
        this.delayedResponses = new DelayedResponses();
        checkConfigSources();
        // Create the requester before handing 'this' to the scheduler, so the polling thread
        // is not started while this final field is still unassigned.
        this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
        this.nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0L, 10L, TimeUnit.MILLISECONDS);
        DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
        this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Pings the config sources in order and stops at the first one with a valid connection.
     * Only logs the outcome; an unreachable source set does not fail construction.
     */
    private void checkConfigSources() {
        if (configSourceSet == null
                || configSourceSet.getSources() == null
                || configSourceSet.getSources().isEmpty()) {
            log.log(Level.WARNING, "No config sources defined, could not check connection");
            return;
        }
        for (String configSource : configSourceSet.getSources()) {
            Spec spec = new Spec(configSource);
            Target target = supervisor.connect(spec);
            try {
                // One request per attempt, so no state from a previous attempt is reused.
                target.invokeSync(new Request("ping"), 30.0);
                if (target.isValid()) {
                    log.log(Level.FINE, () -> "Created connection to config source at " + spec.toString());
                    return;
                }
                log.log(Level.INFO, "Could not connect to config source at " + spec.toString());
            } finally {
                // The target is only used for this check and is not referenced afterwards,
                // so it is closed on every path (success, failure and exception).
                target.close();
            }
        }
        log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString()
                + ", please make sure config server(s) are running. ");
    }

    /**
     * Registers the request as a delayed response and returns cached config right away if the
     * cache holds config that differs from what the request has.
     *
     * <p>A subscriber is created for the config unless the cache already holds a non-error
     * config with generation greater than 0.
     *
     * @param input   the config being asked for
     * @param request the request to answer, now or later
     * @return the cached config when it can be returned immediately, otherwise {@code null}
     *         (the request then stays in the delayed responses)
     */
    @Override
    public RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) {
        DelayedResponse delayedResponse = new DelayedResponse(request);
        delayedResponses.add(delayedResponse);

        ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5());
        RawConfig cachedConfig = memoryCache.get(configCacheKey);
        boolean needToGetConfig = true;
        RawConfig ret = null;
        if (cachedConfig != null) {
            log.log(Level.FINE, () -> "Found config " + configCacheKey + " in cache, generation="
                    + cachedConfig.getGeneration() + ",configmd5=" + cachedConfig.getConfigMd5());
            log.log(Level.FINEST, () -> "input config=" + input + ",cached config=" + cachedConfig);
            if (ProxyServer.configOrGenerationHasChanged(cachedConfig, request)) {
                log.log(Level.FINEST, () -> "Cached config is not equal to requested, will return it");
                // Only return it if the delayed response was still queued when we removed it.
                if (delayedResponses.remove(delayedResponse)) {
                    ret = cachedConfig;
                }
            }
            if (!cachedConfig.isError() && cachedConfig.getGeneration() > 0L) {
                needToGetConfig = false;
            }
        }
        if (needToGetConfig) {
            subscribeToConfig(input, configCacheKey);
        }
        return ret;
    }

    /**
     * Creates and registers a subscriber for the given key unless one is already registered.
     * A subscriber whose subscribe call fails is cancelled and not registered.
     */
    private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) {
        // NOTE(review): containsKey followed by put is not atomic; two concurrent callers for the
        // same key could both subscribe and one subscriber would be overwritten - confirm whether
        // callers can race here.
        if (activeSubscribers.containsKey(configCacheKey)) {
            return;
        }
        log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
        Subscriber subscriber = new Subscriber(input, configSourceSet, timingValues, requester);
        try {
            subscriber.subscribe();
            activeSubscribers.put(configCacheKey, subscriber);
        } catch (ConfigurationRuntimeException e) {
            log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed (" + e.getMessage()
                    + "), closing subscriber");
            subscriber.cancel();
        }
    }

    /**
     * Polls every open subscriber once for a new config generation and processes what it gets.
     *
     * <p>Runs as a fixed-rate task. An exception escaping a fixed-rate task suppresses all
     * later executions, so a failure for one subscriber is logged and the rest are still polled.
     */
    @Override
    public void run() {
        activeSubscribers.values().forEach(subscriber -> {
            try {
                if (!subscriber.isClosed()) {
                    Optional<RawConfig> config = subscriber.nextGeneration();
                    config.ifPresent(this::updateWithNewConfig);
                }
            } catch (RuntimeException e) {
                log.log(Level.WARNING, "Failed getting or handling next config generation, will retry", e);
            }
        });
    }

    /**
     * Shuts down everything owned by this client: source connections (subscribers, polling
     * scheduler and requester), the delayed responses task and the ping transport.
     */
    @Override
    public void cancel() {
        // Cancels subscribers, shuts down the polling scheduler and closes the requester.
        shutdownSourceConnections();
        delayedResponsesFuture.cancel(true);
        delayedResponsesScheduler.shutdownNow();
        nextConfigFuture.cancel(true);
        supervisor.transport().shutdown().join();
    }

    /** Cancels and forgets all subscribers, stops the polling scheduler and closes the requester. */
    @Override
    public void shutdownSourceConnections() {
        activeSubscribers.values().forEach(Subscriber::cancel);
        activeSubscribers.clear();
        nextConfigScheduler.shutdownNow();
        requester.close();
    }

    /** Returns the address of the connection currently selected in the requester's connection pool. */
    @Override
    public String getActiveSourceConnection() {
        return requester.getConnectionPool().getCurrent().getAddress();
    }

    /** Returns a new list with the sources of the config source set; empty if there is no set. */
    @Override
    public List<String> getSourceConnections() {
        List<String> ret = new ArrayList<>();
        if (configSourceSet != null) {
            ret.addAll(configSourceSet.getSources());
        }
        return ret;
    }

    /**
     * Sends the given config to every delayed request that has the same config key, unless the
     * config generation is lower than the requested generation and is not 0.
     *
     * @param config the new config to hand out
     */
    public void updateSubscribers(RawConfig config) {
        log.log(Level.FINE, () -> "Config updated for " + config.getKey() + "," + config.getGeneration());
        DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses();
        log.log(Level.FINEST, () -> "Delayed response queue: " + responseDelayQueue);
        if (responseDelayQueue.isEmpty()) {
            log.log(Level.FINE, () -> "There exists no matching element on delayed response queue for " + config.getKey());
            return;
        }
        log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements");

        boolean found = false;
        // Iterate over a snapshot, since matching responses are removed along the way.
        for (DelayedResponse response : responseDelayQueue.toArray(new DelayedResponse[0])) {
            JRTServerConfigRequest request = response.getRequest();
            // The Object cast is kept from the decompiled source so overload resolution of
            // equals is unchanged.
            if (!request.getConfigKey().equals((Object) config.getKey())) {
                continue;
            }
            boolean olderThanRequested =
                    config.getGeneration() < request.getRequestGeneration() && config.getGeneration() != 0L;
            if (olderThanRequested) {
                continue;
            }
            if (delayedResponses.remove(response)) {
                found = true;
                log.log(Level.FINE, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
                rpcServer.returnOkResponse(request, config);
            } else {
                log.log(Level.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed");
            }
        }
        if (!found) {
            log.log(Level.FINE, () -> "Found no recipient for " + config.getKey() + " in delayed response queue");
        }
        log.log(Level.FINE, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration());
    }

    /** Returns the delayed responses held by this client. */
    @Override
    public DelayedResponses delayedResponses() {
        return delayedResponses;
    }

    /** Stores the new config in the memory cache, then answers delayed requests waiting for it. */
    private void updateWithNewConfig(RawConfig newConfig) {
        log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey()
                + "', generation=" + newConfig.getGeneration()
                + ", payload=" + newConfig.getPayload());
        memoryCache.update(newConfig);
        updateSubscribers(newConfig);
    }
}

