package com.aphyr.riemann.client;

import com.aphyr.riemann.Proto;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/aphyr/riemann/client/RiemannBatchClient.class */
/**
 * An {@link IRiemannClient} that buffers individual events and forwards them to a wrapped
 * client as batched messages.
 *
 * <p>Events submitted through {@link #sendEvent}, {@link #sendEvents(List)} and
 * {@link #sendEvents(Proto.Event...)} are placed in {@link #buffer}. Once the tracked buffer
 * size reaches {@link #batchSize}, up to {@code batchSize} events are drained, combined into a
 * single {@link Proto.Msg}, and sent through {@link #client}. Messages passed to
 * {@link #sendMessage} and queries bypass the buffer and go straight to the wrapped client.
 *
 * <p>{@link #bufferSize} is maintained separately from the queue itself (it is incremented
 * after an enqueue and decremented after a drain), so it is only an approximation of the
 * queue length while other threads are queueing or flushing.
 */
public class RiemannBatchClient implements IRiemannClient {
    /** Maximum number of events placed into one outgoing message; always positive. */
    public final int batchSize;

    /** Running count of buffered writes; adjusted after each enqueue and each drain. */
    public final AtomicInteger bufferSize;

    /** Writes waiting to be combined into a batch. */
    public final LinkedTransferQueue<Write> buffer;

    /** The wrapped client that actually sends messages. */
    public final IRiemannClient client;

    /** Initialised to 5000; not read anywhere in this class. */
    public final AtomicLong readPromiseTimeout;

    /**
     * A buffered event together with the promise that is attached to the response of
     * whichever batch ends up carrying the event.
     */
    public class Write {
        /** The event awaiting transmission. */
        public final Proto.Event event;

        /** Promise handed back to the caller that submitted {@link #event}. */
        public final ChainPromise<Proto.Msg> promise;

        /**
         * @param event        the event to buffer
         * @param chainPromise the promise to attach once the event's batch has been sent
         */
        public Write(Proto.Event event, ChainPromise<Proto.Msg> chainPromise) {
            this.event = event;
            this.promise = chainPromise;
        }
    }

    /**
     * Creates a batching client with a batch size of 10.
     *
     * @param iRiemannClient the client that batched messages are sent through
     * @throws UnsupportedJVMException declared for callers; not thrown by the code in this class
     */
    public RiemannBatchClient(IRiemannClient iRiemannClient) throws UnsupportedJVMException {
        this(iRiemannClient, 10);
    }

    /**
     * Creates a batching client.
     *
     * @param iRiemannClient the client that batched messages are sent through
     * @param i              the batch size: the buffered-event count that triggers a flush and
     *                       the maximum number of events per message; must be positive
     * @throws IllegalArgumentException if {@code i} is zero or negative
     * @throws UnsupportedJVMException  declared for callers; not thrown by the code in this class
     */
    public RiemannBatchClient(IRiemannClient iRiemannClient, int i) throws UnsupportedJVMException {
        // A batch size of zero would drain nothing on every flush, so queued events would never
        // be sent; a negative one has no meaning. Reject both up front.
        if (i <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got " + i);
        }
        this.bufferSize = new AtomicInteger();
        this.readPromiseTimeout = new AtomicLong(5000L);
        this.client = iRiemannClient;
        this.batchSize = i;
        this.buffer = new LinkedTransferQueue<>();
    }

    /**
     * Sends a pre-built message directly through the wrapped client, bypassing the buffer.
     *
     * @param msg the message to send
     * @return the promise returned by the wrapped client
     */
    @Override // com.aphyr.riemann.client.AsynchronousTransport
    public IPromise<Proto.Msg> sendMessage(Proto.Msg msg) {
        return this.client.sendMessage(msg);
    }

    /**
     * Queues every event in {@code list}. All of them share the single returned promise, which
     * is attached to the response of each batch that carries one of the events.
     *
     * <p>If {@code list} is empty nothing is queued, so this class never attaches the returned
     * promise to a response.
     *
     * @param list the events to queue
     * @return the promise shared by all queued events
     */
    @Override // com.aphyr.riemann.client.IRiemannClient
    public IPromise<Proto.Msg> sendEvents(List<Proto.Event> list) {
        final ChainPromise<Proto.Msg> shared = new ChainPromise<>();
        for (Proto.Event event : list) {
            queue(new Write(event, shared));
        }
        return shared;
    }

    /**
     * Varargs form of {@link #sendEvents(List)}.
     *
     * @param eventArr the events to queue
     * @return the promise shared by all queued events
     */
    @Override // com.aphyr.riemann.client.IRiemannClient
    public IPromise<Proto.Msg> sendEvents(Proto.Event... eventArr) {
        return sendEvents(Arrays.asList(eventArr));
    }

