package com.mulesoft.connectors.azure.eventhubs.internal.client;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.mulesoft.connectors.azure.eventhubs.internal.error.exception.ConnectivityException;
import com.mulesoft.connectors.azure.eventhubs.internal.source.eventhandler.EventHandler;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/azure/eventhubs/internal/client/DefaultEventHubConsumerClient.class */
/**
 * {@link EventHubConsumerClient} implementation backed by an Azure {@link EventProcessorClient}.
 *
 * <p>The processor is built from the supplied {@link EventProcessorClientBuilder} when
 * {@link #consume()} is called. Each received event is forwarded to the configured
 * {@link EventHandler}, and a checkpoint is requested whenever the event's sequence number is a
 * multiple of the configured checkpoint frequency.
 */
public class DefaultEventHubConsumerClient implements EventHubConsumerClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubConsumerClient.class);
    private final int checkpointFrequency;
    private final EventHandler eventHandler;
    private final EventProcessorClientBuilder builder;
    private EventProcessorClient client;

    /**
     * Creates a consumer client. No processor is built until {@link #consume()} is called.
     *
     * @param eventProcessorClientBuilder builder used to create the underlying processor
     * @param checkpointFrequency a checkpoint is requested for events whose sequence number is a
     *                            multiple of this value; {@code 0} disables checkpointing
     * @param eventHandler receives every event delivered by the processor
     */
    public DefaultEventHubConsumerClient(EventProcessorClientBuilder eventProcessorClientBuilder, int checkpointFrequency, EventHandler eventHandler) {
        this.builder = eventProcessorClientBuilder;
        this.checkpointFrequency = checkpointFrequency;
        this.eventHandler = eventHandler;
    }

    /**
     * @return the configured checkpoint frequency
     */
    public int getCheckpointFrequency() {
        return this.checkpointFrequency;
    }

    /**
     * @return the handler that receives every consumed event
     */
    public EventHandler getEventHandler() {
        return this.eventHandler;
    }

    /**
     * @return the builder used to create the underlying processor
     */
    public EventProcessorClientBuilder getBuilder() {
        return this.builder;
    }

    /**
     * @return the current processor, or {@code null} if none has been built or set yet
     */
    public EventProcessorClient getClient() {
        return this.client;
    }

    /**
     * Replaces the processor held by this instance.
     *
     * @param eventProcessorClient the processor to use from now on
     */
    public void setClient(EventProcessorClient eventProcessorClient) {
        this.client = eventProcessorClient;
    }

    /**
     * Builds a new processor wired to this instance's event and error callbacks and starts it.
     *
     * <p>Every call builds a new processor; the reference to any previously held processor is
     * overwritten without stopping it.
     */
    @Override
    public void consume() {
        this.client = this.builder
                .processEvent(this::eventContextHandler)
                .processError(this::errorContextHandler)
                .buildEventProcessorClient();
        this.client.start();
    }

    /**
     * Stops the processor if one exists. Does nothing when {@link #consume()} was never called
     * (or no client was set), instead of failing with a {@link NullPointerException}.
     */
    @Override
    public void close() {
        EventProcessorClient current = this.client;
        if (current != null) {
            current.stop();
        }
    }

    /**
     * Per-event callback: requests a checkpoint when due, then hands the event to the handler.
     *
     * @param eventContext context supplied by the processor for the received event
     */
    private void eventContextHandler(EventContext eventContext) {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
        LOGGER.info("Starting Event Processor in partition {} and Sequence Number {}", partitionContext.getPartitionId(), eventData.getSequenceNumber());
        // A frequency of 0 would make the modulo throw ArithmeticException for every event,
        // so it is treated as "never checkpoint". Non-zero values behave as before.
        if (this.checkpointFrequency != 0 && eventData.getSequenceNumber().longValue() % this.checkpointFrequency == 0) {
            LOGGER.info("Saving Checkpoint in partition {}, Sequence Number {} and Consumer Group {}", partitionContext.getPartitionId(), eventData.getSequenceNumber(), partitionContext.getConsumerGroup());
            // The error consumer is passed to subscribe() so a failed checkpoint is handled here
            // (with its stack trace) rather than surfacing as an unhandled reactive error.
            eventContext.updateCheckpointAsync()
                    .doOnSuccess(unused -> LOGGER.info("Checkpoint saved successfully"))
                    .subscribe(
                            ignored -> { },
                            th -> LOGGER.error("Checkpoint not saved due an error: {}", th.getMessage(), th));
        }
        // NOTE(review): the checkpoint is requested before the handler runs, so a handler failure
        // happens after the checkpoint update was already triggered — confirm this is intended.
        this.eventHandler.handle(eventData);
    }

    /**
     * Error callback: escalates timeouts as connectivity failures and logs everything else.
     *
     * @param errorContext context supplied by the processor for the failure
     * @throws ConnectivityException when the reported error is a {@link TimeoutException}
     */
    private void errorContextHandler(ErrorContext errorContext) {
        Throwable throwable = errorContext.getThrowable();
        // instanceof matches TimeoutException and its subclasses only; checking
        // throwable.getClass().isAssignableFrom(TimeoutException.class) would instead match
        // supertypes such as a plain Exception and miss subclasses.
        if (throwable instanceof TimeoutException) {
            throw new ConnectivityException("Connectivity issue by a Timeout Exception", throwable);
        }
        // The trailing throwable argument is logged by SLF4J with its stack trace.
        LOGGER.error("Error occurred in Event Processor for partition {}, {}", errorContext.getPartitionContext().getPartitionId(), throwable.getMessage(), throwable);
    }
}
