/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.server.rpc;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.TargetWatcher;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.server.GetConfigContext;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.rpc.RpcServer;
import com.yahoo.vespa.config.server.tenant.TenantRepository;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DelayedConfigResponses {
    private static final Logger log = Logger.getLogger(DelayedConfigResponses.class.getName());
    private final RpcServer rpcServer;
    private final ScheduledExecutorService executorService;
    private final boolean useJrtWatcher;
    private final Map<ApplicationId, MetricUpdater> metrics = new ConcurrentHashMap<ApplicationId, MetricUpdater>();
    private final Map<ApplicationId, BlockingQueue<DelayedConfigResponse>> delayedResponses = new ConcurrentHashMap<ApplicationId, BlockingQueue<DelayedConfigResponse>>();

    DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads) {
        this(rpcServer, numTimerThreads, true);
    }

    DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads, boolean useJrtWatcher) {
        this.rpcServer = rpcServer;
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(numTimerThreads, ThreadFactoryFactory.getDaemonThreadFactory((String)"delayed config responses"));
        executor.setRemoveOnCancelPolicy(true);
        this.executorService = executor;
        this.useJrtWatcher = useJrtWatcher;
    }

    List<DelayedConfigResponse> allDelayedResponses() {
        ArrayList<DelayedConfigResponse> responses = new ArrayList<DelayedConfigResponse>();
        for (Map.Entry<ApplicationId, BlockingQueue<DelayedConfigResponse>> entry : this.delayedResponses.entrySet()) {
            responses.addAll((Collection<DelayedConfigResponse>)entry.getValue());
        }
        return responses;
    }

    final void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
        if (request.isDelayedResponse()) {
            log.log(Level.FINE, () -> context.logPre() + "Request already delayed");
        } else {
            this.createQueueIfNotExists(context);
            BlockingQueue<DelayedConfigResponse> delayedResponsesQueue = this.delayedResponses.get(context.applicationId());
            DelayedConfigResponse response = new DelayedConfigResponse(request, delayedResponsesQueue, context.applicationId());
            request.setDelayedResponse(true);
            try {
                if (log.isLoggable(Level.FINE)) {
                    log.log(Level.FINE, context.logPre() + "Putting on delayedRequests queue (" + delayedResponsesQueue.size() + " elements): " + response.getRequest().getShortDescription());
                }
                response.schedule(Math.max(0L, request.getTimeout()));
                this.metricDelayedResponses(context.applicationId(), delayedResponsesQueue.size());
            }
            catch (InterruptedException e) {
                log.log(Level.WARNING, context.logPre() + "Interrupted when putting on delayed requests queue.");
            }
        }
    }

    private synchronized void metricDelayedResponses(ApplicationId app, int elems) {
        if (!this.metrics.containsKey(app)) {
            this.metrics.put(app, this.rpcServer.metricUpdaterFactory().getOrCreateMetricUpdater(Metrics.createDimensions(app)));
        }
        this.metrics.get(app).setDelayedResponses(elems);
    }

    private synchronized void createQueueIfNotExists(GetConfigContext context) {
        if (!this.delayedResponses.containsKey(context.applicationId())) {
            this.delayedResponses.put(context.applicationId(), new LinkedBlockingQueue());
        }
    }

    void stop() {
        this.executorService.shutdown();
    }

    List<DelayedConfigResponse> drainQueue(ApplicationId app) {
        ArrayList<DelayedConfigResponse> ret = new ArrayList<DelayedConfigResponse>();
        if (this.delayedResponses.containsKey(app)) {
            BlockingQueue<DelayedConfigResponse> queue = this.delayedResponses.get(app);
            queue.drainTo(ret);
        }
        this.metrics.remove(app);
        return ret;
    }

    public String toString() {
        return "DelayedConfigResponses. Average Size=" + this.size();
    }

    int size() {
        int totalQueueSize = 0;
        int numQueues = 0;
        for (Map.Entry<ApplicationId, BlockingQueue<DelayedConfigResponse>> e : this.delayedResponses.entrySet()) {
            ++numQueues;
            totalQueueSize += e.getValue().size();
        }
        return numQueues > 0 ? totalQueueSize / numQueues : 0;
    }

    class DelayedConfigResponse
    implements Runnable,
    TargetWatcher {
        final JRTServerConfigRequest request;
        private final BlockingQueue<DelayedConfigResponse> delayedResponsesQueue;
        private final ApplicationId app;
        private ScheduledFuture<?> future;

        DelayedConfigResponse(JRTServerConfigRequest req, BlockingQueue<DelayedConfigResponse> delayedResponsesQueue, ApplicationId app) {
            this.request = req;
            this.delayedResponsesQueue = delayedResponsesQueue;
            this.app = app;
        }

        @Override
        public synchronized void run() {
            this.removeFromQueue();
            this.removeWatcher();
            DelayedConfigResponses.this.rpcServer.addToRequestQueue(this.request, true, null);
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, this.logPre() + "DelayedConfigResponse. putting on queue: " + this.request.getShortDescription());
            }
        }

        private void removeFromQueue() {
            this.delayedResponsesQueue.remove(this);
        }

        JRTServerConfigRequest getRequest() {
            return this.request;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Delayed response for ").append(this.logPre()).append(this.request.getShortDescription());
            return sb.toString();
        }

        String logPre() {
            return TenantRepository.logPre(this.app);
        }

        synchronized void cancelAndRemove() {
            this.removeFromQueue();
            this.cancel();
        }

        synchronized boolean cancel() {
            this.removeWatcher();
            if (this.future == null) {
                throw new IllegalStateException("Cannot cancel a task that has not been scheduled");
            }
            return this.future.cancel(false);
        }

        synchronized void schedule(long delay) throws InterruptedException {
            this.delayedResponsesQueue.put(this);
            this.future = DelayedConfigResponses.this.executorService.schedule(this, delay, TimeUnit.MILLISECONDS);
            this.addWatcher();
        }

        public void notifyTargetInvalid(Target target) {
            this.cancelAndRemove();
        }

        private void addWatcher() {
            if (DelayedConfigResponses.this.useJrtWatcher) {
                this.request.getRequest().target().addWatcher((TargetWatcher)this);
            }
        }

        private void removeWatcher() {
            if (DelayedConfigResponses.this.useJrtWatcher) {
                this.request.getRequest().target().removeWatcher((TargetWatcher)this);
            }
        }
    }
}

