/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.kafka.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Objects;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.application.client.kafka.impl.KafkaDownstreamMessage;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.AbstractAtLeastOnceKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;

public class KafkaBasedDownstreamMessageConsumer
extends AbstractAtLeastOnceKafkaConsumer<DownstreamMessage<KafkaMessageContext>>
implements MessageConsumer {
    private KafkaBasedDownstreamMessageConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, String topic, Handler<DownstreamMessage<KafkaMessageContext>> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        super(kafkaConsumer, topic, messageHandler, closeHandler, pollTimeout);
    }

    public static Future<MessageConsumer> create(String tenantId, HonoTopic.Type type, KafkaConsumer<String, Buffer> kafkaConsumer, KafkaConsumerConfigProperties config, Handler<DownstreamMessage<KafkaMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(type);
        Objects.requireNonNull(kafkaConsumer);
        Objects.requireNonNull(config);
        Objects.requireNonNull(messageHandler);
        Objects.requireNonNull(closeHandler);
        String topic = new HonoTopic(type, tenantId).toString();
        long pollTimeout = config.getPollTimeout();
        KafkaBasedDownstreamMessageConsumer consumer = new KafkaBasedDownstreamMessageConsumer(kafkaConsumer, topic, messageHandler, closeHandler, pollTimeout);
        return consumer.start().map((Object)consumer);
    }

    protected DownstreamMessage<KafkaMessageContext> createMessage(KafkaConsumerRecord<String, Buffer> record) {
        Objects.requireNonNull(record);
        return new KafkaDownstreamMessage(record);
    }

    public Future<Void> close() {
        return this.stop();
    }
}

