package com.azure.spring.integration.eventhub.factory;

import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.util.ClientOptions;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.spring.cloud.context.core.util.Constants;
import com.azure.spring.cloud.context.core.util.Memoizer;
import com.azure.spring.cloud.context.core.util.Tuple;
import com.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.azure.spring.integration.eventhub.impl.EventHubProcessor;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/* loaded from: input_file:com/azure/spring/integration/eventhub/factory/DefaultEventHubClientFactory.class */
/**
 * Default {@link EventHubClientFactory} that builds Event Hubs consumer, producer and
 * processor clients from a single Event Hubs connection string and caches them.
 *
 * <p>Consumer and processor clients are cached per (event hub name, consumer group) pair;
 * producer clients are cached per event hub name. Processor clients checkpoint into a blob
 * container reached through the checkpoint storage connection string.
 *
 * <p>{@link #destroy()} closes every cached consumer and producer client and stops every
 * cached processor client.
 */
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubClientFactory.class);

    /** Upper bound for each blocking call made against the checkpoint blob container. */
    private static final Duration CHECKPOINT_CONTAINER_TIMEOUT = Duration.ofMinutes(5L);

    private final String checkpointStorageConnectionString;
    private final String checkpointStorageContainer;
    private final String eventHubConnectionString;
    private final Map<Tuple<String, String>, EventHubConsumerAsyncClient> consumerClientMap = new ConcurrentHashMap<>();
    private final Map<String, EventHubProducerAsyncClient> producerClientMap = new ConcurrentHashMap<>();
    private final Map<Tuple<String, String>, EventProcessorClient> processorClientMap = new ConcurrentHashMap<>();
    // The creators are declared after the maps they are handed, because field initialisers
    // run in declaration order.
    private final BiFunction<String, String, EventHubConsumerAsyncClient> eventHubConsumerClientCreator = Memoizer.memoize(this.consumerClientMap, this::createEventHubClient);
    private final Function<String, EventHubProducerAsyncClient> producerClientCreator = Memoizer.memoize(this.producerClientMap, this::createProducerClient);

    /**
     * Creates the factory.
     *
     * @param eventHubConnectionString connection string used for every Event Hubs client built
     *        by this factory; annotated {@code @NonNull} but not validated here
     * @param checkpointConnectionString storage connection string used for the checkpoint blob
     *        container; must contain text
     * @param checkpointStorageContainer name of the checkpoint blob container, or {@code null}
     *        to use the event hub name as the container name
     * @throws IllegalArgumentException if {@code checkpointConnectionString} is null or blank
     */
    public DefaultEventHubClientFactory(@NonNull String eventHubConnectionString, String checkpointConnectionString, String checkpointStorageContainer) {
        Assert.hasText(checkpointConnectionString, "checkpointConnectionString can't be null or empty");
        this.eventHubConnectionString = eventHubConnectionString;
        this.checkpointStorageConnectionString = checkpointConnectionString;
        this.checkpointStorageContainer = checkpointStorageContainer;
    }

    /** Builds a new async consumer client for the given event hub and consumer group. */
    private EventHubConsumerAsyncClient createEventHubClient(String eventHubName, String consumerGroup) {
        return new EventHubClientBuilder()
            .connectionString(this.eventHubConnectionString, eventHubName)
            .consumerGroup(consumerGroup)
            .clientOptions(new ClientOptions().setApplicationId(Constants.SPRING_EVENT_HUB_APPLICATION_ID))
            .buildAsyncConsumerClient();
    }

    /** Builds a new async producer client for the given event hub. */
    private EventHubProducerAsyncClient createProducerClient(String eventHubName) {
        return new EventHubClientBuilder()
            .connectionString(this.eventHubConnectionString, eventHubName)
            .clientOptions(new ClientOptions().setApplicationId(Constants.SPRING_EVENT_HUB_APPLICATION_ID))
            .buildAsyncProducerClient();
    }

    /**
     * Builds a processor client whose callbacks delegate to {@code eventHubProcessor} and whose
     * checkpoints are stored in the checkpoint blob container, creating that container first
     * when it does not exist yet.
     *
     * @throws NullPointerException if {@code eventHubProcessor} is null
     */
    private EventProcessorClient createEventProcessorClientInternal(String eventHubName, String consumerGroup, EventHubProcessor eventHubProcessor) {
        // Fail fast: reject a null processor before any blocking storage call is made, so a
        // bad argument cannot leave a freshly created container behind.
        Objects.requireNonNull(eventHubProcessor, "eventHubProcessor must not be null");

        // Without an explicitly configured container, the event hub name doubles as its name.
        String containerName = this.checkpointStorageContainer == null ? eventHubName : this.checkpointStorageContainer;
        BlobContainerAsyncClient containerClient = new BlobContainerClientBuilder()
            .connectionString(this.checkpointStorageConnectionString)
            .containerName(containerName)
            .httpLogOptions(new HttpLogOptions().setApplicationId(Constants.SPRING_EVENT_HUB_APPLICATION_ID))
            .buildAsyncClient();

        // Both blocking calls share one timeout so a stalled existence check cannot hang the
        // caller forever. A null result is treated the same as "does not exist".
        Boolean containerExists = containerClient.exists().block(CHECKPOINT_CONTAINER_TIMEOUT);
        if (!Boolean.TRUE.equals(containerExists)) {
            LOGGER.warn("Will create storage blob {}, the auto creation might be deprecated in later versions.", containerName);
            containerClient.create().block(CHECKPOINT_CONTAINER_TIMEOUT);
        }

        return new EventProcessorClientBuilder()
            .connectionString(this.eventHubConnectionString, eventHubName)
            .consumerGroup(consumerGroup)
            .checkpointStore(new BlobCheckpointStore(containerClient))
            .processPartitionInitialization(eventHubProcessor::onInitialize)
            .processPartitionClose(eventHubProcessor::onClose)
            .processEvent(eventHubProcessor::onEvent)
            .processError(eventHubProcessor::onError)
            .buildEventProcessorClient();
    }

    /**
     * Applies {@code closer} to every value of {@code clients}. A failure on one client is
     * logged and does not prevent the remaining clients from being processed.
     */
    private <K, V> void close(Map<K, V> clients, Consumer<V> closer) {
        for (V client : clients.values()) {
            try {
                closer.accept(client);
            } catch (Exception e) {
                LOGGER.warn("Failed to clean event hub client factory", e);
            }
        }
    }

    /**
     * Closes all cached consumer and producer clients and stops all cached processor clients.
     * Individual failures are logged rather than propagated.
     */
    @Override
    public void destroy() {
        close(this.consumerClientMap, EventHubConsumerAsyncClient::close);
        close(this.producerClientMap, EventHubProducerAsyncClient::close);
        close(this.processorClientMap, EventProcessorClient::stop);
    }

    /**
     * Returns the consumer client for the given event hub and consumer group, delegating to
     * the memoizing creator backed by the consumer client cache.
     */
    @Override // com.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventHubConsumerAsyncClient getOrCreateConsumerClient(String eventHubName, String consumerGroup) {
        return this.eventHubConsumerClientCreator.apply(eventHubName, consumerGroup);
    }

    /**
     * Returns the producer client for the given event hub, delegating to the memoizing creator
     * backed by the producer client cache.
     */
    @Override // com.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventHubProducerAsyncClient getOrCreateProducerClient(String eventHubName) {
        return this.producerClientCreator.apply(eventHubName);
    }

    /**
     * Returns the processor client cached for the given event hub and consumer group, creating
     * and caching it when absent. When a client is already cached, the supplied
     * {@code eventHubProcessor} is not used.
     *
     * <p>NOTE(review): creation performs blocking storage calls inside
     * {@code computeIfAbsent}; confirm that callers tolerate other updates to this cache
     * waiting while a client is being built.
     */
    @Override // com.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventProcessorClient createEventProcessorClient(String eventHubName, String consumerGroup, EventHubProcessor eventHubProcessor) {
        return this.processorClientMap.computeIfAbsent(
            Tuple.of(eventHubName, consumerGroup),
            key -> createEventProcessorClientInternal(eventHubName, consumerGroup, eventHubProcessor));
    }

    /**
     * Looks up the cached processor client for the given event hub and consumer group.
     *
     * @return the cached client, or an empty {@link Optional} when none has been created
     */
    @Override // com.azure.spring.integration.eventhub.api.EventHubClientFactory
    public Optional<EventProcessorClient> getEventProcessorClient(String eventHubName, String consumerGroup) {
        return Optional.ofNullable(this.processorClientMap.get(Tuple.of(eventHubName, consumerGroup)));
    }

    /**
     * Removes the cached processor client for the given event hub and consumer group. The
     * removed client is not stopped by this method.
     *
     * @return the removed client, or {@code null} when none was cached
     */
    @Override // com.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventProcessorClient removeEventProcessorClient(String eventHubName, String consumerGroup) {
        return this.processorClientMap.remove(Tuple.of(eventHubName, consumerGroup));
    }
}
