package com.netifi.broker.tracing;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleDeserializers;
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
import com.netifi.broker.tracing.Trace;
import io.netty.handler.codec.json.JsonObjectDecoder;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import zipkin2.proto3.Span;

/* loaded from: input_file:com/netifi/broker/tracing/TracesStreamer.class */
public class TracesStreamer {
    private final ObjectMapper objectMapper;
    private Function<Integer, Publisher<InputStream>> inputSource;

    /* loaded from: input_file:com/netifi/broker/tracing/TracesStreamer$CustomProtoModule.class */
    public static class CustomProtoModule extends ProtobufModule {
        public void setupModule(Module.SetupContext setupContext) {
            super.setupModule(setupContext);
            SimpleDeserializers simpleDeserializers = new SimpleDeserializers();
            simpleDeserializers.addDeserializer(Trace.class, new TracersDeserializer());
            setupContext.addDeserializers(simpleDeserializers);
        }
    }

    /* loaded from: input_file:com/netifi/broker/tracing/TracesStreamer$TracersDeserializer.class */
    public static class TracersDeserializer extends StdDeserializer<Trace> {
        public TracersDeserializer() {
            this(null);
        }

        protected TracersDeserializer(Class<?> cls) {
            super(cls);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Trace m199deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
            Trace.Builder newBuilder = Trace.newBuilder();
            while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
                newBuilder.addSpans((Span) deserializationContext.readValue(jsonParser, Span.class));
            }
            return newBuilder.m98build();
        }
    }

    public TracesStreamer(String str, Mono<HttpClient> mono) {
        this(zipkinServerStream(str, mono));
    }

    public TracesStreamer(Publisher<InputStream> publisher) {
        this((Function<Integer, Publisher<InputStream>>) num -> {
            return publisher;
        });
    }

    private TracesStreamer(Function<Integer, Publisher<InputStream>> function) {
        this.objectMapper = protoMapper();
        this.inputSource = function;
    }

    public Flux<Trace> streamTraces(int i) {
        return streamTraces(this.inputSource.apply(Integer.valueOf(i)));
    }

    Flux<Trace> streamTraces(Publisher<InputStream> publisher) {
        return Flux.from(publisher).filter(inputStream -> {
            try {
                return inputStream.available() > 0;
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        }).map(inputStream2 -> {
            try {
                return (Trace) this.objectMapper.readValue(inputStream2, new TypeReference<Trace>() { // from class: com.netifi.broker.tracing.TracesStreamer.1
                });
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        });
    }

    private static Function<Integer, Publisher<InputStream>> zipkinServerStream(String str, Mono<HttpClient> mono) {
        return num -> {
            return mono.flatMapMany(httpClient -> {
                return httpClient.doOnRequest((httpClientRequest, connection) -> {
                    connection.addHandler(new JsonObjectDecoder(true));
                }).get().uri(zipkinQuery(str, num.intValue())).responseContent().asInputStream();
            });
        };
    }

    private static String zipkinQuery(String str, int i) {
        return str + "?lookback=" + TimeUnit.SECONDS.toMillis(i) + "&limit=100000";
    }

    private ObjectMapper protoMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new CustomProtoModule());
        return objectMapper;
    }
}
