/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaClientFactory {
    public static final Duration UNLIMITED_RETRIES_DURATION = Duration.ofSeconds(-1L);
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientFactory.class);
    private static final int CLIENT_CREATION_RETRY_DELAY_MILLIS = 100;
    private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
    private final Vertx vertx;
    private Clock clock = Clock.systemUTC();

    public KafkaClientFactory(Vertx vertx) {
        this.vertx = Objects.requireNonNull(vertx);
    }

    void setClock(Clock clock) {
        this.clock = Objects.requireNonNull(clock);
    }

    public <T> Future<T> createClientWithRetries(Supplier<T> clientSupplier, String bootstrapServersConfig, Duration retriesTimeout) {
        Objects.requireNonNull(clientSupplier);
        Promise resultPromise = Promise.promise();
        this.createClientWithRetries(clientSupplier, this.getRetriesTimeLimit(retriesTimeout), () -> KafkaClientFactory.containsValidServerEntries(bootstrapServersConfig), resultPromise);
        return resultPromise.future();
    }

    public <K, V> Future<KafkaConsumer<K, V>> createKafkaConsumerWithRetries(Map<String, String> consumerConfig, Class<K> keyType, Class<V> valueType, Duration retriesTimeout) {
        Objects.requireNonNull(consumerConfig);
        Objects.requireNonNull(keyType);
        Objects.requireNonNull(valueType);
        Promise resultPromise = Promise.promise();
        this.createClientWithRetries(() -> KafkaConsumer.create((Vertx)this.vertx, (Map)consumerConfig, (Class)keyType, (Class)valueType), this.getRetriesTimeLimit(retriesTimeout), () -> KafkaClientFactory.containsValidServerEntries((String)consumerConfig.get("bootstrap.servers")), resultPromise);
        return resultPromise.future();
    }

    public Future<KafkaAdminClient> createKafkaAdminClientWithRetries(Map<String, String> clientConfig, Duration retriesTimeout) {
        Objects.requireNonNull(clientConfig);
        Promise resultPromise = Promise.promise();
        this.createClientWithRetries(() -> KafkaAdminClient.create((Vertx)this.vertx, (Map)clientConfig), this.getRetriesTimeLimit(retriesTimeout), () -> KafkaClientFactory.containsValidServerEntries((String)clientConfig.get("bootstrap.servers")), resultPromise);
        return resultPromise.future();
    }

    private Instant getRetriesTimeLimit(Duration retriesTimeout) {
        if (retriesTimeout == null || retriesTimeout.isNegative()) {
            return Instant.MAX;
        }
        if (retriesTimeout.isZero()) {
            return Instant.MIN;
        }
        return Instant.now(this.clock).plus(retriesTimeout);
    }

    private <T> void createClientWithRetries(Supplier<T> clientSupplier, Instant retriesTimeLimit, Supplier<Boolean> serverEntriesValid, Promise<T> resultPromise) {
        try {
            resultPromise.complete(clientSupplier.get());
        }
        catch (Exception e) {
            if (!retriesTimeLimit.equals(Instant.MIN) && e instanceof KafkaException && KafkaClientFactory.isBootstrapServersConfigException(e.getCause()) && serverEntriesValid.get().booleanValue()) {
                if (Instant.now(this.clock).isBefore(retriesTimeLimit)) {
                    LOG.debug("error creating Kafka client, will retry in {}ms: {}", (Object)100, (Object)e.getCause().getMessage());
                    this.vertx.setTimer(100L, tid -> this.createClientWithRetries(clientSupplier, retriesTimeLimit, () -> true, resultPromise));
                } else {
                    LOG.warn("error creating Kafka client (no further attempts will be done, timeout for retries reached): {}", (Object)e.getCause().getMessage());
                    resultPromise.fail((Throwable)e);
                }
            }
            resultPromise.fail((Throwable)e);
        }
    }

    public static boolean isRetriableClientCreationError(Throwable exception, String bootstrapServersConfig) {
        return exception instanceof KafkaException && KafkaClientFactory.isBootstrapServersConfigException(exception.getCause()) && KafkaClientFactory.containsValidServerEntries(bootstrapServersConfig);
    }

    private static boolean isBootstrapServersConfigException(Throwable ex) {
        return ex instanceof ConfigException && ex.getMessage() != null && ex.getMessage().contains("bootstrap.servers");
    }

    private static boolean containsValidServerEntries(String bootstrapServersConfig) {
        List urlList = Optional.ofNullable(bootstrapServersConfig).map(serversString -> {
            String trimmed = serversString.trim();
            if (trimmed.isEmpty()) {
                return List.of();
            }
            return Arrays.asList(COMMA_WITH_WHITESPACE.split(trimmed, -1));
        }).orElseGet(List::of);
        return !urlList.isEmpty() && urlList.stream().allMatch(KafkaClientFactory::containsHostAndPort);
    }

    private static boolean containsHostAndPort(String url) {
        try {
            return Utils.getHost((String)url) != null && Utils.getPort((String)url) != null;
        }
        catch (IllegalArgumentException e) {
            return false;
        }
    }
}

