package com.aphyr.riemann.client;

import com.aphyr.riemann.Proto;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/aphyr/riemann/client/RiemannBatchClient.class */
public class RiemannBatchClient extends AbstractRiemannClient {
    public final int batchSize;
    public final AtomicInteger bufferSize;
    public final LinkedTransferQueue<Write> buffer;
    public final AbstractRiemannClient client;
    public final AtomicLong readPromiseTimeout;
    public final Promise<Boolean> blackhole;

    /* loaded from: input_file:com/aphyr/riemann/client/RiemannBatchClient$Write.class */
    public class Write {
        public final Proto.Event event;
        public final Promise<Boolean> promise;

        public Write(Proto.Event event, Promise promise) {
            this.event = event;
            this.promise = promise;
        }

        public Write(Proto.Event event) {
            this.event = event;
            this.promise = new Promise<>();
        }
    }

    public RiemannBatchClient(AbstractRiemannClient abstractRiemannClient) throws UnknownHostException, UnsupportedJVMException {
        this(10, abstractRiemannClient);
    }

    public RiemannBatchClient(int i, AbstractRiemannClient abstractRiemannClient) throws UnknownHostException, UnsupportedJVMException {
        this.bufferSize = new AtomicInteger();
        this.readPromiseTimeout = new AtomicLong(5000L);
        this.blackhole = new Promise<>();
        this.batchSize = i;
        this.client = abstractRiemannClient;
        this.buffer = new LinkedTransferQueue<>();
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient
    public void sendEvents(List<Proto.Event> list) {
        try {
            Iterator<Proto.Event> it = list.iterator();
            while (it.hasNext()) {
                queue(new Write(it.next(), this.blackhole));
            }
        } catch (IOException e) {
        }
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient
    public Boolean sendEventsWithAck(List<Proto.Event> list) throws IOException, ServerError, MsgTooLargeException {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<Proto.Event> it = list.iterator();
        while (it.hasNext()) {
            Write write = new Write(it.next());
            arrayList.add(write.promise);
            queue(write);
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            try {
                if (!((Boolean) ((Promise) it2.next()).deref(this.readPromiseTimeout.get(), TimeUnit.MILLISECONDS, false)).booleanValue()) {
                    throw new IOException("Timed out waiting for response promise.");
                }
            } catch (RuntimeException e) {
                if (e.getCause() instanceof ServerError) {
                    throw ((ServerError) e.getCause());
                }
                throw e;
            }
        }
        return true;
    }

    public void queue(Write write) throws IOException {
        this.buffer.put(write);
        if (this.batchSize <= this.bufferSize.addAndGet(1)) {
            flush();
        }
    }

    public int flush2() {
        int min = Math.min(this.batchSize, this.bufferSize.get());
        ArrayList arrayList = new ArrayList(min);
        this.buffer.drainTo(arrayList, min);
        this.bufferSize.addAndGet((-1) * arrayList.size());
        try {
            Proto.Msg.Builder newBuilder = Proto.Msg.newBuilder();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                newBuilder.addEvents(((Write) it.next()).event);
            }
            validate(sendRecvMessage(newBuilder.build()));
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((Write) it2.next()).promise.deliver(true);
            }
        } catch (RuntimeException e) {
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                ((Write) it3.next()).promise.deliver(e);
            }
        } catch (Throwable th) {
            RuntimeException runtimeException = new RuntimeException(th);
            Iterator it4 = arrayList.iterator();
            while (it4.hasNext()) {
                ((Write) it4.next()).promise.deliver(runtimeException);
            }
        }
        return arrayList.size();
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.Transport
    public void flush() throws IOException {
        flush2();
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.SynchronousTransport
    public Proto.Msg sendRecvMessage(Proto.Msg msg) throws IOException {
        return this.client.sendRecvMessage(msg);
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.SynchronousTransport
    public Proto.Msg sendMaybeRecvMessage(Proto.Msg msg) throws IOException {
        return this.client.sendMaybeRecvMessage(msg);
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.AsynchronousTransport
    public IPromise<Proto.Msg> aSendRecvMessage(Proto.Msg msg) {
        return this.client.aSendRecvMessage(msg);
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.AsynchronousTransport
    public IPromise<Proto.Msg> aSendMaybeRecvMessage(Proto.Msg msg) {
        return this.client.aSendMaybeRecvMessage(msg);
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.Transport
    public boolean isConnected() {
        return this.client.isConnected();
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.Transport
    public void connect() throws IOException {
        this.client.connect();
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.Transport
    public void disconnect() throws IOException {
        try {
            flush();
            this.client.disconnect();
        } catch (Throwable th) {
            this.client.disconnect();
            throw th;
        }
    }

    @Override // com.aphyr.riemann.client.AbstractRiemannClient, com.aphyr.riemann.client.Transport
    public void reconnect() throws IOException {
        this.client.reconnect();
    }
}
