/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.device.amqp.impl;

import io.opentracing.SpanContext;
import io.vertx.core.Future;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.amqp.AbstractHonoClient;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CommandConsumer} that receives command messages over a receiver link
 * opened on a {@link HonoConnection}.
 * <p>
 * Instances are obtained via the static {@code create} factory methods. Each instance
 * registers a reconnect listener on the connection which opens a new receiver link
 * (same address, same message handler) and swaps it in once it has been established.
 * <p>
 * NOTE(review): the {@code receiver} field and {@code closeLinks()} are not declared in
 * this class; they are presumably inherited from {@link AbstractHonoClient} - confirm.
 */
public class AmqpAdapterClientCommandConsumer extends AbstractHonoClient implements CommandConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(AmqpAdapterClientCommandConsumer.class);

    /**
     * Creates a consumer wrapping an already established receiver link.
     *
     * @param connection The connection the receiver link belongs to.
     * @param receiver The receiver link to consume commands from.
     * @throws NullPointerException if receiver is {@code null}.
     */
    private AmqpAdapterClientCommandConsumer(HonoConnection connection, ProtonReceiver receiver) {
        super(connection);
        this.receiver = Objects.requireNonNull(receiver);
    }

    /**
     * Creates a command consumer for a specific device.
     * <p>
     * The receiver link is opened on the address built from the path segments
     * {@code "command"}, the tenant identifier and the device identifier.
     *
     * @param con The connection to open the receiver link on.
     * @param tenantId The identifier of the tenant that the device belongs to.
     * @param deviceId The identifier of the device to receive commands for.
     * @param messageHandler The handler to invoke with every delivery and message received.
     * @return A future indicating the outcome of the creation attempt.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public static Future<CommandConsumer> create(HonoConnection con, String tenantId, String deviceId, BiConsumer<ProtonDelivery, Message> messageHandler) {
        Objects.requireNonNull(con);
        // fail fast instead of letting a null segment end up in the link address
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(messageHandler);
        ResourceIdentifier address = ResourceIdentifier.fromPath(new String[]{"command", tenantId, deviceId});
        return AmqpAdapterClientCommandConsumer.createCommandConsumer(con, messageHandler, address);
    }

    /**
     * Creates a command consumer that is not scoped to an explicit tenant and device.
     * <p>
     * The receiver link is opened on the address consisting of the single path
     * segment {@code "command"}.
     *
     * @param con The connection to open the receiver link on.
     * @param messageHandler The handler to invoke with every delivery and message received.
     * @return A future indicating the outcome of the creation attempt.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public static Future<CommandConsumer> create(HonoConnection con, BiConsumer<ProtonDelivery, Message> messageHandler) {
        Objects.requireNonNull(con);
        Objects.requireNonNull(messageHandler);
        ResourceIdentifier address = ResourceIdentifier.fromPath(new String[]{"command"});
        return AmqpAdapterClientCommandConsumer.createCommandConsumer(con, messageHandler, address);
    }

    /**
     * Waits for the connection to be established (bounded by the configured link
     * establishment timeout), opens the receiver link and wraps it in a consumer.
     *
     * @param con The connection to open the receiver link on.
     * @param messageHandler The handler to invoke with every delivery and message received.
     * @param address The source address of the receiver link.
     * @return A future completed with the consumer once the link has been created.
     */
    private static Future<CommandConsumer> createCommandConsumer(HonoConnection con, BiConsumer<ProtonDelivery, Message> messageHandler, ResourceIdentifier address) {
        return con.isConnected(con.getConfig().getLinkEstablishmentTimeout())
                .compose(v -> AmqpAdapterClientCommandConsumer.createReceiver(con, messageHandler, address))
                .map(rec -> {
                    AmqpAdapterClientCommandConsumer consumer = new AmqpAdapterClientCommandConsumer(con, rec);
                    // after a reconnect the old link is unusable: open a fresh one and swap it in
                    con.addReconnectListener(c -> AmqpAdapterClientCommandConsumer.createReceiver(con, messageHandler, address)
                            .onSuccess(consumer::setReceiver)
                            // do not lose the failure silently, the consumer would keep a stale link
                            .onFailure(t -> LOG.warn("failed to re-create receiver link [address: {}] after reconnect", address, t)));
                    return consumer;
                });
    }

    /**
     * Opens a receiver link with <em>at least once</em> quality of service on the given address.
     * <p>
     * A close of the link initiated by the remote peer is only logged.
     *
     * @param con The connection to open the receiver link on.
     * @param messageHandler The handler to invoke with every delivery and message received.
     * @param address The source address of the receiver link.
     * @return A future completed with the receiver link.
     */
    private static Future<ProtonReceiver> createReceiver(HonoConnection con, BiConsumer<ProtonDelivery, Message> messageHandler, ResourceIdentifier address) {
        return con.createReceiver(
                address.toString(),
                ProtonQoS.AT_LEAST_ONCE,
                messageHandler::accept,
                remote -> LOG.info("The remote [{}] closed the receiver link", remote));
    }

    /**
     * Replaces the receiver link used by this consumer.
     *
     * @param protonReceiver The new receiver link.
     */
    private void setReceiver(ProtonReceiver protonReceiver) {
        this.receiver = protonReceiver;
    }

    /**
     * Gets the receiver link currently used by this consumer.
     *
     * @return The receiver link.
     */
    ProtonReceiver getReceiver() {
        return this.receiver;
    }

    /**
     * Closes this consumer by delegating to {@code closeLinks()}.
     *
     * @param spanContext The tracing context; it is not used by this implementation.
     * @return The future returned by {@code closeLinks()}.
     */
    public final Future<Void> close(SpanContext spanContext) {
        return this.closeLinks();
    }
}

