/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.server.application;

import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.config.model.api.container.ContainerServiceType;
import com.yahoo.vespa.applicationmodel.ClusterId;
import com.yahoo.vespa.config.server.application.ClusterReindexing;
import com.yahoo.vespa.config.server.application.ClusterReindexingStatusClient;
import com.yahoo.vespa.config.server.modelfactory.ModelResult;
import com.yahoo.yolean.Exceptions;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;

public class DefaultClusterReindexingStatusClient
implements ClusterReindexingStatusClient {
    private static final ObjectMapper mapper = new ObjectMapper();
    private final Executor executor = Executors.newSingleThreadExecutor((ThreadFactory)new DaemonThreadFactory("cluster-controller-reindexing-client-"));
    private final CloseableHttpAsyncClient httpClient = DefaultClusterReindexingStatusClient.createHttpClient();

    public DefaultClusterReindexingStatusClient() {
        this.httpClient.start();
    }

    @Override
    public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException {
        Map<ClusterId, List<ServiceInfo>> clusters = DefaultClusterReindexingStatusClient.clusterControllerClusters(application);
        HashMap<ClusterId, CompletableFuture> futureStatusPerCluster = new HashMap<ClusterId, CompletableFuture>();
        clusters.forEach((clusterId, clusterNodes) -> {
            List parallelRequests = clusterNodes.stream().map(this::getReindexingStatus).collect(Collectors.toList());
            CompletableFuture combinedRequest = CompletableFutures.firstOf(parallelRequests);
            futureStatusPerCluster.put((ClusterId)clusterId, combinedRequest);
        });
        try {
            HashMap statusPerCluster = new HashMap();
            futureStatusPerCluster.forEach((clusterId, futureStatus) -> statusPerCluster.putAll((Map)futureStatus.join()));
            return Map.copyOf(statusPerCluster);
        }
        catch (Exception e) {
            throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e);
        }
    }

    @Override
    public void close() {
        Exceptions.uncheck(() -> this.httpClient.close());
    }

    private CompletableFuture<Map<String, ClusterReindexing>> getReindexingStatus(ServiceInfo service) {
        URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), DefaultClusterReindexingStatusClient.getStatePort(service)));
        final CompletableFuture responsePromise = new CompletableFuture();
        this.httpClient.execute(SimpleHttpRequests.get((URI)uri), (FutureCallback)new FutureCallback<SimpleHttpResponse>(){

            public void completed(SimpleHttpResponse result) {
                responsePromise.complete(result);
            }

            public void failed(Exception ex) {
                responsePromise.completeExceptionally(ex);
            }

            public void cancelled() {
                responsePromise.cancel(false);
            }
        });
        return responsePromise.handleAsync((response, error) -> {
            if (response != null) {
                return (Map)Exceptions.uncheck(() -> DefaultClusterReindexingStatusClient.toClusterReindexing(response));
            }
            throw Exceptions.throwUnchecked((Throwable)new IOException(String.format("For '%s': %s", uri, error.getMessage()), (Throwable)error));
        }, this.executor);
    }

    private static Map<String, ClusterReindexing> toClusterReindexing(SimpleHttpResponse response) throws IOException {
        if (response.getCode() != 200) {
            throw new IOException("Expected status code 200, got " + response.getCode());
        }
        if (response.getBody() == null) {
            throw new IOException("Response has no content");
        }
        return DefaultClusterReindexingStatusClient.toClusterReindexing(response.getBodyBytes());
    }

    private static Map<String, ClusterReindexing> toClusterReindexing(byte[] requestBody) throws IOException {
        JsonNode jsonNode = mapper.readTree(requestBody);
        HashMap<String, ClusterReindexing> clusters = new HashMap<String, ClusterReindexing>();
        Iterator clusterNames = jsonNode.get("clusters").fieldNames();
        while (clusterNames.hasNext()) {
            String clusterName = (String)clusterNames.next();
            JsonNode clusterJson = jsonNode.get("clusters").get(clusterName);
            HashMap<String, ClusterReindexing.Status> documentStatuses = new HashMap<String, ClusterReindexing.Status>();
            Iterator documentTypes = clusterJson.get("documentTypes").fieldNames();
            while (documentTypes.hasNext()) {
                String type = (String)documentTypes.next();
                JsonNode statusJson = clusterJson.get("documentTypes").get(type);
                Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue());
                Instant endedMillis = Optional.ofNullable(statusJson.get("endedMillis")).map(json -> Instant.ofEpochMilli(json.longValue())).orElse(null);
                Double progress = Optional.ofNullable(statusJson.get("progress")).map(JsonNode::doubleValue).orElse(null);
                ClusterReindexing.State state = Optional.ofNullable(statusJson.get("state")).map(json -> ClusterReindexing.State.fromString(json.textValue())).orElse(null);
                String message = Optional.ofNullable(statusJson.get("message")).map(JsonNode::textValue).orElse(null);
                documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progress));
            }
            clusters.put(clusterName, new ClusterReindexing(documentStatuses));
        }
        return Map.copyOf(clusters);
    }

    private static int getStatePort(ServiceInfo service) {
        return service.getPorts().stream().filter(port -> port.getTags().contains("state")).map(PortInfo::getPort).findAny().orElseThrow(() -> new IllegalStateException("Cluster controller container has no container port"));
    }

    private static Map<ClusterId, List<ServiceInfo>> clusterControllerClusters(ModelResult application) {
        return application.getModel().getHosts().stream().flatMap(host -> host.getServices().stream()).filter(service -> service.getServiceType().equals(ContainerServiceType.CLUSTERCONTROLLER_CONTAINER.serviceName)).collect(Collectors.groupingBy(service -> new ClusterId((String)service.getProperty("clustername").get())));
    }

    private static CloseableHttpAsyncClient createHttpClient() {
        return VespaAsyncHttpClientBuilder.create().setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(Timeout.ofSeconds((long)2L)).build()).setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(Timeout.ofSeconds((long)2L)).setConnectionRequestTimeout(Timeout.ofSeconds((long)2L)).setResponseTimeout(Timeout.ofSeconds((long)4L)).build()).setUserAgent("cluster-controller-reindexing-client").build();
    }
}

