/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.vertx.example.base;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClientOptions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.client.impl.HonoClientImpl;
import org.eclipse.hono.config.ClientConfigProperties;

/**
 * Example sender that connects to a Hono messaging endpoint and a Hono device registry,
 * obtains a registration assertion for device {@code 4711} of tenant {@code DEFAULT_TENANT}
 * and then sends {@link #COUNT} text messages one after the other.
 * <p>
 * Depending on {@link #isEventMode()} the messages are sent using an event sender or a
 * telemetry sender. Connection details (host, ports, credentials, trust store) are fixed
 * values set up in the constructor.
 */
public class HonoSenderBase {
    /** Number of messages that {@link #sendData()} tries to send. */
    public static final int COUNT = 1000;
    /** User name used for authenticating to both endpoints. */
    public static final String HONO_CLIENT_USER = "hono-client@HONO";
    /** Password used for authenticating to both endpoints. */
    public static final String HONO_CLIENT_PASSWORD = "secret";

    private static final String HONO_HOST = "127.0.0.1";
    private static final int HONO_MESSAGING_PORT = 5671;
    private static final int HONO_REGISTRY_PORT = 25671;
    private static final String TRUST_STORE_PATH = "target/config/hono-demo-certs-jar/trusted-certs.pem";
    private static final String TENANT_ID = "DEFAULT_TENANT";
    private static final String DEVICE_ID = "4711";
    /** A progress line is printed every time this many messages have been handled. */
    private static final int PROGRESS_INTERVAL = 250;
    /** Delay between triggering the client shutdown and closing the Vert.x instance. */
    private static final long VERTX_CLOSE_DELAY_MILLIS = 2000L;

    private final Vertx vertx = Vertx.vertx();
    private final HonoClient honoRegistryClient;
    private final HonoClient honoMessagingClient;
    private RegistrationClient registrationClient;
    private MessageSender messageSender;
    private boolean eventMode = false;
    private final AtomicInteger nrMessageDeliveryFailed = new AtomicInteger(0);
    private final AtomicInteger nrMessageDeliverySucceeded = new AtomicInteger(0);

    /**
     * Creates the (not yet connected) clients for the messaging endpoint and the device registry.
     */
    public HonoSenderBase() {
        this.honoMessagingClient = new HonoClientImpl(this.vertx, createClientConfig(HONO_MESSAGING_PORT));
        this.honoRegistryClient = new HonoClientImpl(this.vertx, createClientConfig(HONO_REGISTRY_PORT));
    }

    /**
     * Builds the client configuration shared by both endpoints; only the port differs.
     *
     * @param port the port of the endpoint to connect to.
     * @return the populated configuration.
     */
    private static ClientConfigProperties createClientConfig(final int port) {
        final ClientConfigProperties props = new ClientConfigProperties();
        props.setHost(HONO_HOST);
        props.setPort(port);
        props.setUsername(HONO_CLIENT_USER);
        props.setPassword(HONO_CLIENT_PASSWORD);
        props.setTrustStorePath(TRUST_STORE_PATH);
        props.setHostnameVerificationRequired(false);
        return props;
    }

    /**
     * Connects the clients, fetches the registration assertion and sends {@link #COUNT}
     * messages sequentially, blocking the calling thread until each message has been
     * processed.
     * <p>
     * Sending stops at the first failure (token retrieval or message delivery). In every
     * case the clients are shut down afterwards, a summary is printed to standard out and
     * the Vert.x instance is closed after a short delay.
     */
    protected void sendData() {
        final CompletableFuture<String> tokenDone = new CompletableFuture<>();
        // Bridge the asynchronous Vert.x result into a future the calling thread can block on.
        this.getHonoClients().compose(ok -> this.getRegistrationAssertion()).map(receivedToken -> {
            tokenDone.complete(receivedToken);
            return null;
        }).otherwise(t -> {
            System.err.println("cannot send messages: " + t.getMessage());
            tokenDone.completeExceptionally(t);
            return null;
        });
        try {
            final String token = tokenDone.get();
            for (int messagesSent = 0; messagesSent < COUNT; ++messagesSent) {
                // get() throws ExecutionException on a failed delivery, which ends the loop
                this.sendMessageToHono(messagesSent, token).get();
                if (messagesSent % PROGRESS_INTERVAL == 0) {
                    System.out.println("Sent " + messagesSent + " messages.");
                }
            }
        }
        catch (InterruptedException e) {
            // restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            e.printStackTrace();
        }
        this.closeClients();
        System.out.println("Total number of messages: " + COUNT);
        System.out.println("Successful deliveries   : " + this.nrMessageDeliverySucceeded.get() + (this.eventMode ? " (incl. acknowledge)." : "."));
        System.out.println("Failed deliveries       : " + this.nrMessageDeliveryFailed.get());
        // give the client shutdown some time before closing Vert.x
        this.vertx.setTimer(VERTX_CLOSE_DELAY_MILLIS, timerId -> this.vertx.close());
    }

