/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.telemetry.kafka;

import io.opentracing.Tracer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerHelper;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

/**
 * Abstract base for Kafka based message senders that
 * <ul>
 * <li>removes the producer metrics of a tenant's topic once a notification about the
 * deletion of that tenant is received, and</li>
 * <li>offers {@link #addDefaults(String, TenantObject, RegistrationAssertion, QoS, String, Buffer, Map)}
 * to subclasses for assembling the properties of a message.</li>
 * </ul>
 * Subclasses define the type of topic they send to via {@link #getTopicType()}.
 */
public abstract class AbstractKafkaBasedDownstreamSender extends AbstractKafkaBasedMessageSender {

    private final boolean isDefaultsEnabled;

    /**
     * Creates a new sender and registers a consumer for tenant change notifications.
     * <p>
     * Whenever a notification with a {@link LifecycleChange#DELETE} change is received and the
     * producer factory currently holds a producer with the given name, the metrics for the
     * deleted tenant's topic are removed from that producer.
     *
     * @param vertx The vert.x instance on which the notification consumer gets registered.
     * @param producerFactory The factory used for looking up the producer.
     * @param producerName The name of the producer to look up in the factory.
     * @param config The producer configuration, passed on to the super class.
     * @param includeDefaults {@code true} if tenant and device level defaults should be handed to
     *                        the property merging done in {@code addDefaults}.
     * @param tracer The tracer, passed on to the super class.
     * @throws NullPointerException if vertx is {@code null}. The remaining parameters are handed to
     *                              the super class constructor, which may apply its own checks.
     */
    public AbstractKafkaBasedDownstreamSender(
            Vertx vertx,
            KafkaProducerFactory<String, Buffer> producerFactory,
            String producerName,
            MessagingKafkaProducerConfigProperties config,
            boolean includeDefaults,
            Tracer tracer) {
        super(producerFactory, producerName, config, tracer);
        Objects.requireNonNull(vertx, "vertx");
        this.isDefaultsEnabled = includeDefaults;
        // No raw-type cast on the notification type here: keeping its type argument lets the
        // compiler infer the handler parameter as a TenantChangeNotification.
        NotificationEventBusSupport.registerConsumer(vertx, TenantChangeNotification.TYPE, notification -> {
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                producerFactory.getProducer(producerName)
                        .ifPresent(producer -> this.removeTenantTopicBasedProducerMetrics(
                                producer, notification.getTenantId()));
            }
        });
    }

    /**
     * Removes the metrics that the given producer holds for the topic of the given tenant.
     *
     * @param producer The producer to remove the topic metrics from.
     * @param tenantId The identifier of the tenant whose topic is affected.
     */
    private void removeTenantTopicBasedProducerMetrics(KafkaProducer<String, Buffer> producer, String tenantId) {
        // The topic name is derived from this sender's topic type and the tenant identifier.
        HonoTopic topic = new HonoTopic(this.getTopicType(), tenantId);
        KafkaProducerHelper.removeTopicMetrics(producer, Stream.of(topic.toString()));
    }

    /**
     * Gets the type of topic that this sender uses.
     * <p>
     * The type is combined with a tenant identifier to determine the topic whose producer
     * metrics are removed when that tenant is deleted.
     *
     * @return The topic type.
     */
    protected abstract HonoTopic.Type getTopicType();

    /**
     * Assembles the properties of a message.
     * <p>
     * The given properties are copied (the passed in map is not modified), the device identifier
     * and the QoS level are added to the copy and the result is handed to
     * {@link DownstreamMessageProperties} together with the tenant's resource limits and, if
     * defaults are enabled for this sender, the tenant and device level defaults.
     * <p>
     * Afterwards the content type is determined as follows:
     * <ul>
     * <li>a non-{@code null} content type parameter always overrides any existing value,</li>
     * <li>otherwise, if a payload is given and no content type is contained yet,
     * {@code application/octet-stream} is used.</li>
     * </ul>
     *
     * @param endpointName The name of the endpoint, passed on to {@code DownstreamMessageProperties}.
     * @param tenant The tenant that the device belongs to.
     * @param device The registration assertion of the device.
     * @param qos The quality of service level.
     * @param contentType The content type of the payload or {@code null} if unknown.
     * @param payload The payload or {@code null} if the message has no payload.
     * @param properties Additional message properties or {@code null}.
     * @return The map produced by {@code DownstreamMessageProperties}, including the content type
     *         handling described above.
     * @throws NullPointerException if endpoint name, tenant, device or qos are {@code null}.
     */
    protected final Map<String, Object> addDefaults(
            String endpointName,
            TenantObject tenant,
            RegistrationAssertion device,
            QoS qos,
            String contentType,
            Buffer payload,
            Map<String, Object> properties) {
        Objects.requireNonNull(endpointName, "endpointName");
        Objects.requireNonNull(tenant, "tenant");
        Objects.requireNonNull(device, "device");
        Objects.requireNonNull(qos, "qos");

        // Always work on a copy so that the caller's map is left untouched.
        final Map<String, Object> messageProperties = properties == null
                ? new HashMap<>()
                : new HashMap<>(properties);
        messageProperties.put("device_id", device.getDeviceId());
        // The QoS level is conveyed as the ordinal of the enum constant, i.e. the value
        // depends on the declaration order of the constants in QoS.
        messageProperties.put("qos", qos.ordinal());

        final Map<String, Object> tenantDefaults = this.isDefaultsEnabled ? tenant.getDefaults().getMap() : null;
        final Map<String, Object> deviceDefaults = this.isDefaultsEnabled ? device.getDefaults() : null;
        final Map<String, Object> propsWithDefaults = new DownstreamMessageProperties(
                endpointName,
                tenantDefaults,
                deviceDefaults,
                messageProperties,
                tenant.getResourceLimits()).asMap();

        if (contentType != null) {
            propsWithDefaults.put("content-type", contentType);
        } else if (payload != null) {
            // Only fall back to the generic type if nothing else has provided a content type.
            propsWithDefaults.putIfAbsent("content-type", "application/octet-stream");
        }
        return propsWithDefaults;
    }
}

