/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.server.application;

import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.ApplicationClusterInfo;
import com.yahoo.config.model.api.HostInfo;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.config.model.api.container.ContainerServiceType;
import com.yahoo.json.Jackson;
import com.yahoo.vespa.config.server.application.Application;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;

public class ConfigConvergenceChecker
extends AbstractComponent {
    /** Logger for this class, named after the class itself. */
    private static final Logger log = Logger.getLogger(ConfigConvergenceChecker.class.getName());
    /** Service type names considered for convergence checking; services of any other type are filtered out before any request is made. */
    private static final Set<String> serviceTypesToCheck = Set.of(ContainerServiceType.CONTAINER.serviceName, ContainerServiceType.LOGSERVER_CONTAINER.serviceName, ContainerServiceType.CLUSTERCONTROLLER_CONTAINER.serviceName, ContainerServiceType.METRICS_PROXY_CONTAINER.serviceName, "searchnode", "storagenode", "distributor");
    /** Single-threaded executor on which HTTP responses are parsed (see getServiceGeneration); it is shut down in deconstruct(). */
    private final ExecutorService responseHandlerExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new DaemonThreadFactory("config-convergence-checker-response-handler-"));

    /**
     * Creates a checker. The constructor takes no arguments; the {@code @Inject} annotation
     * marks it as the constructor to use for dependency injection.
     */
    @Inject
    public ConfigConvergenceChecker() {
    }

    /**
     * Fetches the current config generation of the application's services, considering all hosts.
     *
     * @param application the application whose services are queried
     * @param timeoutPerService response timeout applied to each individual service request
     * @return the config generation per service; -1 for a service whose generation could not be retrieved
     */
    public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) {
        // An empty host set means "check every host".
        HostsToCheck allHosts = new HostsToCheck(Set.of());
        return getServiceConfigGenerations(application, timeoutPerService, allHosts);
    }

    /**
     * Selects the services to query and fetches their config generations.
     *
     * A service is queried when its type is one of {@code serviceTypesToCheck}, it passes
     * {@code shouldCheckService}, and it exposes a port tagged "state".
     *
     * @param application the application whose services are queried
     * @param timeoutPerService response timeout applied to each individual service request
     * @param hostsToCheck restricts which hosts are considered
     * @return the config generation per selected service; -1 where retrieval failed
     */
    private Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService, HostsToCheck hostsToCheck) {
        List<ServiceInfo> candidates = new ArrayList<>();
        for (HostInfo host : application.getModel().getHosts()) {
            for (ServiceInfo service : host.getServices()) {
                if (!serviceTypesToCheck.contains(service.getServiceType())) {
                    continue;
                }
                if (!shouldCheckService(hostsToCheck, application, service)) {
                    continue;
                }
                // Services without a state port cannot be queried.
                if (getStatePort(service).isEmpty()) {
                    continue;
                }
                candidates.add(service);
            }
        }
        log.log(Level.FINE, () -> "Services to check for config convergence: " + candidates);
        return getServiceGenerations(candidates, timeoutPerService);
    }

    /**
     * Checks config convergence for the services on every host of the application.
     *
     * @param application the application to check
     * @param timeoutPerService response timeout applied to each individual service request
     * @return the per-service generations together with the wanted and lowest current generation
     */
    public ServiceListResponse checkConvergenceForAllServices(Application application, Duration timeoutPerService) {
        // An empty host set means "check every host".
        HostsToCheck everyHost = new HostsToCheck(Set.of());
        return checkConvergence(application, timeoutPerService, everyHost);
    }

    /**
     * Checks config convergence for services on the given hosts, using a fixed 10 second
     * timeout per service. When {@code hostnames} is non-empty, container services in
     * clusters that defer changes until restart are left out of the check.
     *
     * @param application the application to check
     * @param hostnames the hosts to check; an empty set means all hosts
     * @return the per-service generations together with the wanted and lowest current generation
     */
    public ServiceListResponse checkConvergenceUnlessDeferringChangesUntilRestart(Application application, Set<String> hostnames) {
        return checkConvergence(application, Duration.ofSeconds(10L), new HostsToCheck(hostnames));
    }

    /**
     * Fetches the config generations of the selected services and summarizes them.
     *
     * The reported current generation is the lowest generation among the checked
     * services, or -1 when no service was checked.
     *
     * @param application the application to check
     * @param timeoutPerService response timeout applied to each individual service request
     * @param hostsToCheck restricts which hosts are considered
     * @return the per-service generations together with the wanted and lowest current generation
     */
    private ServiceListResponse checkConvergence(Application application, Duration timeoutPerService, HostsToCheck hostsToCheck) {
        Map<ServiceInfo, Long> generations = getServiceConfigGenerations(application, timeoutPerService, hostsToCheck);
        boolean anySeen = false;
        long lowestGeneration = -1L;
        for (long generation : generations.values()) {
            if (!anySeen || generation < lowestGeneration) {
                lowestGeneration = generation;
                anySeen = true;
            }
        }
        return new ServiceListResponse(generations, application.getApplicationGeneration(), lowestGeneration);
    }

    /**
     * Fetches the config generation of a single service and compares it with the
     * application's wanted generation.
     *
     * @param application the application the service is expected to belong to
     * @param hostAndPortToCheck the service endpoint as {@code host:port}
     * @param timeout response timeout for the HTTP request
     * @return a response with status
     *         {@code hostNotFound} when no service port of the application matches {@code hostAndPortToCheck},
     *         {@code ok} (with current generation and convergence flag) on success,
     *         {@code notFound} when the request failed, was cancelled or the thread was interrupted,
     *         {@code error} for any other failure
     */
    public ServiceResponse getServiceConfigGeneration(Application application, String hostAndPortToCheck, Duration timeout) {
        Long wantedGeneration = application.getApplicationGeneration();
        try {
            // Validate the target first so no HTTP client is created for an unknown host.
            if (!hostInApplication(application, hostAndPortToCheck)) {
                return new ServiceResponse(ServiceResponse.Status.hostNotFound, wantedGeneration);
            }
            try (CloseableHttpAsyncClient client = createHttpClient()) {
                client.start();
                URI serviceUrl = URI.create("http://" + hostAndPortToCheck);
                long currentGeneration = getServiceGeneration(client, serviceUrl, timeout).get();
                boolean converged = currentGeneration >= wantedGeneration;
                return new ServiceResponse(ServiceResponse.Status.ok, wantedGeneration, currentGeneration, converged);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            return new ServiceResponse(ServiceResponse.Status.notFound, wantedGeneration, e.getMessage());
        }
        catch (CancellationException | ExecutionException e) {
            return new ServiceResponse(ServiceResponse.Status.notFound, wantedGeneration, e.getMessage());
        }
        catch (Exception e) {
            return new ServiceResponse(ServiceResponse.Status.error, wantedGeneration, e.getMessage());
        }
    }

    /**
     * Decides whether a service takes part in the convergence check.
     *
     * Everything is checked when no specific hosts were requested. Otherwise the service's
     * host must be among the requested ones, and the service must either not be a container
     * or belong to a cluster that does not defer changes until restart.
     *
     * @param hostsToCheck the requested hosts
     * @param application the application the service belongs to
     * @param serviceInfo the service under consideration
     * @return true if the service should be checked
     */
    private boolean shouldCheckService(HostsToCheck hostsToCheck, Application application, ServiceInfo serviceInfo) {
        if (hostsToCheck.checkAll()) {
            return true;
        }
        boolean hostSelected = hostsToCheck.check(serviceInfo.getHostName());
        return hostSelected
                && (isNotContainer(serviceInfo) || serviceIsInClusterWhichShouldBeChecked(application, serviceInfo));
    }

    /**
     * Tells whether the service is something other than a container or metrics proxy container.
     *
     * Both candidates are compared by service name. Comparing the service type string against
     * the enum constant itself can never match.
     *
     * @param serviceInfo the service to classify
     * @return true unless the service type is the container or metrics proxy container service name
     */
    private boolean isNotContainer(ServiceInfo serviceInfo) {
        String serviceType = serviceInfo.getServiceType();
        return !ContainerServiceType.CONTAINER.serviceName.equals(serviceType)
                && !ContainerServiceType.METRICS_PROXY_CONTAINER.serviceName.equals(serviceType);
    }

    /**
     * Tells whether the service's cluster is subject to convergence checking, i.e. the cluster
     * is not one that defers changes until restart.
     *
     * The cluster is identified by the service's "clustername" property; a service without
     * that property is matched against the empty string.
     *
     * @param application the application holding the cluster information
     * @param serviceInfo the service whose cluster is looked up
     * @return true if no deferring cluster has the service's cluster name
     */
    private boolean serviceIsInClusterWhichShouldBeChecked(Application application, ServiceInfo serviceInfo) {
        // Loop-invariant, so resolved once instead of once per cluster.
        String clusterName = serviceInfo.getProperty("clustername").orElse("");
        return application.getModel().applicationClusterInfo().stream()
                .filter(ApplicationClusterInfo::getDeferChangesUntilRestart)
                .noneMatch(cluster -> cluster.name().equals(clusterName));
    }

    /**
     * Queries all given services concurrently for their config generation.
     *
     * A dedicated HTTP client is created for the call and closed before returning. Services
     * without a positive state port are skipped and are absent from the result. A service
     * whose request or response handling failed is reported with generation -1.
     *
     * @param services the services to query
     * @param timeout response timeout applied to each individual request
     * @return generation per service, iterating in the order of {@code services}
     * @throws UncheckedIOException if closing the HTTP client fails
     */
    private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) {
        try (CloseableHttpAsyncClient client = createHttpClient()) {
            client.start();
            List<CompletableFuture<Void>> inProgressRequests = new ArrayList<>();
            ConcurrentMap<ServiceInfo, Long> generationByService = new ConcurrentHashMap<>();
            for (ServiceInfo service : services) {
                int statePort = getStatePort(service).orElse(0);
                if (statePort <= 0) {
                    continue;
                }
                URI uri = URI.create("http://" + service.getHostName() + ":" + statePort);
                CompletableFuture<Void> request = getServiceGeneration(client, uri, timeout)
                        .handle((generation, error) -> {
                            if (generation != null) {
                                generationByService.put(service, generation);
                            } else {
                                log.log(Level.FINE, error, () -> String.format("Failed to retrieve service config generation for '%s': %s", service, error.getMessage()));
                                generationByService.put(service, -1L);
                            }
                            return null;
                        });
                inProgressRequests.add(request);
            }
            // handle() never completes exceptionally here, so join() only waits for completion.
            CompletableFuture.allOf(inProgressRequests.toArray(CompletableFuture[]::new)).join();
            return createMapOrderedByServiceList(services, generationByService);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Starts an asynchronous GET against the service's config state API and returns a future
     * for the config generation found in the response.
     *
     * The HTTP callback is bridged to a {@link CompletableFuture}; the response is then parsed
     * by {@code handleResponse} on {@code responseHandlerExecutor}. The returned future
     * completes exceptionally when the request fails, is cancelled, or the response cannot
     * be handled.
     *
     * @param client a started HTTP client
     * @param serviceUrl base URL ({@code http://host:port}) of the service
     * @param timeout response timeout for the request
     * @return a future holding the service's config generation
     */
    private CompletableFuture<Long> getServiceGeneration(CloseableHttpAsyncClient client, URI serviceUrl, Duration timeout) {
        SimpleHttpRequest request = SimpleRequestBuilder.get(createApiUri(serviceUrl)).build();
        request.setConfig(createRequestConfig(timeout));

        CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
        client.execute(request, new FutureCallback<SimpleHttpResponse>() {

            public void completed(SimpleHttpResponse result) {
                responsePromise.complete(result);
            }

            public void failed(Exception ex) {
                responsePromise.completeExceptionally(ex);
            }

            public void cancelled() {
                responsePromise.cancel(false);
            }
        });
        return responsePromise.thenApplyAsync(this::handleResponse, responseHandlerExecutor);
    }

    /**
     * Extracts the config generation from a state API response.
     *
     * @param response the HTTP response to interpret
     * @return the config generation found in the response body
     * @throws UncheckedIOException if the status code is not 200, the response has no body,
     *         or the body cannot be parsed as JSON
     */
    private long handleResponse(SimpleHttpResponse response) throws UncheckedIOException {
        try {
            int code = response.getCode();
            boolean success = code == 200;
            if (!success) {
                throw new IOException("Expected status code 200, got " + code);
            }
            boolean hasBody = response.getBody() != null;
            if (!hasBody) {
                throw new IOException("Response has no content");
            }
            JsonNode state = Jackson.mapper().readTree(response.getBodyText());
            return generationFromContainerState(state);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Tells whether {@code hostPort} names a port of some service in the application.
     *
     * @param application the application whose hosts and services are searched
     * @param hostPort the endpoint to look for, as {@code hostname:port}
     * @return true if a host of the application has a service with a port such that
     *         {@code hostname:port} equals {@code hostPort}
     */
    private boolean hostInApplication(Application application, String hostPort) {
        for (HostInfo host : application.getModel().getHosts()) {
            String hostname = host.getHostname();
            // Cheap pre-filter: an exact "hostname:port" match requires the hostname as prefix.
            if (!hostPort.startsWith(hostname)) {
                continue;
            }
            boolean portMatches = host.getServices().stream()
                    .flatMap(service -> service.getPorts().stream())
                    .anyMatch(port -> hostPort.equals(hostname + ":" + port.getPort()));
            if (portMatches) {
                return true;
            }
        }
        return false;
    }

    /**
     * Finds the state port of a service.
     *
     * @param service the service to inspect
     * @return the first port of the service tagged "state", or empty if there is none
     */
    public static Optional<Integer> getStatePort(ServiceInfo service) {
        for (PortInfo candidate : service.getPorts()) {
            if (candidate.getTags().contains("state")) {
                return Optional.of(candidate.getPort());
            }
        }
        return Optional.empty();
    }

    /**
     * Shuts down the response handler executor, waiting up to 10 seconds for tasks already
     * submitted to finish. A timeout is logged; an interruption is logged and the thread's
     * interrupt flag is restored.
     */
    public void deconstruct() {
        this.responseHandlerExecutor.shutdown();
        try {
            boolean terminated = this.responseHandlerExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            if (!terminated) {
                log.log(Level.WARNING, "Response handler executor did not terminate within 10 seconds");
            }
        }
        catch (InterruptedException e) {
            log.log(Level.WARNING, "Unable to shutdown executor", e);
            // Preserve the interruption for whoever is driving the shutdown.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the {@code config.generation} value from a parsed state API response.
     *
     * @param state the parsed JSON document
     * @return the generation as a long; -1 if the field is present but not convertible to a number
     * @throws IllegalArgumentException if the document has no {@code config.generation} field
     */
    private static long generationFromContainerState(JsonNode state) {
        // path() yields a "missing" node instead of null, so an absent field is reported
        // with a descriptive message rather than a bare NullPointerException.
        JsonNode generation = state.path("config").path("generation");
        if (generation.isMissingNode()) {
            throw new IllegalArgumentException("Response is missing the 'config.generation' field");
        }
        return generation.asLong(-1L);
    }

    /**
     * Copies the results into a map whose iteration order follows the service list.
     *
     * @param services the services in the desired order
     * @param result the generations gathered so far, in no particular order
     * @return an insertion-ordered map containing each listed service that has a result
     */
    private static Map<ServiceInfo, Long> createMapOrderedByServiceList(List<ServiceInfo> services, ConcurrentMap<ServiceInfo, Long> result) {
        Map<ServiceInfo, Long> ordered = new LinkedHashMap<>();
        for (ServiceInfo current : services) {
            Long found = result.get(current);
            if (found != null) {
                ordered.put(current, found);
            }
        }
        return ordered;
    }

    /**
     * Builds the config state API URI for a service by setting the path of the given URL
     * to {@code /state/v1/config}.
     *
     * @param serviceUrl base URL of the service
     * @return the URL with its path replaced
     * @throws IllegalArgumentException if the resulting URI is not valid
     */
    private static URI createApiUri(URI serviceUrl) {
        try {
            URIBuilder apiUri = new URIBuilder(serviceUrl).setPath("/state/v1/config");
            return apiUri.build();
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Creates the per-request configuration: a fixed 10 second connection request timeout
     * and a response timeout taken from the argument.
     *
     * @param timeout the response timeout
     * @return the request configuration
     */
    private static RequestConfig createRequestConfig(Duration timeout) {
        Timeout connectionRequestTimeout = Timeout.ofSeconds(10L);
        Timeout responseTimeout = Timeout.ofMilliseconds(timeout.toMillis());
        return RequestConfig.custom()
                .setConnectionRequestTimeout(connectionRequestTimeout)
                .setResponseTimeout(responseTimeout)
                .build();
    }

    /**
     * Creates a new, not yet started, asynchronous HTTP client with user agent
     * "config-convergence-checker".
     *
     * The connection pool allows 100 connections in total and 10 per route. Connections get
     * a 10 second connect timeout and a 1 millisecond time to live; the I/O reactor uses a
     * 2 second socket timeout.
     * NOTE(review): the 1 ms time to live presumably exists to prevent connection reuse — confirm.
     *
     * @return the configured client; the caller is responsible for starting and closing it
     */
    private static CloseableHttpAsyncClient createHttpClient() {
        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setTimeToLive(TimeValue.ofMilliseconds(1L))
                .setConnectTimeout(Timeout.ofSeconds(10L))
                .build();
        IOReactorConfig reactorConfig = IOReactorConfig.custom()
                .setSoTimeout(Timeout.ofSeconds(2L))
                .build();
        return VespaAsyncHttpClientBuilder
                .create(tlsStrategy -> PoolingAsyncClientConnectionManagerBuilder.create()
                        .setMaxConnTotal(100)
                        .setMaxConnPerRoute(10)
                        .setDefaultConnectionConfig(connectionConfig)
                        .setTlsStrategy(tlsStrategy)
                        .build())
                .setIOReactorConfig(reactorConfig)
                .setUserAgent("config-convergence-checker")
                .build();
    }

    /**
     * The set of hostnames a check is restricted to. An empty set places no restriction,
     * i.e. every host is checked.
     */
    private record HostsToCheck(Set<String> hostnames) {

        /** Returns true when no specific hostnames were given, meaning all hosts are checked. */
        public boolean checkAll() {
            return hostnames().isEmpty();
        }

        /** Returns true when the given hostname should be checked: either all hosts are, or it is listed. */
        public boolean check(String hostname) {
            if (checkAll()) {
                return true;
            }
            return hostnames().contains(hostname);
        }
    }

    /**
     * Result of a convergence check across several services: the generation of each
     * checked service, the wanted generation, the current generation, and whether the
     * current generation has reached the wanted one.
     */
    public static class ServiceListResponse {

        /** The checked services with their generations, in the order they were supplied. */
        public final List<Service> services;
        public final long wantedGeneration;
        public final long currentGeneration;
        public final boolean converged;

        private ServiceListResponse(List<Service> services, long wantedGeneration, long currentGeneration, boolean converged) {
            // Keep a private copy so later changes to the argument do not affect this response.
            this.services = new ArrayList<>(services);
            this.wantedGeneration = wantedGeneration;
            this.currentGeneration = currentGeneration;
            this.converged = converged;
        }

        /**
         * Creates a response that counts as converged when {@code currentGeneration} is at
         * least {@code wantedGeneration}.
         *
         * @param services generation per checked service
         * @param wantedGeneration the generation the services should reach
         * @param currentGeneration the generation to report as current
         */
        public ServiceListResponse(Map<ServiceInfo, Long> services, long wantedGeneration, long currentGeneration) {
            this(toServiceList(services), wantedGeneration, currentGeneration, currentGeneration >= wantedGeneration);
        }

        /** Converts map entries to {@link Service} objects, preserving the map's iteration order. */
        private static List<Service> toServiceList(Map<ServiceInfo, Long> generations) {
            List<Service> converted = new ArrayList<>();
            for (Map.Entry<ServiceInfo, Long> entry : generations.entrySet()) {
                converted.add(new Service(entry.getKey(), entry.getValue()));
            }
            return converted;
        }

        /** Returns a copy of this response with the converged flag forced to false. */
        public ServiceListResponse unconverged() {
            return new ServiceListResponse(services, wantedGeneration, currentGeneration, false);
        }

        /** Returns the checked services with their generations. */
        public List<Service> services() {
            return services;
        }

        /** A service paired with the config generation retrieved for it. */
        public static class Service {

            public final ServiceInfo serviceInfo;
            public final Long currentGeneration;

            public Service(ServiceInfo serviceInfo, Long currentGeneration) {
                this.serviceInfo = serviceInfo;
                this.currentGeneration = currentGeneration;
            }
        }
    }

    /**
     * Result of a convergence check for a single service: a status, the wanted and current
     * generations, a convergence flag and an optional error message.
     */
    public static class ServiceResponse {

        public final Status status;
        public final Long wantedGeneration;
        public final Long currentGeneration;
        public final boolean converged;
        public final Optional<String> errorMessage;

        /** Creates a non-converged response with current generation 0 and no error message. */
        public ServiceResponse(Status status, long wantedGeneration) {
            this(status, wantedGeneration, 0L, false, Optional.empty());
        }

        /** Creates a non-converged response with the given current generation and no error message. */
        public ServiceResponse(Status status, long wantedGeneration, long currentGeneration) {
            this(status, wantedGeneration, currentGeneration, false, Optional.empty());
        }

        /** Creates a response with the given generations and convergence flag, and no error message. */
        public ServiceResponse(Status status, long wantedGeneration, long currentGeneration, boolean converged) {
            this(status, wantedGeneration, currentGeneration, converged, Optional.empty());
        }

        /** Creates a non-converged response with current generation 0 and an error message, which may be null. */
        public ServiceResponse(Status status, long wantedGeneration, String errorMessage) {
            this(status, wantedGeneration, 0L, false, Optional.ofNullable(errorMessage));
        }

        private ServiceResponse(Status status, long wantedGeneration, long currentGeneration, boolean converged, Optional<String> errorMessage) {
            this.status = status;
            this.wantedGeneration = wantedGeneration;
            this.currentGeneration = currentGeneration;
            this.converged = converged;
            this.errorMessage = errorMessage;
        }

        /** Outcome of the check. */
        public enum Status {
            ok,
            notFound,
            hostNotFound,
            error
        }
    }
}

