/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.producer;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.EncodeException;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractKafkaBasedMessageSender
implements MessagingClient,
Lifecycle {
    private static final String DEFAULT_SPAN_NAME = "send message";
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final Tracer tracer;
    private final KafkaProducerConfigProperties config;
    private final KafkaProducerFactory<String, Buffer> producerFactory;
    private final String producerName;
    private boolean stopped = false;

    public AbstractKafkaBasedMessageSender(KafkaProducerFactory<String, Buffer> producerFactory, String producerName, KafkaProducerConfigProperties config, Tracer tracer) {
        Objects.requireNonNull(producerFactory);
        Objects.requireNonNull(producerName);
        Objects.requireNonNull(config);
        Objects.requireNonNull(tracer);
        this.producerFactory = producerFactory;
        this.producerName = producerName;
        this.config = config;
        this.tracer = tracer;
    }

    public final MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    public Future<Void> start() {
        this.stopped = false;
        this.getOrCreateProducer();
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        this.stopped = true;
        return this.producerFactory.closeProducer(this.producerName);
    }

    protected void send(String topic, String tenantId, String deviceId, Buffer payload, Map<String, Object> properties, String spanOperationName, SpanContext context) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(properties);
        Span span = this.startSpan(spanOperationName, topic, tenantId, deviceId, "follows_from", context);
        List<KafkaHeader> headers = this.encodePropertiesAsKafkaHeaders(properties, span);
        this.sendAndWaitForOutcome(topic, tenantId, deviceId, payload, headers, span);
    }

    protected void send(String topic, String tenantId, String deviceId, Buffer payload, List<KafkaHeader> headers, String spanOperationName, SpanContext context) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(headers);
        Span span = this.startSpan(spanOperationName, topic, tenantId, deviceId, "follows_from", context);
        this.sendAndWaitForOutcome(topic, tenantId, deviceId, payload, headers, span);
    }

    protected Future<Void> sendAndWaitForOutcome(String topic, String tenantId, String deviceId, Buffer payload, Map<String, Object> properties, String spanOperationName, SpanContext context) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(properties);
        Span span = this.startSpan(spanOperationName, topic, tenantId, deviceId, "child_of", context);
        List<KafkaHeader> headers = this.encodePropertiesAsKafkaHeaders(properties, span);
        return this.sendAndWaitForOutcome(topic, tenantId, deviceId, payload, headers, span);
    }

    protected Future<Void> sendAndWaitForOutcome(String topic, String tenantId, String deviceId, Buffer payload, List<KafkaHeader> headers, String spanOperationName, SpanContext context) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(headers);
        Span span = this.startSpan(spanOperationName, topic, tenantId, deviceId, "child_of", context);
        return this.sendAndWaitForOutcome(topic, tenantId, deviceId, payload, headers, span);
    }

    protected Future<Void> sendAndWaitForOutcome(String topic, String tenantId, String deviceId, Buffer payload, List<KafkaHeader> headers, Span span) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(headers);
        Objects.requireNonNull(span);
        if (this.stopped) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, "sender already stopped"));
        }
        KafkaProducerRecord record = KafkaProducerRecord.create((String)topic, (Object)deviceId, (Object)payload);
        Promise sendPromise = Promise.promise();
        this.log.trace("sending message to Kafka [topic: {}, tenantId: {}, deviceId: {}]", new Object[]{topic, tenantId, deviceId});
        record.addHeaders(headers);
        KafkaTracingHelper.injectSpanContext(this.tracer, (KafkaProducerRecord<String, Buffer>)record, span.context());
        this.logProducerRecord(span, (KafkaProducerRecord<String, Buffer>)record);
        this.getOrCreateProducer().send(record, (Handler)sendPromise);
        return sendPromise.future().recover(t -> {
            this.logError(span, topic, tenantId, deviceId, (Throwable)t);
            span.finish();
            return Future.failedFuture((Throwable)new ServerErrorException(this.getErrorCode((Throwable)t), t));
        }).map(recordMetadata -> {
            this.logRecordMetadata(span, deviceId, (RecordMetadata)recordMetadata);
            span.finish();
            return null;
        });
    }

    private KafkaProducer<String, Buffer> getOrCreateProducer() {
        return this.producerFactory.getOrCreateProducer(this.producerName, this.config);
    }

    protected final List<KafkaHeader> encodePropertiesAsKafkaHeaders(Map<String, Object> properties, Span span) {
        ArrayList<KafkaHeader> headers = new ArrayList<KafkaHeader>();
        properties.forEach((k, v) -> {
            try {
                headers.add(KafkaRecordHelper.createKafkaHeader(k, v));
            }
            catch (EncodeException e) {
                this.log.info("failed to serialize property with key [{}] to Kafka header", k);
                span.log("failed to create Kafka header from property: " + k);
            }
        });
        return headers;
    }

    protected Span startChildSpan(String operationName, String topic, String tenantId, String deviceId, SpanContext context) {
        return this.startSpan(operationName, topic, tenantId, deviceId, "child_of", context);
    }

    protected Span startSpan(String operationName, String topic, String tenantId, String deviceId, String referenceType, SpanContext context) {
        String operationNameToUse = Strings.isNullOrEmpty((Object)operationName) ? DEFAULT_SPAN_NAME : operationName;
        return KafkaTracingHelper.newProducerSpan(this.tracer, operationNameToUse, topic, referenceType, context).setTag(TracingHelper.TAG_TENANT_ID.getKey(), tenantId).setTag(TracingHelper.TAG_DEVICE_ID.getKey(), deviceId);
    }

    private void logProducerRecord(Span span, KafkaProducerRecord<String, Buffer> record) {
        String headersAsString = record.headers().stream().map(header -> header.key() + "=" + header.value()).collect(Collectors.joining(",", "{", "}"));
        this.log.trace("producing message [topic: {}, key: {}, partition: {}, timestamp: {}, headers: {}]", new Object[]{record.topic(), record.key(), record.partition(), record.timestamp(), headersAsString});
        span.log("producing message with headers: " + headersAsString);
    }

    private void logRecordMetadata(Span span, String recordKey, RecordMetadata metadata) {
        this.log.trace("message produced to Kafka [topic: {}, key: {}, partition: {}, offset: {}, timestamp: {}]", new Object[]{metadata.getTopic(), recordKey, metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp()});
        span.log("message produced to Kafka");
        KafkaTracingHelper.setRecordMetadataTags(span, metadata);
        Tags.HTTP_STATUS.set(span, Integer.valueOf(202));
    }

    private void logError(Span span, String topic, String tenantId, String deviceId, Throwable cause) {
        this.log.debug("sending message failed [topic: {}, key: {}, tenantId: {}, deviceId: {}]", new Object[]{topic, deviceId, tenantId, deviceId, cause});
        Tags.HTTP_STATUS.set(span, Integer.valueOf(this.getErrorCode(cause)));
        TracingHelper.logError((Span)span, (Throwable)cause);
    }

    private int getErrorCode(Throwable t) {
        return 503;
    }
}