    /**
     * Sends a single message with payload {@code "myMessage" + value}.
     * <p>
     * The message is sent right away if the sender has credit; otherwise the send is
     * registered as the sender's send queue drain handler.
     *
     * @param value the number appended to the message payload.
     * @param token the registration assertion to include in the message.
     * @return a future that completes with {@code null} once the send attempt has succeeded
     *         and completes exceptionally if it has failed.
     */
    private CompletableFuture<Void> sendMessageToHono(final int value, final String token) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final Handler<Void> sendHandler = go -> {
            try {
                this.messageSender.send(DEVICE_ID, null, "myMessage" + value, "text/plain", token).setHandler(sendAttempt -> {
                    if (sendAttempt.succeeded()) {
                        this.nrMessageDeliverySucceeded.incrementAndGet();
                        result.complete(null);
                    } else {
                        System.err.println("Could not send message: " + sendAttempt.cause().getMessage());
                        this.nrMessageDeliveryFailed.incrementAndGet();
                        result.completeExceptionally(sendAttempt.cause());
                    }
                });
            } catch (RuntimeException e) {
                // An exception thrown on the Vert.x context would otherwise leave the
                // caller blocked in get() forever, so fail the future explicitly.
                System.err.println("Could not send message: " + e.getMessage());
                this.nrMessageDeliveryFailed.incrementAndGet();
                result.completeExceptionally(e);
            }
        };
        this.vertx.runOnContext(send -> {
            if (this.messageSender.getCredit() > 0) {
                sendHandler.handle(null);
            } else {
                this.messageSender.sendQueueDrainHandler(sendHandler);
            }
        });
        return result;
    }

    /**
     * Connects both clients and stores the resulting registration client and message
     * sender in the corresponding fields.
     *
     * @return a future that succeeds once both have been created and fails with the cause
     *         if either of them could not be created.
     */
    private Future<Void> getHonoClients() {
        final Future<RegistrationClient> registrationClientTracker = this.getRegistrationClient();
        final Future<MessageSender> messageSenderTracker = this.getMessageSender();
        final Future<Void> result = Future.future();
        CompositeFuture.all(registrationClientTracker, messageSenderTracker).setHandler(compositeResult -> {
            if (compositeResult.failed()) {
                System.err.println("hono clients could not be created : " + compositeResult.cause().getMessage());
                result.fail(compositeResult.cause());
            } else {
                this.registrationClient = registrationClientTracker.result();
                this.messageSender = messageSenderTracker.result();
                result.complete();
            }
        });
        return result;
    }

    /**
     * Connects to the device registry and gets or creates a registration client for the tenant.
     *
     * @return a future holding the registration client.
     */
    private Future<RegistrationClient> getRegistrationClient() {
        return this.honoRegistryClient.connect(new ProtonClientOptions()).compose(connectedClient -> connectedClient.getOrCreateRegistrationClient(TENANT_ID));
    }

    /**
     * Connects to the messaging endpoint and gets or creates a sender for the tenant:
     * an event sender in event mode, a telemetry sender otherwise.
     *
     * @return a future holding the sender.
     */
    private Future<MessageSender> getMessageSender() {
        return this.honoMessagingClient.connect(new ProtonClientOptions()).compose(connectedClient -> {
            if (this.isEventMode()) {
                return connectedClient.getOrCreateEventSender(TENANT_ID);
            }
            return connectedClient.getOrCreateTelemetrySender(TENANT_ID);
        });
    }

    /**
     * Asserts the registration of the device and extracts the {@code assertion} property
     * from the result. Requires the registration client to have been set by
     * {@link #getHonoClients()}.
     *
     * @return a future holding the value of the {@code assertion} property.
     */
    private Future<String> getRegistrationAssertion() {
        return this.registrationClient.assertRegistration(DEVICE_ID).map(regInfo -> regInfo.getString("assertion"));
    }

    /**
     * Triggers the shutdown of both clients.
     *
     * @return a future that completes once both shutdown handlers have been invoked successfully.
     */
    private Future<Void> closeClients() {
        final Future<Void> messagingClient = Future.future();
        final Future<Void> regClient = Future.future();
        this.honoMessagingClient.shutdown(messagingClient.completer());
        this.honoRegistryClient.shutdown(regClient.completer());
        return CompositeFuture.all(messagingClient, regClient).compose(ok -> Future.<Void>succeededFuture());
    }

    /**
     * Indicates which kind of sender is used.
     *
     * @return {@code true} if an event sender is used, {@code false} if a telemetry sender is used.
     */
    public boolean isEventMode() {
        return this.eventMode;
    }

    /**
     * Selects which kind of sender is used. The value is read when the sender is created
     * during {@link #sendData()}.
     *
     * @param value {@code true} to use an event sender, {@code false} to use a telemetry sender.
     */
    public void setEventMode(boolean value) {
        this.eventMode = value;
    }
}

