package com.netifi.broker.tracing;

import com.google.protobuf.InvalidProtocolBufferException;
import com.netifi.broker.BrokerClient;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import java.time.Duration;
import java.util.Optional;
import java.util.StringJoiner;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import zipkin2.proto3.Span;

/* loaded from: input_file:com/netifi/broker/tracing/BrokerZipkinHttpBridge.class */
public class BrokerZipkinHttpBridge implements BrokerTracingService {
    private static final Logger logger = LoggerFactory.getLogger(BrokerZipkinHttpBridge.class);
    private static final String DEFAULT_ZIPKIN_SPANS_URL = "/api/v2/spans";
    private static final String DEFAULT_ZIPKIN_TRACES_URL = "/api/v2/traces";
    private final String host;
    private final int port;
    private final String zipkinSpansUrl;
    private HttpClient httpClient;

    public BrokerZipkinHttpBridge(String str, int i, String str2) {
        this.host = str;
        this.port = i;
        this.zipkinSpansUrl = str2;
    }

    public BrokerZipkinHttpBridge(String str, int i) {
        this(str, i, DEFAULT_ZIPKIN_SPANS_URL);
    }

    public static void main(String... strArr) {
        logger.info("Starting Stand-alone Broker Zipkin HTTP Bridge");
        String property = System.getProperty("netifi.tracingGroup", "com.netifi.broker.tracing");
        String property2 = System.getProperty("netifi.host", "localhost");
        int intValue = Integer.getInteger("netifi.port", 8001).intValue();
        String property3 = System.getProperty("netifi.zipkinHost", "localhost");
        int intValue2 = Integer.getInteger("netifi.zipkinPort", 9411).intValue();
        String property4 = System.getProperty("netifi.zipkinSpansUrl", DEFAULT_ZIPKIN_SPANS_URL);
        long longValue = Long.getLong("netifi.accessKey", 3855261330795754807L).longValue();
        String property5 = System.getProperty("netifi.authentication.accessToken", "kTBDVtfRBO4tHOnZzSyY5ym2kfY");
        logger.info("group - {}", property);
        logger.info("broker host - {}", property2);
        logger.info("broker port - {}", Integer.valueOf(intValue));
        logger.info("zipkin host - {}", property3);
        logger.info("zipkin port - {}", Integer.valueOf(intValue2));
        logger.info("zipkin spans url - {}", property4);
        logger.info("access key - {}", Long.valueOf(longValue));
        BrokerClient build = BrokerClient.tcp().accessKey(longValue).accessToken(property5).group(property).host(property2).port(intValue).destination("standaloneZipkinBridge").build();
        build.addService(new BrokerTracingServiceServer(new BrokerZipkinHttpBridge(property3, intValue2, property4), Optional.empty(), Optional.empty()));
        build.onClose().block();
    }

    private synchronized HttpClient getClient() {
        if (this.httpClient == null) {
            this.httpClient = HttpClient.create(ConnectionProvider.fixed("brokerZipkinBridge")).compress(true).port(this.port).tcpConfiguration(tcpClient -> {
                return tcpClient.host(this.host).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000);
            });
        }
        return this.httpClient;
    }

    private synchronized void resetHttpClient() {
        this.httpClient = null;
    }

    @Override // com.netifi.broker.tracing.BrokerTracingService
    public Mono<Ack> streamSpans(Publisher<Span> publisher, ByteBuf byteBuf) {
        return Flux.from(publisher).map(span -> {
            try {
                String print = JsonFormat.printer().print(span);
                if (logger.isTraceEnabled()) {
                    logger.trace("receiving tracing data {}", print);
                }
                return print;
            } catch (InvalidProtocolBufferException e) {
                throw Exceptions.propagate(e);
            }
        }).windowTimeout(128, Duration.ofMillis(1000L)).map(flux -> {
            return flux.reduce(new StringJoiner(","), (v0, v1) -> {
                return v0.add(v1);
            }).map(stringJoiner -> {
                return "[" + stringJoiner.toString() + "]";
            });
        }).onBackpressureBuffer(65536).flatMap(mono -> {
            return mono;
        }).concatMap(str -> {
            return getClient().headers(httpHeaders -> {
                httpHeaders.add("Content-Type", "application/json");
            }).post().uri(this.zipkinSpansUrl).send(ByteBufFlux.fromString(Mono.just(str))).response().timeout(Duration.ofSeconds(30L)).doOnError(th -> {
                resetHttpClient();
            });
        }, 8).doOnError(th -> {
            logger.error("error sending data to tracing data to url " + this.zipkinSpansUrl, th);
        }).then(Mono.never());
    }
}
