/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.producer;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.EncodeException;
import io.vertx.core.json.Json;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for clients that publish messages to Kafka topics.
 * <p>
 * The Kafka producer is obtained from the given {@link KafkaProducerFactory} under a fixed
 * producer name. Sub-classes send records by means of the {@code sendAndWaitForOutcome}
 * methods and may create tracing spans using {@link #startSpan} / {@link #startChildSpan}.
 */
public abstract class AbstractKafkaBasedMessageSender implements MessagingClient, ServiceClient, Lifecycle {

    private static final String DEFAULT_SPAN_NAME = "send message";
    /** Name of the header that carries the creation time of a message. */
    private static final String HEADER_CREATION_TIME = "creation-time";
    /** Status code set on the span when a record has been produced successfully. */
    private static final int STATUS_ACCEPTED = 202;
    /** Status code used for every failure to send a record. */
    private static final int STATUS_UNAVAILABLE = 503;

    /** Logger named after the concrete sub-class. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Tracer used for creating spans and for injecting the span context into records. */
    protected final Tracer tracer;

    private final MessagingKafkaProducerConfigProperties config;
    private final KafkaProducerFactory<String, Buffer> producerFactory;
    private final String producerName;

    // NOTE(review): both flags are plain (non-volatile) fields; this presumably relies on all
    // accesses happening on the same thread/context - confirm against the callers.
    private boolean stopped = false;
    private boolean producerCreated = false;

    /**
     * Creates a new sender.
     *
     * @param producerFactory The factory to obtain the Kafka producer from.
     * @param producerName The name under which the producer is requested from (and closed via) the factory.
     * @param config The configuration passed to the factory when requesting the producer.
     * @param tracer The tracer to use.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public AbstractKafkaBasedMessageSender(
            KafkaProducerFactory<String, Buffer> producerFactory,
            String producerName,
            MessagingKafkaProducerConfigProperties config,
            Tracer tracer) {

        Objects.requireNonNull(producerFactory);
        Objects.requireNonNull(producerName);
        Objects.requireNonNull(config);
        Objects.requireNonNull(tracer);
        this.producerFactory = producerFactory;
        this.producerName = producerName;
        this.config = config;
        this.tracer = tracer;
    }

    /**
     * Gets the messaging type of this client.
     *
     * @return Always {@link MessagingType#kafka}.
     */
    public final MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    /**
     * Starts this sender by trying to create the Kafka producer.
     * <p>
     * If producer creation fails with an error that
     * {@link KafkaClientFactory#isRetriableClientCreationError} considers retriable, creation is
     * retried in the background and the returned future is completed successfully right away.
     * In that case the readiness check registered by {@link #registerReadinessChecks} reports
     * {@code KO} until the producer has eventually been created.
     *
     * @return A future that is succeeded if the producer has been created or its creation is being
     *         retried, and failed if creating the producer failed with a non-retriable error.
     */
    public Future<Void> start() {
        stopped = false;
        return Future.succeededFuture()
                // invoked inside of map() so that an exception thrown during creation fails the future
                .map(v -> getOrCreateProducer())
                .onSuccess(producer -> {
                    producerCreated = true;
                })
                .recover(thr -> {
                    if (KafkaClientFactory.isRetriableClientCreationError(thr, config.getBootstrapServers())) {
                        // keep retrying in the background, the outcome is not awaited here
                        getOrCreateProducerWithRetries().onSuccess(producer -> {
                            producerCreated = true;
                        });
                        return Future.succeededFuture();
                    }
                    return Future.failedFuture(thr);
                })
                .mapEmpty();
    }

    /**
     * Stops this sender and closes the producer via the producer factory.
     * <p>
     * Subsequent attempts to send a message fail until {@link #start()} is invoked again.
     *
     * @return The outcome of closing the producer.
     */
    public Future<Void> stop() {
        stopped = true;
        return producerFactory.closeProducer(producerName);
    }

    /**
     * Sends a message to a Kafka topic and waits for the outcome.
     * <p>
     * The given properties are encoded as Kafka headers. Properties that cannot be encoded are
     * skipped. A creation time header is added if the properties do not contain one.
     *
     * @param topic The topic to send the message to.
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device identifier, used as the record key.
     * @param payload The data to send, may be {@code null}.
     * @param properties The properties to send as record headers.
     * @param currentSpan The span used for tracing the send operation. The span is not finished here.
     * @return A future indicating the outcome, see
     *         {@link #sendAndWaitForOutcome(String, String, String, Buffer, List, Span)}.
     * @throws NullPointerException if any parameter other than payload is {@code null}.
     */
    protected final Future<Void> sendAndWaitForOutcome(
            String topic,
            String tenantId,
            String deviceId,
            Buffer payload,
            Map<String, Object> properties,
            Span currentSpan) {

        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(properties);
        Objects.requireNonNull(currentSpan);

        final List<KafkaHeader> headers = encodePropertiesAsKafkaHeaders(properties, currentSpan);
        return sendAndWaitForOutcome(topic, tenantId, deviceId, payload, headers, currentSpan);
    }

    /**
     * Sends a message to a Kafka topic and waits for the outcome.
     * <p>
     * The span context of the given span is injected into the record before it is sent.
     *
     * @param topic The topic to send the message to.
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device identifier, used as the record key.
     * @param payload The data to send, may be {@code null}.
     * @param headers The headers to add to the record.
     * @param currentSpan The span used for tracing the send operation. The span is not finished here.
     * @return A future indicating the outcome of the operation. The future is succeeded once the
     *         record has been sent. It is failed with a {@link ServerErrorException} having status
     *         503 if this sender has been stopped, if the producer could not be obtained or if
     *         sending the record failed.
     * @throws NullPointerException if any parameter other than payload is {@code null}.
     */
    protected final Future<Void> sendAndWaitForOutcome(
            String topic,
            String tenantId,
            String deviceId,
            Buffer payload,
            List<KafkaHeader> headers,
            Span currentSpan) {

        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(headers);
        Objects.requireNonNull(currentSpan);

        if (stopped) {
            return Future.failedFuture(new ServerErrorException(STATUS_UNAVAILABLE, "sender already stopped"));
        }

        final KafkaProducerRecord<String, Buffer> record = KafkaProducerRecord.create(topic, deviceId, payload);
        log.trace("sending message to Kafka [topic: {}, tenantId: {}, deviceId: {}]", topic, tenantId, deviceId);
        record.addHeaders(headers);
        KafkaTracingHelper.injectSpanContext(tracer, record, currentSpan.context());
        logProducerRecord(currentSpan, record);

        // Obtaining the producer may throw (start() guards against this as well); report such a
        // failure via the returned future like any other send error instead of letting the
        // exception escape from a method that returns a Future.
        final KafkaProducer<String, Buffer> producer;
        try {
            producer = getOrCreateProducer();
        } catch (final RuntimeException e) {
            logError(currentSpan, topic, tenantId, deviceId, e);
            return Future.failedFuture(new ServerErrorException(tenantId, getErrorCode(e), e));
        }

        return producer.send(record)
                .onSuccess(recordMetadata -> logRecordMetadata(currentSpan, deviceId, recordMetadata))
                .otherwise(t -> {
                    logError(currentSpan, topic, tenantId, deviceId, t);
                    // throwing from within otherwise() fails the resulting future with this exception
                    throw new ServerErrorException(tenantId, getErrorCode(t), t);
                })
                .mapEmpty();
    }

    /** Obtains the producer from the factory; may throw if the producer cannot be created. */
    private KafkaProducer<String, Buffer> getOrCreateProducer() {
        return producerFactory.getOrCreateProducer(producerName, config);
    }

    /** Obtains the producer from the factory, retrying creation without a time limit. */
    private Future<KafkaProducer<String, Buffer>> getOrCreateProducerWithRetries() {
        return producerFactory.getOrCreateProducerWithRetries(
                producerName,
                config,
                KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
    }

    /**
     * Registers a readiness check that reports {@code OK} once the Kafka producer has been
     * created and {@code KO} otherwise.
     * <p>
     * The check is registered under a name that contains the producer name and a random UUID.
     *
     * @param readinessHandler The handler to register the check with.
     */
    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        final String checkName = String.format("%s-kafka-client-creation-%s", producerName, UUID.randomUUID());
        readinessHandler.register(
                checkName,
                status -> status.tryComplete(producerCreated ? Status.OK() : Status.KO()));
    }

    /**
     * Does nothing, this class registers no liveness checks.
     *
     * @param livenessHandler The handler to register checks with (unused).
     */
    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
        // nothing to register
    }

    /**
     * Converts the given properties to Kafka headers.
     * <p>
     * A property that cannot be encoded is logged (also to the span) and skipped. A header with
     * the current time in epoch milliseconds is added if the properties contain no creation time.
     */
    private List<KafkaHeader> encodePropertiesAsKafkaHeaders(Map<String, Object> properties, Span span) {
        final List<KafkaHeader> headers = new ArrayList<>();
        properties.forEach((k, v) -> {
            try {
                headers.add(KafkaRecordHelper.createKafkaHeader(k, v));
            } catch (final EncodeException e) {
                // best effort: the message is still sent, just without this header
                log.info("failed to serialize property with key [{}] to Kafka header", k);
                span.log("failed to create Kafka header from property: " + k);
            }
        });
        if (!properties.containsKey(HEADER_CREATION_TIME)) {
            final long now = Instant.now().toEpochMilli();
            headers.add(KafkaRecordHelper.createKafkaHeader(HEADER_CREATION_TIME, Json.encode(now)));
        }
        return headers;
    }

    /**
     * Creates a new span with a <em>child_of</em> reference to the given span context.
     *
     * @param operationName The operation name of the span, a default name is used if {@code null} or empty.
     * @param topic The topic that the message is sent to.
     * @param tenantId The tenant identifier to set as a tag on the span.
     * @param deviceId The device identifier to set as a tag on the span.
     * @param context The span context to refer to.
     * @return The new span.
     */
    protected Span startChildSpan(
            String operationName,
            String topic,
            String tenantId,
            String deviceId,
            SpanContext context) {

        return startSpan(operationName, topic, tenantId, deviceId, "child_of", context);
    }

    /**
     * Creates a new producer span that refers to the given span context.
     *
     * @param operationName The operation name of the span, a default name is used if {@code null} or empty.
     * @param topic The topic that the message is sent to.
     * @param tenantId The tenant identifier to set as a tag on the span.
     * @param deviceId The device identifier to set as a tag on the span.
     * @param referenceType The type of reference to the given span context.
     * @param context The span context to refer to.
     * @return The new span.
     */
    protected Span startSpan(
            String operationName,
            String topic,
            String tenantId,
            String deviceId,
            String referenceType,
            SpanContext context) {

        final String operationNameToUse = Strings.isNullOrEmpty(operationName) ? DEFAULT_SPAN_NAME : operationName;
        return KafkaTracingHelper.newProducerSpan(tracer, operationNameToUse, topic, referenceType, context)
                .setTag(TracingHelper.TAG_TENANT_ID.getKey(), tenantId)
                .setTag(TracingHelper.TAG_DEVICE_ID.getKey(), deviceId);
    }

    /** Logs the record that is about to be sent, including its headers, to the logger and the span. */
    private void logProducerRecord(Span span, KafkaProducerRecord<String, Buffer> record) {
        final String headersAsString = record.headers().stream()
                .map(header -> header.key() + "=" + header.value())
                .collect(Collectors.joining(",", "{", "}"));
        log.trace("producing message [topic: {}, key: {}, partition: {}, timestamp: {}, headers: {}]",
                record.topic(), record.key(), record.partition(), record.timestamp(), headersAsString);
        span.log("producing message with headers: " + headersAsString);
    }

    /** Logs the metadata of a successfully produced record and marks the span as accepted (202). */
    private void logRecordMetadata(Span span, String recordKey, RecordMetadata metadata) {
        log.trace("message produced to Kafka [topic: {}, key: {}, partition: {}, offset: {}, timestamp: {}]",
                metadata.getTopic(), recordKey, metadata.getPartition(), metadata.getOffset(),
                metadata.getTimestamp());
        span.log("message produced to Kafka");
        KafkaTracingHelper.setRecordMetadataTags(span, metadata);
        Tags.HTTP_STATUS.set(span, STATUS_ACCEPTED);
    }

    /** Logs a failure to send a record and records the error and its status code on the span. */
    private void logError(Span span, String topic, String tenantId, String deviceId, Throwable cause) {
        // the trailing Throwable has no placeholder, SLF4J logs it as the exception of the event
        log.debug("sending message failed [topic: {}, key: {}, tenantId: {}, deviceId: {}]",
                topic, deviceId, tenantId, deviceId, cause);
        Tags.HTTP_STATUS.set(span, getErrorCode(cause));
        TracingHelper.logError(span, cause);
    }

    /** Maps a send failure to a status code; currently every failure is reported as 503. */
    private int getErrorCode(Throwable t) {
        return STATUS_UNAVAILABLE;
    }
}

