/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.command.CommandConsumerFactory;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandRouterCommandConsumerFactory
implements CommandConsumerFactory,
ServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(CommandRouterCommandConsumerFactory.class);
    private static final AtomicInteger ADAPTER_INSTANCE_ID_COUNTER = new AtomicInteger();
    private final String adapterInstanceId;
    private final CommandHandlers commandHandlers = new CommandHandlers();
    private final CommandRouterClient commandRouterClient;
    private final List<InternalCommandConsumer> internalCommandConsumers = new ArrayList<InternalCommandConsumer>();
    private int maxTenantIdsPerRequest = 100;

    public CommandRouterCommandConsumerFactory(CommandRouterClient commandRouterClient, String adapterName) {
        this.commandRouterClient = Objects.requireNonNull(commandRouterClient);
        Objects.requireNonNull(adapterName);
        this.adapterInstanceId = CommandConstants.getNewAdapterInstanceId((String)adapterName, (int)ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement());
        if (commandRouterClient instanceof ConnectionLifecycle) {
            ((ConnectionLifecycle)commandRouterClient).addReconnectListener(con -> this.reenableCommandRouting());
        }
    }

    void setMaxTenantIdsPerRequest(int count) {
        this.maxTenantIdsPerRequest = count;
    }

    private void reenableCommandRouting() {
        List tenantIds = this.commandHandlers.getCommandHandlers().stream().map(CommandHandlerWrapper::getTenantId).distinct().collect(Collectors.toList());
        int idx = 0;
        while (idx < tenantIds.size()) {
            int from = idx;
            int to = from + Math.min(this.maxTenantIdsPerRequest, tenantIds.size() - idx);
            List<String> chunk = tenantIds.subList(from, to);
            this.commandRouterClient.enableCommandRouting(chunk, null);
            idx = to;
        }
    }

    public void registerInternalCommandConsumer(BiFunction<String, CommandHandlers, InternalCommandConsumer> internalCommandConsumerSupplier) {
        InternalCommandConsumer consumer = internalCommandConsumerSupplier.apply(this.adapterInstanceId, this.commandHandlers);
        LOG.info("register internal command consumer {}", (Object)consumer.getClass().getSimpleName());
        this.internalCommandConsumers.add(consumer);
    }

    public Future<Void> start() {
        List futures = this.internalCommandConsumers.stream().map(Lifecycle::start).collect(Collectors.toList());
        if (futures.isEmpty()) {
            return Future.failedFuture((String)"no command consumer registered");
        }
        return CompositeFuture.all(futures).mapEmpty();
    }

    public Future<Void> stop() {
        List futures = this.internalCommandConsumers.stream().map(Lifecycle::stop).collect(Collectors.toList());
        return CompositeFuture.all(futures).mapEmpty();
    }

    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        this.internalCommandConsumers.forEach(consumer -> consumer.registerReadinessChecks(readinessHandler));
    }

    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
    }

    @Override
    public final Future<CommandConsumer> createCommandConsumer(String tenantId, String deviceId, Function<CommandContext, Future<Void>> commandHandler, Duration lifespan, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(commandHandler);
        return this.doCreateCommandConsumer(tenantId, deviceId, null, commandHandler, lifespan, context);
    }

    @Override
    public final Future<CommandConsumer> createCommandConsumer(String tenantId, String deviceId, String gatewayId, Function<CommandContext, Future<Void>> commandHandler, Duration lifespan, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(gatewayId);
        Objects.requireNonNull(commandHandler);
        return this.doCreateCommandConsumer(tenantId, deviceId, gatewayId, commandHandler, lifespan, context);
    }

    private Future<CommandConsumer> doCreateCommandConsumer(String tenantId, String deviceId, String gatewayId, Function<CommandContext, Future<Void>> commandHandler, Duration lifespan, SpanContext context) {
        final Duration sanitizedLifespan = lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > 9223372036L ? Duration.ofSeconds(-1L) : lifespan;
        LOG.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", new Object[]{tenantId, deviceId, gatewayId});
        SpanContext consumerCreationContextToUse = !sanitizedLifespan.isNegative() && sanitizedLifespan.toSeconds() <= 60L ? context : null;
        final CommandHandlerWrapper commandHandlerWrapper = new CommandHandlerWrapper(tenantId, deviceId, gatewayId, commandHandler, Vertx.currentContext(), consumerCreationContextToUse);
        this.commandHandlers.putCommandHandler(commandHandlerWrapper);
        final Instant lifespanStart = Instant.now();
        return this.commandRouterClient.registerCommandConsumer(tenantId, deviceId, this.adapterInstanceId, sanitizedLifespan, context).onFailure(thr -> {
            LOG.info("error registering consumer with the command router service [tenant: {}, device: {}]", new Object[]{tenantId, deviceId, thr});
            this.commandHandlers.removeCommandHandler(tenantId, deviceId);
        }).map(v -> new CommandConsumer(){

            @Override
            public Future<Void> close(SpanContext spanContext) {
                return CommandRouterCommandConsumerFactory.this.removeCommandConsumer(commandHandlerWrapper, sanitizedLifespan, lifespanStart, spanContext);
            }
        });
    }

    private Future<Void> removeCommandConsumer(CommandHandlerWrapper commandHandlerWrapper, Duration lifespan, Instant lifespanStart, SpanContext onCloseSpanContext) {
        String tenantId = commandHandlerWrapper.getTenantId();
        String deviceId = commandHandlerWrapper.getDeviceId();
        LOG.trace("remove command consumer [tenant-id: {}, device-id: {}]", (Object)tenantId, (Object)deviceId);
        if (!this.commandHandlers.removeCommandHandler(commandHandlerWrapper)) {
            LOG.debug("command consumer not removed - handler already replaced or removed [tenant: {}, device: {}]", (Object)tenantId, (Object)deviceId);
            return Future.failedFuture((Throwable)new ClientErrorException(412, "local command handler already replaced or removed"));
        }
        return this.commandRouterClient.unregisterCommandConsumer(tenantId, deviceId, this.adapterInstanceId, onCloseSpanContext).recover(thr -> {
            if (ServiceInvocationException.extractStatusCode((Throwable)thr) == 412) {
                boolean entryMayHaveExpired;
                boolean bl = entryMayHaveExpired = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan));
                if (entryMayHaveExpired) {
                    LOG.trace("ignoring 412 error when unregistering consumer with the command router service; entry may have already expired [tenant: {}, device: {}]", (Object)tenantId, (Object)deviceId);
                    return Future.succeededFuture();
                }
                LOG.debug("consumer not unregistered - not matched or already removed [tenant: {}, device: {}]", (Object)tenantId, (Object)deviceId);
                return Future.failedFuture((Throwable)new ClientErrorException(412, "no matching command consumer mapping found to be removed"));
            }
            LOG.info("error unregistering consumer with the command router service [tenant: {}, device: {}]", new Object[]{tenantId, deviceId, thr});
            return Future.failedFuture((Throwable)thr);
        });
    }
}

