package com.netifi.broker.tracing;

import com.netifi.common.tags.Tag;
import com.netifi.common.tags.Tags;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import zipkin2.Annotation;
import zipkin2.Component;
import zipkin2.Span;
import zipkin2.proto3.Annotation;
import zipkin2.proto3.Endpoint;
import zipkin2.proto3.Span;
import zipkin2.reporter.Reporter;

/* loaded from: input_file:com/netifi/broker/tracing/BrokerReporter.class */
class BrokerReporter extends Component implements Reporter<Span> {
    private static final Logger logger = LoggerFactory.getLogger(BrokerReporter.class);
    private final FluxProcessor<Span, Span> sink = DirectProcessor.create().serialize();
    private final Disposable disposable;
    private final String group;
    private final Tags tags;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BrokerReporter(BrokerTracingServiceClient brokerTracingServiceClient, String str, Tags tags) {
        this.group = str;
        this.tags = tags;
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicLong atomicLong = new AtomicLong(System.currentTimeMillis());
        this.disposable = Flux.defer(() -> {
            return brokerTracingServiceClient.streamSpans(this.sink.onBackpressureLatest().map(this::mapSpan));
        }).onErrorResume(th -> {
            if (System.currentTimeMillis() - atomicLong.getAndSet(System.currentTimeMillis()) > 30000) {
                atomicInteger.set(0);
            }
            int min = Math.min(30, atomicInteger.incrementAndGet());
            logger.error("error sending tracing data", th);
            return Mono.delay(Duration.ofSeconds(min)).then(Mono.error(th));
        }).retry().subscribe();
    }

    private zipkin2.proto3.Span mapSpan(Span span) {
        Span.Builder traceId = zipkin2.proto3.Span.newBuilder().setName(span.name()).setTraceId(span.traceId());
        if (span.parentId() != null) {
            traceId.setParentId(span.parentId());
        }
        traceId.setId(span.id());
        if (span.kind() != null) {
            traceId.setKind(Span.Kind.valueOf(span.kind().name()));
        }
        traceId.setTimestamp(span.timestampAsLong()).setDuration(span.durationAsLong());
        if (span.localEndpoint() != null) {
            traceId.setLocalEndpoint(mapEndpoint(span.localEndpoint()));
        }
        if (span.remoteEndpoint() != null) {
            traceId.setRemoteEndpoint(mapEndpoint(span.remoteEndpoint()));
        }
        Iterator it = span.annotations().iterator();
        while (it.hasNext()) {
            traceId.addAnnotations(mapAnnotation((Annotation) it.next()));
        }
        traceId.putAllTags(span.tags()).setDebug(span.debug() == null ? false : span.debug().booleanValue()).setShared(span.shared() == null ? false : span.shared().booleanValue()).putTags("group", this.group);
        Iterator it2 = this.tags.iterator();
        while (it2.hasNext()) {
            Tag tag = (Tag) it2.next();
            traceId.putTags(tag.getKey(), tag.getValue());
        }
        return traceId.m381build();
    }

    private Endpoint.Builder mapEndpoint(zipkin2.Endpoint endpoint) {
        Endpoint.Builder serviceName = Endpoint.newBuilder().setServiceName(this.group);
        if (endpoint.ipv4() != null) {
            serviceName.setIpv4(endpoint.ipv4());
        }
        if (endpoint.ipv6() != null) {
            serviceName.setIpv6(endpoint.ipv6());
        }
        return serviceName.setPort(endpoint.portAsInt());
    }

    private Annotation.Builder mapAnnotation(zipkin2.Annotation annotation) {
        return zipkin2.proto3.Annotation.newBuilder().setTimestamp(annotation.timestamp()).setValue(annotation.value());
    }

    public void report(zipkin2.Span span) {
        if (this.sink.isDisposed()) {
            return;
        }
        logger.trace("reporting tracing data - {}", span);
        this.sink.onNext(span);
    }

    public void close() throws IOException {
        if (this.disposable.isDisposed()) {
            return;
        }
        this.disposable.dispose();
    }
}
