package com.blacklocus.qs;

import com.blacklocus.misc.ExceptingRunnable;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/blacklocus/qs/QueueReader.class */
/**
 * Reads batches of queue items from a provider and submits one task per item to an executor.
 *
 * <p>Each task runs the handler's {@code convert}, {@code process} and {@code onSuccess} steps for
 * its item and always finishes with {@code onComplete}. The future returned by the executor is
 * passed to the handler through {@code withFuture}. When a batch is empty the reader sleeps for
 * {@link #sleepMs} milliseconds before asking the provider for the next batch.
 *
 * @param <Q> type of the raw items supplied by the queue item provider
 * @param <T> intermediate type of the handler (presumably the result of {@code convert};
 *            confirm against {@code QueueItemHandler})
 * @param <R> type of the result produced by the handler's {@code process} step
 */
public class QueueReader<Q, T, R> extends ExceptingRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(QueueReader.class);

    /** Sleep, in milliseconds, used after an empty batch when no explicit value is supplied. */
    public static final long DEFAULT_SLEEP_MS = 20000;

    /** Source of item batches; iterated once by {@link #go()}. */
    protected Iterable<Collection<Q>> queueItemProvider;

    /** Callbacks applied to every item and to the future of its task. */
    protected QueueItemHandler<Q, T, R> handler;

    /** Executor that receives one task per queue item. */
    protected ExecutorService executor;

    /** How long to sleep, in milliseconds, after the provider yields an empty batch. */
    protected long sleepMs;

    /**
     * Creates a reader that sleeps {@link #DEFAULT_SLEEP_MS} milliseconds after an empty batch.
     *
     * @param iterable         provider of item batches
     * @param queueItemHandler handler invoked for every item
     * @param executorService  executor the per-item tasks are submitted to
     */
    public QueueReader(Iterable<Collection<Q>> iterable, QueueItemHandler<Q, T, R> queueItemHandler, ExecutorService executorService) {
        this(iterable, queueItemHandler, executorService, DEFAULT_SLEEP_MS);
    }

    /**
     * Creates a reader with an explicit sleep interval.
     *
     * @param iterable         provider of item batches
     * @param queueItemHandler handler invoked for every item
     * @param executorService  executor the per-item tasks are submitted to
     * @param j                milliseconds to sleep after an empty batch
     */
    public QueueReader(Iterable<Collection<Q>> iterable, QueueItemHandler<Q, T, R> queueItemHandler, ExecutorService executorService, long j) {
        this.queueItemProvider = iterable;
        this.handler = queueItemHandler;
        this.executor = executorService;
        this.sleepMs = j;
    }

    /**
     * Drains the provider: submits a task for every item of each non-empty batch and sleeps after
     * each empty one.
     *
     * <p>Failures while handling a batch are logged and the loop moves on to the next batch. An
     * interrupt is the exception: the interrupt flag is restored and the method returns, because
     * continuing with the flag set would make every following sleep fail immediately.
     *
     * @throws Exception declared by the overridden method; anything thrown while handling a batch
     *                   is caught and logged here, so only a failure of the provider's own
     *                   iteration can propagate
     */
    @Override // com.blacklocus.misc.ExceptingRunnable
    public void go() throws Exception {
        for (Collection<Q> batch : queueItemProvider) {
            try {
                if (batch.isEmpty()) {
                    LOG.debug("No items available... sleeping for {} ms", sleepMs);
                    Thread.sleep(sleepMs);
                } else {
                    for (Q item : batch) {
                        handler.withFuture(item, executor.submit(newTask(item)));
                    }
                }
            } catch (InterruptedException e) {
                LOG.error("Reader thread interrupted", e);
                Thread.currentThread().interrupt();
                // Stop reading: with the flag restored, each later sleep would throw at once and
                // the loop would spin, logging an error per empty batch.
                return;
            } catch (Throwable th) {
                LOG.error("Runtime error in reader thread", th);
            }
        }
    }

    /** Builds the task that runs the handler's convert, process, onSuccess and onComplete steps for one item. */
    private Callable<Pair<Q, R>> newTask(final Q item) {
        return new Callable<Pair<Q, R>>() {
            @Override // java.util.concurrent.Callable
            public Pair<Q, R> call() throws Exception {
                try {
                    R result = handler.process(handler.convert(item));
                    handler.onSuccess(item, result);
                    return Pair.of(item, result);
                } finally {
                    // Runs exactly once per item, whether the steps above succeeded or threw.
                    handler.onComplete(item);
                }
            }
        };
    }
}
