/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.actuate.health.StatusAggregator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class KafkaBinderHealthIndicator
implements KafkaBinderHealth,
DisposableBean {
    private static final int DEFAULT_TIMEOUT = 60;
    private final ExecutorService executor = Executors.newSingleThreadExecutor((ThreadFactory)new CustomizableThreadFactory("kafka-binder-health-"));
    private final KafkaMessageChannelBinder binder;
    private final ConsumerFactory<?, ?> consumerFactory;
    private int timeout = 60;
    private Consumer<?, ?> metadataConsumer;
    private boolean considerDownWhenAnyPartitionHasNoLeader;

    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, ConsumerFactory<?, ?> consumerFactory) {
        this.binder = binder;
        this.consumerFactory = consumerFactory;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public void setConsiderDownWhenAnyPartitionHasNoLeader(boolean considerDownWhenAnyPartitionHasNoLeader) {
        this.considerDownWhenAnyPartitionHasNoLeader = considerDownWhenAnyPartitionHasNoLeader;
    }

    public Health health() {
        Health topicsHealth = this.safelyBuildTopicsHealth();
        Health listenerContainersHealth = this.buildListenerContainersHealth();
        return this.merge(topicsHealth, listenerContainersHealth);
    }

    private Health merge(Health topicsHealth, Health listenerContainersHealth) {
        Status aggregatedStatus = StatusAggregator.getDefault().getAggregateStatus(new Status[]{topicsHealth.getStatus(), listenerContainersHealth.getStatus()});
        HashMap aggregatedDetails = new HashMap();
        aggregatedDetails.putAll(topicsHealth.getDetails());
        aggregatedDetails.putAll(listenerContainersHealth.getDetails());
        return Health.status((Status)aggregatedStatus).withDetails(aggregatedDetails).build();
    }

    private Health safelyBuildTopicsHealth() {
        Future<Health> future = this.executor.submit(this::buildTopicsHealth);
        try {
            return future.get(this.timeout, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return Health.down().withDetail("Interrupted while waiting for partition information in", (Object)(this.timeout + " seconds")).build();
        }
        catch (ExecutionException ex) {
            return Health.down((Exception)ex).build();
        }
        catch (TimeoutException ex) {
            return Health.down().withDetail("Failed to retrieve partition information in", (Object)(this.timeout + " seconds")).build();
        }
    }

    private void initMetadataConsumer() {
        if (this.metadataConsumer == null) {
            this.metadataConsumer = this.consumerFactory.createConsumer();
        }
    }

    private Health buildTopicsHealth() {
        try {
            this.initMetadataConsumer();
            HashSet<String> downMessages = new HashSet<String>();
            HashSet<String> checkedTopics = new HashSet<String>();
            Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = this.binder.getTopicsInUse();
            if (topicsInUse.isEmpty()) {
                try {
                    this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
                }
                catch (Exception e) {
                    return Health.down().withDetail("No topic information available", (Object)"Kafka broker is not reachable").build();
                }
                return Health.unknown().withDetail("No bindings found", (Object)"Kafka binder may not be bound to destinations on the broker").build();
            }
            for (String topic : topicsInUse.keySet()) {
                KafkaMessageChannelBinder.TopicInformation topicInformation = topicsInUse.get(topic);
                if (!topicInformation.isTopicPattern()) {
                    List partitionInfos = this.metadataConsumer.partitionsFor(topic);
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        if (topicInformation.getPartitionInfos().contains(partitionInfo) && partitionInfo.leader() == null || partitionInfo.leader() != null && partitionInfo.leader().id() == -1) {
                            downMessages.add(partitionInfo.toString());
                            continue;
                        }
                        if ((!this.considerDownWhenAnyPartitionHasNoLeader || partitionInfo.leader() != null) && (partitionInfo.leader() == null || partitionInfo.leader().id() != -1)) continue;
                        downMessages.add(partitionInfo.toString());
                    }
                    checkedTopics.add(topic);
                    continue;
                }
                try {
                    this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
                }
                catch (Exception ex) {
                    return Health.down().withDetail("Cluster not connected", (Object)"Destination provided is a pattern, but cannot connect to the cluster for any verification").build();
                }
            }
            if (downMessages.isEmpty()) {
                return Health.up().withDetail("topicsInUse", checkedTopics).build();
            }
            return Health.down().withDetail("Following partitions in use have no leaders: ", (Object)((Object)downMessages).toString()).build();
        }
        catch (Exception ex) {
            return Health.down((Exception)ex).build();
        }
    }

    private Health buildListenerContainersHealth() {
        List<AbstractMessageListenerContainer<?, ?>> listenerContainers = this.binder.getKafkaMessageListenerContainers();
        if (listenerContainers.isEmpty()) {
            return Health.unknown().build();
        }
        Status status = Status.UP;
        ArrayList containersDetails = new ArrayList();
        for (AbstractMessageListenerContainer<?, ?> container : listenerContainers) {
            HashMap<String, Object> containerDetails = new HashMap<String, Object>();
            boolean isRunning = container.isRunning();
            boolean isOk = container.isInExpectedState();
            if (!isOk) {
                status = Status.DOWN;
            }
            containerDetails.put("isRunning", isRunning);
            containerDetails.put("isStoppedAbnormally", !isRunning && !isOk);
            containerDetails.put("isPaused", container.isContainerPaused());
            containerDetails.put("listenerId", container.getListenerId());
            containerDetails.put("groupId", container.getGroupId());
            containersDetails.add(containerDetails);
        }
        return Health.status((Status)status).withDetail("listenerContainers", containersDetails).build();
    }

    public void destroy() {
        this.executor.shutdown();
    }
}

