/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.vertx.example.base;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.CommandClient;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.impl.HonoClientImpl;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessageTap;
import org.eclipse.hono.util.TimeUntilDisconnectNotification;
import org.eclipse.hono.vertx.example.base.HonoExampleConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example base class for consuming telemetry and event messages from Hono and for
 * sending commands to devices that signal their readiness to receive them.
 * <p>
 * The application connects with a {@link HonoClient}, registers one event consumer and
 * one telemetry consumer for the tenant {@code DEFAULT_TENANT} and reacts to
 * {@link TimeUntilDisconnectNotification}s extracted from the consumed messages:
 * <ul>
 * <li>a positive {@code ttd} triggers a single command,</li>
 * <li>a {@code ttd} of {@code -1} triggers a command immediately and then periodically,</li>
 * <li>any other non-positive {@code ttd} stops a running periodic sender.</li>
 * </ul>
 * Connection parameters can be overridden with the system properties {@code plain.connection},
 * {@code username}, {@code password} and {@code sendOneWayCommands}.
 */
public class HonoExampleApplicationBase {
    private static final Logger LOG = LoggerFactory.getLogger(HonoExampleApplicationBase.class);

    /** When {@code true}, no credentials and no trust store are configured on the client. */
    public static final Boolean USE_PLAIN_CONNECTION = Boolean.valueOf(System.getProperty("plain.connection", "false"));
    /** User name used for connecting (system property {@code username}). */
    public static final String HONO_CLIENT_USER = System.getProperty("username", "consumer@HONO");
    /** Password used for connecting (system property {@code password}). */
    public static final String HONO_CLIENT_PASSWORD = System.getProperty("password", "verysecret");
    /** Not referenced by this class; retained for subclasses. */
    protected final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
    /** When {@code true}, one-way commands are sent instead of request/response commands. */
    public static final Boolean SEND_ONE_WAY_COMMANDS = Boolean.valueOf(System.getProperty("sendOneWayCommands", "false"));

    /** Tenant whose telemetry and event messages are consumed. */
    private static final String TENANT_ID = "DEFAULT_TENANT";
    /** Delay before a re-connect attempt is started after the connection was lost. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;
    /**
     * Delay between receiving a notification with a non-positive ttd and acting on it.
     * A newer notification for the same device arriving during this delay replaces the pending one.
     */
    private static final long PENDING_NOTIFICATION_DELAY_MILLIS = 1000L;
    /** The ttd value that is treated as "ready to receive commands until further notice". */
    private static final int TTD_UNLIMITED = -1;

    private final Vertx vertx = Vertx.vertx();
    private final HonoClient honoClient;
    /** Handlers that stop the periodic command sender of a device, keyed by tenant-and-device id. */
    private final Map<String, Handler<Void>> periodicCommandSenderTimerCancelerMap = new HashMap<>();
    /** Notifications with a non-positive ttd that are waiting to be handled, keyed by tenant-and-device id. */
    private final Map<String, TimeUntilDisconnectNotification> pendingTtdNotification = new HashMap<>();

    /**
     * Creates the Hono client from the host and port defined in {@link HonoExampleConstants}.
     * Unless {@link #USE_PLAIN_CONNECTION} is set, credentials and a trust store are configured
     * and hostname verification is switched off.
     */
    public HonoExampleApplicationBase() {
        final ClientConfigProperties props = new ClientConfigProperties();
        props.setHost(HonoExampleConstants.HONO_AMQP_CONSUMER_HOST);
        props.setPort(HonoExampleConstants.HONO_AMQP_CONSUMER_PORT);
        if (!USE_PLAIN_CONNECTION.booleanValue()) {
            props.setUsername(HONO_CLIENT_USER);
            props.setPassword(HONO_CLIENT_PASSWORD);
            props.setTrustStorePath("target/config/hono-demo-certs-jar/trusted-certs.pem");
            props.setHostnameVerificationRequired(false);
        }
        this.honoClient = new HonoClientImpl(this.vertx, props);
    }

