/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.dapr.consumer;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
import java.io.Closeable;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.dapr.DaprConfiguration;
import org.apache.camel.component.dapr.DaprEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class DaprPubSubConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubConsumer.class);
    private final String pubSubName;
    private final String topic;
    private DaprPreviewClient client;
    private Closeable subscription;

    public DaprPubSubConsumer(DaprEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.pubSubName = endpoint.getConfiguration().getPubSubName();
        this.topic = endpoint.getConfiguration().getTopic();
        this.client = endpoint.getConfiguration().getPreviewClient();
    }

    protected void doInit() throws Exception {
        super.doInit();
        if (this.client == null && (ObjectHelper.isEmpty((String)this.pubSubName) || ObjectHelper.isEmpty((String)this.topic))) {
            throw new IllegalArgumentException("pubSubName and topic are mandatory for subscribe operation");
        }
    }

    protected void doStart() throws Exception {
        super.doStart();
        LOG.debug("Creating connection to Dapr PubSub");
        if (this.client == null) {
            this.client = new DaprClientBuilder().buildPreviewClient();
        }
        this.subscription = this.client.subscribeToEvents(this.pubSubName, this.topic, (SubscriptionListener)new DaprSubscriptionListener(), TypeRef.get(byte[].class));
    }

    protected void doStop() throws Exception {
        if (this.subscription != null) {
            this.subscription.close();
        }
        if (this.client != null) {
            this.client.close();
        }
        super.doStop();
    }

    public DaprConfiguration getConfiguration() {
        return this.getEndpoint().getConfiguration();
    }

    public DaprEndpoint getEndpoint() {
        return (DaprEndpoint)super.getEndpoint();
    }

    private Exchange createServiceBusExchange(CloudEvent<byte[]> cloudEvent) {
        Exchange exchange = this.createExchange(true);
        Message message = exchange.getIn();
        message.setBody(cloudEvent.getData());
        message.setHeader("CamelDaprPubSubName", (Object)cloudEvent.getPubsubName());
        message.setHeader("CamelDaprTopic", (Object)cloudEvent.getTopic());
        message.setHeader("CamelDaprID", (Object)cloudEvent.getId());
        message.setHeader("CamelDaprSource", (Object)cloudEvent.getSource());
        message.setHeader("CamelDaprType", (Object)cloudEvent.getType());
        message.setHeader("CamelDaprSpecificVersion", (Object)cloudEvent.getSpecversion());
        message.setHeader("CamelDaprDataContentType", (Object)cloudEvent.getDatacontenttype());
        message.setHeader("CamelDaprBinaryData", (Object)cloudEvent.getBinaryData());
        message.setHeader("CamelDaprTime", (Object)cloudEvent.getTime());
        message.setHeader("CamelDaprTraceParent", (Object)cloudEvent.getTraceParent());
        message.setHeader("CamelDaprTraceState", (Object)cloudEvent.getTraceState());
        return exchange;
    }

    protected class DaprSubscriptionListener
    implements SubscriptionListener<byte[]> {
        protected DaprSubscriptionListener() {
        }

        public Mono<SubscriptionListener.Status> onEvent(CloudEvent<byte[]> cloudEvent) {
            Exchange exchange = DaprPubSubConsumer.this.createServiceBusExchange(cloudEvent);
            AsyncCallback cb = DaprPubSubConsumer.this.defaultConsumerCallback(exchange, true);
            DaprPubSubConsumer.this.getAsyncProcessor().process(exchange, cb);
            return Mono.just((Object)SubscriptionListener.Status.SUCCESS);
        }

        public void onError(RuntimeException ex) {
            LOG.error("Error from Dapr client: {}", (Object)ex.getMessage(), (Object)ex);
        }
    }
}

