/*
 * Decompiled with CFR 0.152.
 */
package org.terracotta.dynamic_config.cli.api.stop;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.diagnostic.client.DiagnosticService;
import org.terracotta.diagnostic.client.connection.ConcurrencySizing;
import org.terracotta.diagnostic.client.connection.DiagnosticServiceProvider;
import org.terracotta.dynamic_config.api.model.Node;
import org.terracotta.dynamic_config.api.service.DynamicConfigService;
import org.terracotta.dynamic_config.cli.api.stop.StopProgress;

public class StopService {
    private static final Logger LOGGER = LoggerFactory.getLogger(StopService.class);
    private final DiagnosticServiceProvider diagnosticServiceProvider;
    private final ConcurrencySizing concurrencySizing;

    public StopService(DiagnosticServiceProvider diagnosticServiceProvider, ConcurrencySizing concurrencySizing) {
        this.diagnosticServiceProvider = Objects.requireNonNull(diagnosticServiceProvider);
        this.concurrencySizing = Objects.requireNonNull(concurrencySizing);
    }

    public StopProgress stopNodes(Collection<Node.Endpoint> endpoints, Duration stopDelay) {
        if (stopDelay.getSeconds() < 1L) {
            throw new IllegalArgumentException("Stop delay must be at least 1 second");
        }
        LOGGER.debug("Asking all nodes: {} to stop themselves", endpoints);
        HashMap<Node.Endpoint, DiagnosticService> stopRequested = new HashMap<Node.Endpoint, DiagnosticService>();
        final HashMap<Node.Endpoint, Exception> stopRequestFailed = new HashMap<Node.Endpoint, Exception>();
        for (Node.Endpoint addr : endpoints) {
            try {
                DiagnosticService diagnosticService2 = this.diagnosticServiceProvider.fetchDiagnosticService(addr.getHostPort().createInetSocketAddress());
                stopRequested.put(addr, diagnosticService2);
                ((DynamicConfigService)diagnosticService2.getProxy(DynamicConfigService.class)).stop(stopDelay);
            }
            catch (Exception e) {
                stopRequestFailed.put(addr, e);
                LOGGER.debug("Failed asking node {} to stop: {}", new Object[]{addr, e.getMessage(), e});
            }
        }
        final CountDownLatch done = new CountDownLatch(stopRequested.size());
        final CopyOnWriteArrayList stoppedNodes = new CopyOnWriteArrayList();
        final AtomicReference progressCallback = new AtomicReference();
        final AtomicBoolean continuePolling = new AtomicBoolean(true);
        final ExecutorService executorService = Executors.newFixedThreadPool(this.concurrencySizing.getThreadCount(endpoints.size()), r -> new Thread(r, this.getClass().getName()));
        stopRequested.forEach((endpoint, diagnosticService) -> executorService.submit(() -> {
            while (continuePolling.get() && !Thread.currentThread().isInterrupted() && diagnosticService.isConnected()) {
                try {
                    LOGGER.debug("Waiting for node: {} to stop...", endpoint);
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (!diagnosticService.isConnected()) {
                LOGGER.debug("Node: {} has stopped", endpoint);
                stoppedNodes.add(endpoint);
                Consumer cb = (Consumer)progressCallback.get();
                if (cb != null) {
                    cb.accept(endpoint);
                }
                done.countDown();
            } else {
                LOGGER.warn("Shutdown of node: {} has been interrupted", endpoint);
            }
        }));
        return new StopProgress(){

            @Override
            public void await() throws InterruptedException {
                try {
                    done.await();
                }
                finally {
                    continuePolling.set(false);
                    StopService.this.shutdown(executorService);
                }
            }

            @Override
            @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED"})
            public Collection<Node.Endpoint> await(Duration duration) throws InterruptedException {
                try {
                    done.await(duration.toMillis(), TimeUnit.MILLISECONDS);
                    ArrayList<Node.Endpoint> arrayList = new ArrayList<Node.Endpoint>(stoppedNodes);
                    return arrayList;
                }
                finally {
                    continuePolling.set(false);
                    StopService.this.shutdown(executorService);
                }
            }

            @Override
            public void onStopped(Consumer<Node.Endpoint> c) {
                progressCallback.set(c);
                stoppedNodes.forEach(c);
            }

            @Override
            public Map<Node.Endpoint, Exception> getErrors() {
                return stopRequestFailed;
            }
        };
    }

    private void shutdown(ExecutorService executorService) {
        executorService.shutdownNow();
        try {
            if (!executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