    /**
     * Connects to Hono, creates the consumers and blocks until the attempt has finished.
     * <p>
     * If the consumers could be created, the method then blocks until a byte can be read
     * from standard input. The Vert.x instance is closed before the method returns, also
     * when waiting or reading fails.
     *
     * @throws Exception if waiting for the connection attempt is interrupted or reading
     *                   from standard input fails.
     */
    protected void consumeData() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final Future<MessageConsumer> consumerFuture = Future.future();
        consumerFuture.setHandler(result -> {
            if (!result.succeeded()) {
                LOG.error("honoClient could not create downstream consumer for [{}:{}]",
                        HonoExampleConstants.HONO_AMQP_CONSUMER_HOST,
                        HonoExampleConstants.HONO_AMQP_CONSUMER_PORT,
                        result.cause());
            }
            latch.countDown();
        });
        this.honoClient.connect(this::onDisconnect)
                .compose(connectedClient -> createConsumer())
                .setHandler(consumerFuture.completer());
        try {
            latch.await();
            if (consumerFuture.succeeded()) {
                System.in.read();
            }
        } finally {
            // release Vert.x resources even if waiting or reading has been aborted
            this.vertx.close();
        }
    }

    /**
     * Creates the event consumer and, once that succeeded, the telemetry consumer.
     *
     * @return A future completed with the telemetry consumer, or failed with the cause
     *         of the first consumer that could not be created.
     */
    private Future<MessageConsumer> createConsumer() {
        final Consumer<Message> eventHandler =
                MessageTap.getConsumer(this::handleEventMessage, this::handleCommandReadinessNotification);
        final Consumer<Message> telemetryHandler =
                MessageTap.getConsumer(this::handleTelemetryMessage, this::handleCommandReadinessNotification);
        return this.honoClient
                .createEventConsumer(TENANT_ID, eventHandler, closeHook -> LOG.error("remotely detached consumer link"))
                .compose(eventConsumer -> this.honoClient
                        .createTelemetryConsumer(TENANT_ID, telemetryHandler, closeHook -> LOG.error("remotely detached consumer link"))
                        .map(telemetryConsumer -> {
                            LOG.info("Consumer ready for telemetry and event messages.");
                            return telemetryConsumer;
                        }));
    }

    /**
     * Schedules a re-connect attempt (including re-creation of the consumers) after the
     * connection has been lost. The outcome of the attempt is logged.
     *
     * @param con The connection that was lost (not used).
     */
    private void onDisconnect(final ProtonConnection con) {
        this.vertx.setTimer(RECONNECT_DELAY_MILLIS, reconnect -> {
            LOG.info("attempting to re-connect to Hono ...");
            this.honoClient.connect(this::onDisconnect)
                    .compose(connectedClient -> createConsumer())
                    .setHandler(attempt -> {
                        if (attempt.succeeded()) {
                            LOG.info("Reconnected to Hono.");
                        } else {
                            LOG.error("could not re-connect to Hono", attempt.cause());
                        }
                    });
        });
    }

    /**
     * Logs a received message at debug level.
     *
     * @param tenantId The tenant the message belongs to.
     * @param msg The received message.
     * @param messageType A label for the kind of message, e.g. {@code telemetry}.
     */
    private void printMessage(final String tenantId, final Message msg, final String messageType) {
        // guard so that the payload is only decoded when it is actually going to be logged
        if (LOG.isDebugEnabled()) {
            LOG.debug("received {} [tenant: {}, device: {}, content-type: {} ]: [{}].",
                    messageType,
                    tenantId,
                    MessageHelper.getDeviceId(msg),
                    msg.getContentType(),
                    MessageHelper.getPayloadAsString(msg));
        }
    }

    /**
     * Dispatches a notification: a positive ttd leads to a command being sent right away,
     * every other value is handled by
     * {@link #handlePermanentlyConnectedCommandReadinessNotification(TimeUntilDisconnectNotification)}.
     *
     * @param notification The notification extracted from a consumed message.
     */
    private void handleCommandReadinessNotification(final TimeUntilDisconnectNotification notification) {
        if (notification.getTtd() <= 0) {
            handlePermanentlyConnectedCommandReadinessNotification(notification);
        } else {
            LOG.info("Device is ready to receive a command : [{}].", notification);
            createCommandClientAndSendCommand(notification);
        }
    }

    /**
     * Handles a notification with a non-positive ttd.
     * <p>
     * The first notification for a device is stored as pending and handled after
     * {@link #PENDING_NOTIFICATION_DELAY_MILLIS}. A notification that arrives while another one
     * is still pending replaces the pending one if it was created later and is ignored otherwise.
     *
     * @param notification The notification to handle.
     */
    private void handlePermanentlyConnectedCommandReadinessNotification(final TimeUntilDisconnectNotification notification) {
        final String keyForDevice = notification.getTenantAndDeviceId();
        final TimeUntilDisconnectNotification previousNotification = this.pendingTtdNotification.get(keyForDevice);
        if (previousNotification == null) {
            this.pendingTtdNotification.put(keyForDevice, notification);
            this.vertx.setTimer(PENDING_NOTIFICATION_DELAY_MILLIS, timerId -> handlePendingNotification(keyForDevice));
        } else if (notification.getCreationTime().isAfter(previousNotification.getCreationTime())) {
            LOG.info("Set new ttd value [{}] of notification for [{}]", notification.getTtd(), keyForDevice);
            this.pendingTtdNotification.put(keyForDevice, notification);
        } else {
            LOG.trace("Received notification for [{}] that was already superseded by newer [{}]", notification, previousNotification);
        }
    }

    /**
     * Acts on the notification that is pending for a device, if any.
     * <p>
     * A ttd of {@code -1} (re-)starts the periodic command sender, any other value stops it.
     *
     * @param keyForDevice The tenant-and-device id the pending notification is stored under.
     */
    private void handlePendingNotification(final String keyForDevice) {
        LOG.debug("Handle device notification for [{}].", keyForDevice);
        final TimeUntilDisconnectNotification notificationToHandle = this.pendingTtdNotification.remove(keyForDevice);
        if (notificationToHandle == null) {
            return;
        }
        if (notificationToHandle.getTtd() == TTD_UNLIMITED) {
            LOG.info("Device notified as being ready to receive a command until further notice : [{}].", notificationToHandle);
            cancelPeriodicCommandSender(notificationToHandle);
            startPeriodicCommandSender(notificationToHandle);
        } else {
            LOG.info("Device notified as not being ready to receive a command (anymore) : [{}].", notificationToHandle);
            cancelPeriodicCommandSender(notificationToHandle);
            LOG.debug("Device will not receive further commands : [{}].", keyForDevice);
        }
    }

    /**
     * Sends a command immediately and then periodically, using the interval defined by
     * {@code HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY}.
     * <p>
     * The canceler is registered as soon as the timer exists so that the sender can also be
     * stopped before its first tick.
     *
     * @param notification The notification of the device to send commands to.
     */
    private void startPeriodicCommandSender(final TimeUntilDisconnectNotification notification) {
        final long intervalMillis = HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000L;
        final long timerId = this.vertx.setPeriodic(intervalMillis, id -> sendCommandAndTrackClient(id, notification));
        setPeriodicCommandSenderTimerCanceler(timerId, notification, null);
        sendCommandAndTrackClient(timerId, notification);
    }

    /**
     * Sends a command and, if a command client could be obtained, makes the canceler of the
     * periodic sender aware of it so that the client gets closed when the sender is stopped.
     *
     * @param timerId The id of the periodic timer the command belongs to.
     * @param notification The notification of the device to send the command to.
     */
    private void sendCommandAndTrackClient(final Long timerId, final TimeUntilDisconnectNotification notification) {
        createCommandClientAndSendCommand(notification).setHandler(attempt -> {
            // the result is null if the command client could not be created
            final CommandClient commandClient = attempt.result();
            if (commandClient != null) {
                setPeriodicCommandSenderTimerCanceler(timerId, notification, commandClient);
            }
        });
    }

    /**
     * Obtains a command client for the device of the notification and sends a (one-way)
     * command with it.
     *
     * @param notification The notification of the device to send the command to.
     * @return A future completed with the command client, or with {@code null} if the
     *         command client could not be obtained (the failure is logged).
     */
    private Future<CommandClient> createCommandClientAndSendCommand(final TimeUntilDisconnectNotification notification) {
        return this.honoClient.getOrCreateCommandClient(notification.getTenantId(), notification.getDeviceId())
                .map(commandClient -> {
                    commandClient.setRequestTimeout(calculateCommandTimeout(notification));
                    if (SEND_ONE_WAY_COMMANDS.booleanValue()) {
                        sendOneWayCommandToAdapter(commandClient, notification);
                    } else {
                        sendCommandToAdapter(commandClient, notification);
                    }
                    return commandClient;
                })
                .otherwise(t -> {
                    LOG.error("Could not create command client", t);
                    return null;
                });
    }

    /**
     * Determines the request timeout for a command.
     *
     * @param notification The notification the command is sent in response to.
     * @return The periodic command interval in milliseconds for a ttd of {@code -1},
     *         the milliseconds until the notification expires otherwise.
     */
    private long calculateCommandTimeout(final TimeUntilDisconnectNotification notification) {
        if (notification.getTtd() == TTD_UNLIMITED) {
            return HonoExampleConstants.COMMAND_INTERVAL_FOR_DEVICES_CONNECTED_WITH_UNLIMITED_EXPIRY * 1000L;
        }
        return notification.getMillisecondsUntilExpiry();
    }

    /**
     * Registers the handler that stops the periodic command sender of a device, replacing
     * a previously registered one.
     *
     * @param timerId The id of the periodic timer to cancel.
     * @param ttdNotification The notification identifying the device.
     * @param commandClient The command client to close when cancelling, may be {@code null}.
     */
    private void setPeriodicCommandSenderTimerCanceler(final Long timerId, final TimeUntilDisconnectNotification ttdNotification,
            final CommandClient commandClient) {
        final String keyForDevice = ttdNotification.getTenantAndDeviceId();
        this.periodicCommandSenderTimerCancelerMap.put(keyForDevice, v -> {
            // stop the timer first so that a failure while closing the client cannot leave it running
            this.periodicCommandSenderTimerCancelerMap.remove(keyForDevice);
            this.vertx.cancelTimer(timerId.longValue());
            closeCommandClient(commandClient, ttdNotification);
        });
    }

    /**
     * Stops the periodic command sender of a device, if one is registered.
     *
     * @param notification The notification identifying the device.
     * @return {@code true} if a periodic sender was registered and has been cancelled.
     */
    private boolean cancelPeriodicCommandSender(final TimeUntilDisconnectNotification notification) {
        final String keyForDevice = notification.getTenantAndDeviceId();
        final Handler<Void> canceler = this.periodicCommandSenderTimerCancelerMap.remove(keyForDevice);
        if (canceler == null) {
            LOG.debug("Wanted to cancel periodic sender for {}, but could not find one", keyForDevice);
            return false;
        }
        LOG.debug("Cancelling periodic sender for {}", keyForDevice);
        canceler.handle(null);
        return true;
    }

    /**
     * Checks whether a canceler for a periodic command sender is registered for a device.
     *
     * @param notification The notification identifying the device.
     * @return {@code true} if a canceler is registered.
     */
    private boolean isPeriodicCommandSenderActiveForDevice(final TimeUntilDisconnectNotification notification) {
        return this.periodicCommandSenderTimerCancelerMap.containsKey(notification.getTenantAndDeviceId());
    }

    /**
     * Sends the {@code setBrightness} command with a JSON payload and logs the outcome.
     * Unless the ttd is {@code -1}, the command client is closed once the outcome is known.
     *
     * @param commandClient The client to send the command with.
     * @param ttdNotification The notification of the device to send the command to.
     */
    private void sendCommandToAdapter(final CommandClient commandClient, final TimeUntilDisconnectNotification ttdNotification) {
        final Buffer commandBuffer = buildCommandPayload();
        final String command = "setBrightness";
        LOG.debug("Sending command [{}] to [{}].", command, ttdNotification.getTenantAndDeviceId());
        commandClient.sendCommand(command, "application/json", commandBuffer, buildCommandProperties()).map(result -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully sent command payload: [{}].", commandBuffer);
                LOG.debug("And received response: [{}].", Optional.ofNullable(result.getPayload()).orElse(Buffer.buffer()));
            }
            closeCommandClientUnlessUnlimited(commandClient, ttdNotification);
            return result;
        }).otherwise(t -> {
            if (t instanceof ServiceInvocationException) {
                final int errorCode = ((ServiceInvocationException) t).getErrorCode();
                LOG.debug("Command was replied with error code [{}].", errorCode);
            } else {
                LOG.debug("Could not send command : {}.", t.getMessage());
            }
            closeCommandClientUnlessUnlimited(commandClient, ttdNotification);
            return null;
        });
    }

    /**
     * Sends the one-way command {@code sendLifecycleInfo} with a JSON payload and logs the outcome.
     * Unless the ttd is {@code -1}, the command client is closed once the outcome is known.
     *
     * @param commandClient The client to send the command with.
     * @param ttdNotification The notification of the device to send the command to.
     */
    private void sendOneWayCommandToAdapter(final CommandClient commandClient, final TimeUntilDisconnectNotification ttdNotification) {
        final Buffer commandBuffer = buildOneWayCommandPayload();
        final String command = "sendLifecycleInfo";
        LOG.debug("Sending one-way command [{}] to [{}].", command, ttdNotification.getTenantAndDeviceId());
        commandClient.sendOneWayCommand(command, commandBuffer).map(statusResult -> {
            LOG.debug("Successfully sent one-way command payload: [{}] and received status [{}].", commandBuffer, statusResult);
            closeCommandClientUnlessUnlimited(commandClient, ttdNotification);
            return statusResult;
        }).otherwise(t -> {
            if (t instanceof ServiceInvocationException) {
                final int errorCode = ((ServiceInvocationException) t).getErrorCode();
                LOG.debug("One-way command was replied with error code [{}].", errorCode);
            } else {
                LOG.debug("Could not send one-way command : {}.", t.getMessage());
            }
            closeCommandClientUnlessUnlimited(commandClient, ttdNotification);
            return null;
        });
    }

    /**
     * Closes the command client unless the ttd is {@code -1}; in that case the client is kept
     * open and closed by the canceler of the periodic sender instead.
     *
     * @param commandClient The client to close.
     * @param ttdNotification The notification the command was sent in response to.
     */
    private void closeCommandClientUnlessUnlimited(final CommandClient commandClient, final TimeUntilDisconnectNotification ttdNotification) {
        if (ttdNotification.getTtd() != TTD_UNLIMITED) {
            closeCommandClient(commandClient, ttdNotification);
        }
    }

    /**
     * Closes a command client and logs the completion at trace level.
     *
     * @param commandClient The client to close; nothing happens if it is {@code null}.
     * @param ttdNotification The notification identifying the device (used for logging).
     */
    private void closeCommandClient(final CommandClient commandClient, final TimeUntilDisconnectNotification ttdNotification) {
        if (commandClient == null) {
            return;
        }
        commandClient.close(v -> LOG.trace("Closed commandClient for [{}].", ttdNotification.getTenantAndDeviceId()));
    }

    /**
     * @return The application properties that are sent along with a command.
     */
    private Map<String, Object> buildCommandProperties() {
        final Map<String, Object> applicationProperties = new HashMap<>(1);
        applicationProperties.put("appId", "example#1");
        return applicationProperties;
    }

    /**
     * @return A JSON payload containing a random {@code brightness} value in the range 0 to 99.
     */
    private Buffer buildCommandPayload() {
        final JsonObject jsonCmd = new JsonObject().put("brightness", Integer.valueOf((int) (Math.random() * 100.0)));
        return Buffer.buffer(jsonCmd.encodePrettily());
    }

    /**
     * @return The JSON payload of the one-way command.
     */
    private Buffer buildOneWayCommandPayload() {
        final JsonObject jsonCmd = new JsonObject().put("info", "app restarted.");
        return Buffer.buffer(jsonCmd.encodePrettily());
    }

    /**
     * Logs a consumed telemetry message.
     *
     * @param msg The consumed message.
     */
    private void handleTelemetryMessage(final Message msg) {
        printMessage(TENANT_ID, msg, "telemetry");
    }

    /**
     * Logs a consumed event message.
     *
     * @param msg The consumed message.
     */
    private void handleEventMessage(final Message msg) {
        printMessage(TENANT_ID, msg, "event");
    }
}

