/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.vertx.example.base;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.CommandClient;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.impl.HonoClientImpl;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessageTap;
import org.eclipse.hono.util.TimeUntilDisconnectNotification;
import org.eclipse.hono.vertx.example.base.HonoExampleConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HonoConsumerBase {
    public static final Boolean USE_PLAIN_CONNECTION = Boolean.valueOf(System.getProperty("plain.connection", "false"));
    public static final String HONO_CLIENT_USER = System.getProperty("username", "consumer@HONO");
    public static final String HONO_CLIENT_PASSWORD = System.getProperty("password", "verysecret");
    protected final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
    private final Vertx vertx = Vertx.vertx();
    private final HonoClient honoClient;
    private final Map<String, Handler<Void>> periodicCommandSenderTimerCancelerMap = new HashMap<String, Handler<Void>>();
    private static final Logger LOG = LoggerFactory.getLogger(HonoConsumerBase.class);

    public HonoConsumerBase() {
        ClientConfigProperties props = new ClientConfigProperties();
        props.setHost(HonoExampleConstants.HONO_AMQP_CONSUMER_HOST);
        props.setPort(HonoExampleConstants.HONO_AMQP_CONSUMER_PORT);
        if (!USE_PLAIN_CONNECTION.booleanValue()) {
            props.setUsername(HONO_CLIENT_USER);
            props.setPassword(HONO_CLIENT_PASSWORD);
            props.setTrustStorePath("target/config/hono-demo-certs-jar/trusted-certs.pem");
            props.setHostnameVerificationRequired(false);
        }
        this.honoClient = new HonoClientImpl(this.vertx, props);
    }

    protected void consumeData() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        Future consumerFuture = Future.future();
        consumerFuture.setHandler(result -> {
            if (!result.succeeded()) {
                LOG.error("honoClient could not create downstream consumer for [{}:{}]", new Object[]{HonoExampleConstants.HONO_AMQP_CONSUMER_HOST, HonoExampleConstants.HONO_AMQP_CONSUMER_PORT, result.cause()});
            }
            latch.countDown();
        });
        this.honoClient.connect(this::onDisconnect).compose(connectedClient -> this.createConsumer()).setHandler(consumerFuture.completer());
        latch.await();
        if (consumerFuture.succeeded()) {
            System.in.read();
        }
        this.vertx.close();
    }

    private Future<MessageConsumer> createConsumer() {
        Consumer eventHandler = MessageTap.getConsumer(this::handleEventMessage, this::handleCommandReadinessNotification);
        Consumer telemetryHandler = MessageTap.getConsumer(this::handleTelemetryMessage, this::handleCommandReadinessNotification);
        return this.honoClient.createEventConsumer("DEFAULT_TENANT", eventHandler, closeHook -> LOG.error("remotely detached consumer link")).compose(messageConsumer -> this.honoClient.createTelemetryConsumer("DEFAULT_TENANT", telemetryHandler, closeHook -> LOG.error("remotely detached consumer link")).compose(telemetryMessageConsumer -> {
            LOG.info("Consumer ready for telemetry and event messages.");
            return Future.succeededFuture((Object)telemetryMessageConsumer);
        }).recover(t -> Future.failedFuture((Throwable)t)));
    }

    private void onDisconnect(ProtonConnection con) {
        this.vertx.setTimer(1000L, reconnect -> {
            LOG.info("attempting to re-connect to Hono ...");
            this.honoClient.connect(this::onDisconnect).compose(connectedClient -> this.createConsumer()).map(messageConsumer -> {
                LOG.info("Reconnected to Hono.");
                return null;
            });
        });
    }

    private void printMessage(String tenantId, Message msg, String messageType) {
        if (LOG.isDebugEnabled()) {
            Data body = (Data)msg.getBody();
            String content = body != null ? body.getValue().toString() : "";
            String deviceId = MessageHelper.getDeviceId((Message)msg);
            StringBuilder sb = new StringBuilder("received ").append(messageType).append(" [tenant: ").append(tenantId).append(", device: ").append(deviceId).append(", content-type: ").append(msg.getContentType()).append(" ]: [").append(content).append("].");
            LOG.debug(sb.toString());
        }
    }

    private void handleCommandReadinessNotification(TimeUntilDisconnectNotification notification) {
        if (notification.getMillisecondsUntilExpiry() == 0L) {
            LOG.info("Device notified as not being ready to receive a command (anymore) : [{}].", (Object)notification.toString());
            this.cancelPeriodicCommandSender(notification);
        } else {
            LOG.info("Device is ready to receive a command : [{}].", (Object)notification.toString());
            this.createCommandClientAndSendCommand(notification);
        }
    }

    private void createCommandClientAndSendCommand(TimeUntilDisconnectNotification notification) {
        this.honoClient.getOrCreateCommandClient(notification.getTenantId(), notification.getDeviceId()).map(commandClient -> {
            commandClient.setRequestTimeout(this.calculateCommandTimeout(notification));
            if (notification.getTtd() == -1) {
                this.cancelPeriodicCommandSender(notification);
                this.sendCommandToAdapter((CommandClient)commandClient, notification);
                long timerId = this.vertx.setPeriodic((long)(HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000), id -> this.sendCommandToAdapter((CommandClient)commandClient, notification));
                this.setPeriodicCommandSenderTimerCanceler(timerId, notification, (CommandClient)commandClient);
            } else {
                this.sendCommandToAdapter((CommandClient)commandClient, notification);
            }
            return commandClient;
        }).otherwise(t -> {
            LOG.error("Could not create command client", t);
            return null;
        });
    }

    private long calculateCommandTimeout(TimeUntilDisconnectNotification notification) {
        if (notification.getTtd() == -1) {
            return HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000;
        }
        return notification.getMillisecondsUntilExpiry();
    }

    private void setPeriodicCommandSenderTimerCanceler(Long timerId, TimeUntilDisconnectNotification notification, CommandClient commandClient) {
        this.periodicCommandSenderTimerCancelerMap.put(notification.getTenantAndDeviceId(), (Handler<Void>)((Handler)v -> {
            commandClient.close(ignore -> {
                if (LOG.isDebugEnabled()) {
                    LOG.trace("Closed commandClient for [{}].", (Object)notification.getTenantAndDeviceId());
                }
            });
            this.vertx.cancelTimer(timerId.longValue());
            this.periodicCommandSenderTimerCancelerMap.remove(notification.getTenantAndDeviceId());
        }));
    }

    private void cancelPeriodicCommandSender(TimeUntilDisconnectNotification notification) {
        if (this.isDeviceConnectedForCommands(notification)) {
            this.periodicCommandSenderTimerCancelerMap.get(notification.getTenantAndDeviceId()).handle(null);
        }
    }

    private boolean isDeviceConnectedForCommands(TimeUntilDisconnectNotification notification) {
        return this.periodicCommandSenderTimerCancelerMap.containsKey(notification.getTenantAndDeviceId());
    }

    private void sendCommandToAdapter(CommandClient commandClient, TimeUntilDisconnectNotification notification) {
        Buffer commandBuffer = this.buildCommandPayload();
        String command = "setBrightness";
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending command [{}] to [{}].", (Object)"setBrightness", (Object)notification.getTenantAndDeviceId());
        }
        commandClient.sendCommand("setBrightness", commandBuffer).map(result -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully sent command payload: [{}].", (Object)commandBuffer.toString());
                LOG.debug("And received response: [{}].", (Object)((Buffer)Optional.ofNullable(result.getPayload()).orElse(Buffer.buffer())).toString());
            }
            if (notification.getTtd() != -1) {
                commandClient.close(v -> {
                    if (LOG.isDebugEnabled()) {
                        LOG.trace("Closed commandClient for [{}].", (Object)notification.getTenantAndDeviceId());
                    }
                });
            } else {
                this.cancelPeriodicCommandSender(notification);
                this.createCommandClientAndSendCommand(notification);
            }
            return result;
        }).otherwise(t -> {
            if (t instanceof ServiceInvocationException) {
                int errorCode = ((ServiceInvocationException)t).getErrorCode();
                LOG.debug("Command was replied with error code [{}].", (Object)errorCode);
            } else {
                LOG.debug("Could not send command : {}.", (Object)t.getMessage());
            }
            if (notification.getTtd() != -1) {
                commandClient.close(v -> {
                    if (LOG.isDebugEnabled()) {
                        LOG.trace("Closed commandClient for [{}].", (Object)notification.getTenantAndDeviceId());
                    }
                });
            }
            return null;
        });
    }

    private Buffer buildCommandPayload() {
        JsonObject jsonCmd = new JsonObject().put("brightness", Integer.valueOf((int)(Math.random() * 100.0)));
        return Buffer.buffer((String)jsonCmd.encodePrettily());
    }

    private void handleTelemetryMessage(Message msg) {
        this.printMessage("DEFAULT_TENANT", msg, "telemetry");
    }

    private void handleEventMessage(Message msg) {
        this.printMessage("DEFAULT_TENANT", msg, "event");
    }
}

