/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.server.application;

import ai.vespa.util.http.VespaAsyncHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.HostInfo;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.config.model.api.container.ContainerServiceType;
import com.yahoo.slime.Cursor;
import com.yahoo.vespa.config.server.application.Application;
import com.yahoo.vespa.config.server.http.JSONResponse;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;

public class ConfigConvergenceChecker
extends AbstractComponent {
    /** Logger for this class; used to report per-service failures when fetching generations. */
    private static final Logger log = Logger.getLogger(ConfigConvergenceChecker.class.getName());
    /** Service types that are considered when collecting config generations; other service types are filtered out. */
    private static final Set<String> serviceTypesToCheck = Set.of(ContainerServiceType.CONTAINER.serviceName, ContainerServiceType.QRSERVER.serviceName, ContainerServiceType.LOGSERVER_CONTAINER.serviceName, ContainerServiceType.CLUSTERCONTROLLER_CONTAINER.serviceName, "searchnode", "storagenode", "distributor");
    /**
     * Single-thread executor on which HTTP responses are post-processed (see its use in getServiceGeneration).
     * Threads are created by a DaemonThreadFactory with the name prefix given below.
     */
    private final Executor responseHandlerExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new DaemonThreadFactory("config-convergence-checker-response-handler-"));
    /** JSON mapper used to parse the body of the state API response. */
    private final ObjectMapper jsonMapper = new ObjectMapper();

    /**
     * Creates a checker. The constructor takes no arguments and performs no work itself;
     * the instance fields are initialized by their declarations.
     */
    @Inject
    public ConfigConvergenceChecker() {
    }

    /**
     * Collects the current config generation of every checkable service in the application.
     * A service is checked when its service type is listed in {@code serviceTypesToCheck}
     * and it exposes a port tagged "state".
     *
     * @param application       the application whose hosts and services are inspected
     * @param timeoutPerService timeout handed on to each individual service request
     * @return the generation per checked service, as produced by {@code getServiceGenerations}
     */
    public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) {
        List<ServiceInfo> candidates = new ArrayList<>();
        for (HostInfo host : application.getModel().getHosts()) {
            for (ServiceInfo candidate : host.getServices()) {
                if (!serviceTypesToCheck.contains(candidate.getServiceType())) {
                    continue;
                }
                // Only services that expose a state port can be queried.
                if (ConfigConvergenceChecker.getStatePort(candidate).isPresent()) {
                    candidates.add(candidate);
                }
            }
        }
        return this.getServiceGenerations(candidates, timeoutPerService);
    }

    /**
     * Builds the JSON response listing all checked services and their current config generation.
     * The overall current generation is the lowest generation reported by any service,
     * or -1 when no service was checked.
     *
     * @param application       the application to check
     * @param requestUrl        the URL of the request, echoed in the response
     * @param timeoutPerService timeout applied to each individual service request
     * @return a response with status 200 describing all services
     */
    public JSONResponse getServiceConfigGenerationsResponse(Application application, URI requestUrl, Duration timeoutPerService) {
        Map<ServiceInfo, Long> generationByService = this.getServiceConfigGenerations(application, timeoutPerService);

        // Find the minimum generation; stays at -1 when the map is empty.
        long lowestGeneration = -1L;
        boolean seenAny = false;
        for (Long generation : generationByService.values()) {
            long value = generation;
            if (!seenAny || value < lowestGeneration) {
                lowestGeneration = value;
                seenAny = true;
            }
        }

        return new ServiceListResponse(200, generationByService, requestUrl, application.getApplicationGeneration(), lowestGeneration);
    }

    /**
     * Builds the JSON response describing config convergence for a single service.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>410 when {@code hostAndPortToCheck} does not match any host:port in the application</li>
     *   <li>200 with the current generation and a {@code converged} flag when the service answered</li>
     *   <li>404 when waiting for the service was interrupted, cancelled or failed</li>
     *   <li>500 on any other exception</li>
     * </ul>
     *
     * @param application        the application the service is expected to belong to
     * @param hostAndPortToCheck the service to check, on the form {@code host:port}
     * @param requestUrl         the URL of the request, echoed in the response
     * @param timeout            response timeout for the request to the service
     * @return the response describing the outcome
     */
    public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
        Long wantedGeneration = application.getApplicationGeneration();
        try {
            // Check membership first so no HTTP client is created for unknown services.
            if (!this.hostInApplication(application, hostAndPortToCheck)) {
                return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration);
            }
            try (CloseableHttpAsyncClient client = ConfigConvergenceChecker.createHttpClient()) {
                client.start();
                long currentGeneration = this.getServiceGeneration(client, URI.create("http://" + hostAndPortToCheck), timeout).get();
                boolean converged = currentGeneration >= wantedGeneration;
                return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
        }
        catch (CancellationException | ExecutionException e) {
            return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
        }
        catch (Exception e) {
            return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
        }
    }

    /**
     * Requests the config generation from all given services concurrently and waits for every request to finish.
     * Services without a positive state port are skipped. A service whose request fails is
     * recorded with generation -1 and the failure is logged at FINE level.
     *
     * @param services the services to query
     * @param timeout  response timeout applied to each request
     * @return generation per service, iterated in the order of {@code services}
     * @throws UncheckedIOException if closing the HTTP client fails
     */
    private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) {
        try (CloseableHttpAsyncClient client = ConfigConvergenceChecker.createHttpClient()) {
            client.start();
            List<CompletableFuture<Void>> inprogressRequests = new ArrayList<>();
            // Written from the response handler thread, read by this thread after join().
            ConcurrentMap<ServiceInfo, Long> temporaryResult = new ConcurrentHashMap<>();
            for (ServiceInfo service : services) {
                int statePort = ConfigConvergenceChecker.getStatePort(service).orElse(0);
                if (statePort <= 0) {
                    continue;
                }
                URI uri = URI.create("http://" + service.getHostName() + ":" + statePort);
                CompletableFuture<Void> inprogressRequest = this.getServiceGeneration(client, uri, timeout)
                        .handle((result, error) -> {
                            if (result != null) {
                                temporaryResult.put(service, result);
                            } else {
                                log.log(Level.FINE, error, () -> String.format("Failed to retrieve service config generation for '%s': %s", service, error.getMessage()));
                                temporaryResult.put(service, -1L);
                            }
                            return null;
                        });
                inprogressRequests.add(inprogressRequest);
            }
            // handle() never completes exceptionally here, so join() only waits for completion.
            CompletableFuture.allOf(inprogressRequests.toArray(CompletableFuture[]::new)).join();
            return ConfigConvergenceChecker.createMapOrderedByServiceList(services, temporaryResult);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Sends an asynchronous GET to the config state API of a service and returns a future for its generation.
     * The HTTP callback completes an intermediate promise; parsing of the response is then
     * run on {@code responseHandlerExecutor}.
     *
     * @param client     a started HTTP client used to execute the request
     * @param serviceUrl base URL of the service (scheme, host and port)
     * @param timeout    response timeout for the request
     * @return a future completing with the generation, or exceptionally/cancelled if the request or parsing fails
     */
    private CompletableFuture<Long> getServiceGeneration(CloseableHttpAsyncClient client, URI serviceUrl, Duration timeout) {
        SimpleHttpRequest request = SimpleHttpRequests.get(ConfigConvergenceChecker.createApiUri(serviceUrl));
        request.setHeader("Connection", "close");
        request.setConfig(ConfigConvergenceChecker.createRequestConfig(timeout));

        // Bridge the client's callback API to a CompletableFuture.
        CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
        client.execute(request, new FutureCallback<SimpleHttpResponse>() {

            @Override
            public void completed(SimpleHttpResponse result) {
                responsePromise.complete(result);
            }

            @Override
            public void failed(Exception ex) {
                responsePromise.completeExceptionally(ex);
            }

            @Override
            public void cancelled() {
                responsePromise.cancel(false);
            }
        });
        return responsePromise.thenApplyAsync(this::handleResponse, this.responseHandlerExecutor);
    }

    /**
     * Extracts the config generation from a state API response.
     *
     * @param response the HTTP response from the service
     * @return the generation found in the JSON body
     * @throws UncheckedIOException if the status code is not 200, the response has no body,
     *                              or the body cannot be read as JSON
     */
    private long handleResponse(SimpleHttpResponse response) throws UncheckedIOException {
        int status = response.getCode();
        if (status != 200) {
            throw new UncheckedIOException(new IOException("Expected status code 200, got " + status));
        }
        if (response.getBody() == null) {
            throw new UncheckedIOException(new IOException("Response has no content"));
        }
        JsonNode root;
        try {
            root = this.jsonMapper.readTree(response.getBodyText());
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ConfigConvergenceChecker.generationFromContainerState(root);
    }

    /**
     * Tells whether the given {@code host:port} string matches a port of any service on any host in the application.
     *
     * @param application the application whose hosts are searched
     * @param hostPort    the value to look for, on the form {@code hostname:port}
     * @return true if some host has a service port where {@code hostname + ":" + port} equals {@code hostPort}
     */
    private boolean hostInApplication(Application application, String hostPort) {
        for (HostInfo candidate : application.getModel().getHosts()) {
            String hostname = candidate.getHostname();
            // Cheap prefix check before looking at every port on the host.
            if (!hostPort.startsWith(hostname)) {
                continue;
            }
            boolean portMatches = candidate.getServices().stream()
                    .flatMap(service -> service.getPorts().stream())
                    .anyMatch(port -> hostPort.equals(hostname + ":" + port.getPort()));
            if (portMatches) {
                return true;
            }
        }
        return false;
    }

    /**
     * Finds the first port of the service that carries the tag "state".
     *
     * @param service the service whose ports are inspected
     * @return the port number of the first port tagged "state", or empty if there is none
     */
    private static Optional<Integer> getStatePort(ServiceInfo service) {
        for (PortInfo candidate : service.getPorts()) {
            if (candidate.getTags().contains("state")) {
                return Optional.of(candidate.getPort());
            }
        }
        return Optional.empty();
    }

    /**
     * Reads {@code config.generation} from the parsed state API response.
     *
     * @param state the root JSON node of the response
     * @return the generation as a long, or -1 if the node's value cannot be converted to a long
     */
    private static long generationFromContainerState(JsonNode state) {
        JsonNode configNode = state.get("config");
        JsonNode generationNode = configNode.get("generation");
        return generationNode.asLong(-1L);
    }

    /**
     * Copies the results into a map whose iteration order follows the given service list.
     * Services without an entry in {@code result} are left out.
     *
     * @param services the services, in the desired order
     * @param result   the generation per service, in arbitrary order
     * @return an insertion-ordered map of the services that have a result
     */
    private static Map<ServiceInfo, Long> createMapOrderedByServiceList(List<ServiceInfo> services, ConcurrentMap<ServiceInfo, Long> result) {
        Map<ServiceInfo, Long> ordered = new LinkedHashMap<>();
        services.forEach(service -> {
            Long generation = result.get(service);
            if (generation != null) {
                ordered.put(service, generation);
            }
        });
        return ordered;
    }

    /**
     * Derives the config state API URI of a service by setting the path of the given URL to {@code /state/v1/config}.
     *
     * @param serviceUrl base URL of the service
     * @return the URI of the service's config state API
     * @throws IllegalArgumentException if the resulting URI is not syntactically valid
     */
    private static URI createApiUri(URI serviceUrl) {
        try {
            URIBuilder apiUriBuilder = new URIBuilder(serviceUrl);
            apiUriBuilder.setPath("/state/v1/config");
            return apiUriBuilder.build();
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Creates the per-request configuration: one second to obtain a connection from the pool,
     * one second to connect, and the given duration as response timeout.
     *
     * @param timeout the response timeout
     * @return the request configuration
     */
    private static RequestConfig createRequestConfig(Duration timeout) {
        Timeout oneSecond = Timeout.ofSeconds(1L);
        Timeout responseTimeout = Timeout.ofMilliseconds(timeout.toMillis());
        RequestConfig.Builder config = RequestConfig.custom();
        config.setConnectionRequestTimeout(oneSecond);
        config.setResponseTimeout(responseTimeout);
        config.setConnectTimeout(oneSecond);
        return config.build();
    }

    /**
     * Creates a new, not yet started, asynchronous HTTP client. The connection pool allows
     * 100 connections in total and 10 per route, the socket timeout is two seconds,
     * and connections are never reused.
     *
     * @return the client; the caller is responsible for starting and closing it
     */
    private static CloseableHttpAsyncClient createHttpClient() {
        IOReactorConfig reactorConfig = IOReactorConfig.custom()
                .setSoTimeout(Timeout.ofSeconds(2L))
                .build();
        return VespaAsyncHttpClientBuilder
                .create(tlsStrategy -> PoolingAsyncClientConnectionManagerBuilder.create()
                        .setMaxConnTotal(100)
                        .setMaxConnPerRoute(10)
                        .setTlsStrategy(tlsStrategy)
                        .build())
                .setIOReactorConfig(reactorConfig)
                .setUserAgent("config-convergence-checker")
                .setConnectionReuseStrategy((request, response, context) -> false)
                .build();
    }

    private static class ServiceResponse
    extends JSONResponse {
        private ServiceResponse(int status, URI uri, String hostname, Long wantedGeneration) {
            super(status);
            this.object.setString("url", uri.toString());
            this.object.setString("host", hostname);
            this.object.setLong("wantedGeneration", wantedGeneration.longValue());
        }

        static ServiceResponse createOkResponse(URI uri, String hostname, Long wantedGeneration, Long currentGeneration, boolean converged) {
            ServiceResponse serviceResponse = new ServiceResponse(200, uri, hostname, wantedGeneration);
            serviceResponse.object.setBool("converged", converged);
            serviceResponse.object.setLong("currentGeneration", currentGeneration.longValue());
            return serviceResponse;
        }

        static ServiceResponse createHostNotFoundInAppResponse(URI uri, String hostname, Long wantedGeneration) {
            ServiceResponse serviceResponse = new ServiceResponse(410, uri, hostname, wantedGeneration);
            serviceResponse.object.setString("problem", "Host:port (service) no longer part of application, refetch list of services.");
            return serviceResponse;
        }

        static ServiceResponse createErrorResponse(URI uri, String hostname, Long wantedGeneration, String error) {
            ServiceResponse serviceResponse = new ServiceResponse(500, uri, hostname, wantedGeneration);
            serviceResponse.object.setString("error", error);
            return serviceResponse;
        }

        static ServiceResponse createNotFoundResponse(URI uri, String hostname, Long wantedGeneration, String error) {
            ServiceResponse serviceResponse = new ServiceResponse(404, uri, hostname, wantedGeneration);
            serviceResponse.object.setString("error", error);
            return serviceResponse;
        }
    }

    /**
     * JSON response listing every checked service with its host, state port, type, url and
     * current generation, followed by the overall url, current and wanted generation and
     * whether the application as a whole has converged.
     */
    private static class ServiceListResponse
    extends JSONResponse {
        private ServiceListResponse(int status, Map<ServiceInfo, Long> servicesToCheck, URI uri, long wantedGeneration, long currentGeneration) {
            super(status);
            Cursor services = object.setArray("services");
            for (Map.Entry<ServiceInfo, Long> entry : servicesToCheck.entrySet()) {
                ServiceInfo service = entry.getKey();
                Long generation = entry.getValue();

                Cursor element = services.addObject();
                String host = service.getHostName();
                int port = ConfigConvergenceChecker.getStatePort(service).get();
                element.setString("host", host);
                element.setLong("port", port);
                element.setString("type", service.getServiceType());
                element.setString("url", uri.toString() + "/" + host + ":" + port);
                element.setLong("currentGeneration", generation);
            }
            object.setString("url", uri.toString());
            object.setLong("currentGeneration", currentGeneration);
            object.setLong("wantedGeneration", wantedGeneration);
            // Converged when the lowest reported generation has reached the wanted one.
            object.setBool("converged", currentGeneration >= wantedGeneration);
        }
    }
}