    /**
     * Queues a single event.
     *
     * @param event the event to queue
     * @return a promise attached to the response of the batch that carries the event
     */
    @Override // com.aphyr.riemann.client.IRiemannClient
    public IPromise<Proto.Msg> sendEvent(Proto.Event event) {
        final ChainPromise<Proto.Msg> promise = new ChainPromise<>();
        queue(new Write(event, promise));
        return promise;
    }

    /**
     * Delegates to {@code RiemannClient.sendException}, passing this client as the sender.
     *
     * @param str forwarded unchanged to {@code RiemannClient.sendException}
     * @param th  the throwable to report
     * @return the promise returned by {@code RiemannClient.sendException}
     */
    @Override // com.aphyr.riemann.client.IRiemannClient
    public IPromise<Proto.Msg> sendException(String str, Throwable th) {
        return RiemannClient.sendException(this, str, th);
    }

    /**
     * Runs a query through the wrapped client; queries are not batched.
     *
     * @param str the query string
     * @return the promise returned by the wrapped client
     */
    @Override // com.aphyr.riemann.client.IRiemannClient
    public IPromise<List<Proto.Event>> query(String str) {
        return this.client.query(str);
    }

    /**
     * @return a new {@link EventDSL} bound to this batching client
     */
    @Override // com.aphyr.riemann.client.IRiemannClient
    public EventDSL event() {
        return new EventDSL(this);
    }

    /**
     * Adds a write to the buffer and flushes one batch if the tracked buffer size has reached
     * {@link #batchSize}.
     *
     * @param write the write to buffer
     */
    public void queue(Write write) {
        this.buffer.put(write);
        // Count the write only after it is in the queue; the flush check uses the new count.
        final int pending = this.bufferSize.incrementAndGet();
        if (pending >= this.batchSize) {
            flush();
        }
    }

    /**
     * Drains up to {@link #batchSize} buffered writes, sends their events as one message through
     * the wrapped client, and attaches each write's promise to the response promise.
     *
     * <p>A message is sent even when no writes were drained. If the wrapped client's
     * {@code sendMessage} throws, the writes already drained are not re-queued and their
     * promises are not attached.
     *
     * @return the number of writes included in the message
     */
    public int flush2() {
        // bufferSize is adjusted in a separate step from the queue, so overlapping flushes can
        // push it below zero for a moment. Clamp the limit: a negative initial capacity would
        // make the ArrayList constructor throw IllegalArgumentException.
        final int limit = Math.max(0, Math.min(this.batchSize, this.bufferSize.get()));
        final List<Write> writes = new ArrayList<>(limit);
        this.buffer.drainTo(writes, limit);
        this.bufferSize.addAndGet(-writes.size());

        final Proto.Msg.Builder message = Proto.Msg.newBuilder();
        for (Write write : writes) {
            message.addEvents(write.event);
        }

        final IPromise<Proto.Msg> response = this.client.sendMessage(message.build());
        for (Write write : writes) {
            write.promise.attach(response);
        }
        return writes.size();
    }

    /**
     * Flushes a single batch; see {@link #flush2()}.
     */
    @Override // com.aphyr.riemann.client.Transport
    public void flush() {
        flush2();
    }

    /**
     * @return whether the wrapped client reports itself connected
     */
    @Override // com.aphyr.riemann.client.Transport
    public boolean isConnected() {
        return this.client.isConnected();
    }

    /**
     * Connects the wrapped client.
     *
     * @throws IOException if the wrapped client fails to connect
     */
    @Override // com.aphyr.riemann.client.Transport
    public void connect() throws IOException {
        this.client.connect();
    }

    /**
     * Flushes one batch and then closes the wrapped client. The wrapped client is closed even
     * if the flush throws.
     *
     * <p>Only a single batch is flushed, so writes beyond the first {@link #batchSize} that are
     * still buffered at this point are not sent.
     */
    @Override // com.aphyr.riemann.client.Transport, java.lang.AutoCloseable
    public void close() {
        try {
            flush();
        } finally {
            this.client.close();
        }
    }

    /**
     * Reconnects the wrapped client.
     *
     * @throws IOException if the wrapped client fails to reconnect
     */
    @Override // com.aphyr.riemann.client.Transport
    public void reconnect() throws IOException {
        this.client.reconnect();
    }

    /**
     * @return the wrapped client, viewed as a {@link Transport}
     */
    @Override // com.aphyr.riemann.client.Transport
    public Transport transport() {
        return this.client;
    }
}
