package com.azure.spring.integration.eventhub.impl;

import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.spring.integration.core.api.CheckpointConfig;
import com.azure.spring.integration.core.api.CheckpointMode;
import com.azure.spring.integration.core.api.reactor.AzureCheckpointer;
import com.azure.spring.integration.eventhub.checkpoint.CheckpointManager;
import com.azure.spring.integration.eventhub.converter.EventHubMessageConverter;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/* loaded from: input_file:com/azure/spring/integration/eventhub/impl/EventHubProcessor.class */
/**
 * Bridges Event Hubs processor callbacks to a Spring {@link Message} consumer.
 *
 * <p>Each received event is converted with the configured
 * {@link EventHubMessageConverter}, handed to the consumer, and then reported to the
 * {@link CheckpointManager} derived from the supplied {@link CheckpointConfig}.
 */
public class EventHubProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessor.class);

    /** Header key under which the originating partition id is published. */
    private static final String PARTITION_ID_HEADER = "azure_raw_partition_id";

    /** Header key under which the checkpointer is published in {@link CheckpointMode#MANUAL} mode. */
    private static final String CHECKPOINTER_HEADER = "azure_checkpointer";

    /** Receives every converted message. */
    protected final Consumer<Message<?>> consumer;

    /** Target payload type passed to the message converter. */
    protected final Class<?> payloadType;

    /** Checkpoint settings; the checkpoint mode is consulted on every event. */
    protected final CheckpointConfig checkpointConfig;

    /** Converts received event data into Spring messages. */
    protected final EventHubMessageConverter messageConverter;

    /** Checkpoint manager created from {@link #checkpointConfig}. */
    protected final CheckpointManager checkpointManager;

    /** Configured event position; defaults to {@link EventPosition#latest()}. Not read by this class. */
    protected EventPosition eventPosition = EventPosition.latest();

    /**
     * Creates a processor and derives its {@link CheckpointManager} from {@code checkpointConfig}.
     *
     * @param consumer the callback invoked with each converted message
     * @param cls the payload type requested from the converter
     * @param checkpointConfig the checkpoint settings
     * @param eventHubMessageConverter the converter used to build messages from event data
     */
    public EventHubProcessor(Consumer<Message<?>> consumer, Class<?> cls, CheckpointConfig checkpointConfig,
            EventHubMessageConverter eventHubMessageConverter) {
        this.consumer = consumer;
        this.payloadType = cls;
        this.checkpointConfig = checkpointConfig;
        this.messageConverter = eventHubMessageConverter;
        this.checkpointManager = CheckpointManager.of(checkpointConfig);
    }

    /**
     * Logs that receiving has started on the partition described by the context.
     *
     * @param initializationContext the initialization context supplied by the processor
     */
    public void onInitialize(InitializationContext initializationContext) {
        LOGGER.info("Started receiving on partition: {}", initializationContext.getPartitionContext().getPartitionId());
    }

    /**
     * Logs that receiving has stopped on the partition, together with the close reason.
     *
     * @param closeContext the close context supplied by the processor
     */
    public void onClose(CloseContext closeContext) {
        LOGGER.info("Stopped receiving on partition: {}. Reason: {}", closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason());
    }

    /**
     * Converts the received event to a message, passes it to the consumer and then
     * notifies the checkpoint manager.
     *
     * <p>The partition id is always added as a header. In {@link CheckpointMode#MANUAL}
     * mode a checkpointer bound to this event is added as well. In
     * {@link CheckpointMode#BATCH} mode the checkpoint manager is additionally asked to
     * complete the batch after the message has been reported.
     *
     * @param eventContext the event context supplied by the processor
     * @throws NullPointerException if {@code eventContext} is {@code null}
     */
    public void onEvent(EventContext eventContext) {
        // Validate before the first dereference so a null context fails with a clear message.
        Objects.requireNonNull(eventContext, "eventContext must not be null");

        PartitionContext partitionContext = eventContext.getPartitionContext();
        HashMap<String, Object> headers = new HashMap<>();
        headers.put(PARTITION_ID_HEADER, partitionContext.getPartitionId());

        AzureCheckpointer azureCheckpointer = new AzureCheckpointer(eventContext::updateCheckpointAsync);
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
            headers.put(CHECKPOINTER_HEADER, azureCheckpointer);
        }

        this.consumer.accept(
            this.messageConverter.toMessage(eventContext.getEventData(), new MessageHeaders(headers), this.payloadType));

        // Checkpoint bookkeeping happens only after the consumer has returned normally.
        this.checkpointManager.onMessage(eventContext, eventContext.getEventData());
        if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.BATCH) {
            this.checkpointManager.completeBatch(eventContext);
        }
    }

    /**
     * Logs an error reported for a partition, including the stack trace of the cause.
     *
     * @param errorContext the error context supplied by the processor
     */
    public void onError(ErrorContext errorContext) {
        Throwable error = errorContext.getThrowable();
        // The rendered text fills the second placeholder; the trailing throwable is passed
        // separately so the logging backend can emit the stack trace.
        LOGGER.error("Error occurred on partition: {}. Error: {}",
            errorContext.getPartitionContext().getPartitionId(), String.valueOf(error), error);
    }

    /**
     * Replaces the stored event position.
     *
     * @param eventPosition the new event position
     */
    public void setEventPosition(EventPosition eventPosition) {
        this.eventPosition = eventPosition;
    }
}
