package org.calrissian.accumulorecipes.commons.iterators;

import java.io.IOException;
import java.lang.Thread;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/calrissian/accumulorecipes/commons/iterators/ReadAheadIterator.class */
public class ReadAheadIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
    /** Option key for the capacity of the read-ahead queue. */
    public static final String QUEUE_SIZE = "queue.size";
    /** Option key for how long (in seconds) the producer waits for room in the queue. */
    public static final String TIMEOUT = "timeout";
    // Sentinel the producer puts on the queue once the source is exhausted;
    // hasTop() recognises it by identity, so it must stay a single shared instance.
    private static final QueueElement noMoreDataElement = new QueueElement();
    // Never reassigned, so it is declared final.
    private static final Logger log = Logger.getLogger(ReadAheadIterator.class);
    // Iterator being read ahead of; set by init() or by the copy constructor.
    private SortedKeyValueIterator<Key, Value> source;
    // Queue capacity; overridden by the QUEUE_SIZE option.
    private int queueSize = 5;
    // Producer wait time in seconds; overridden by the TIMEOUT option.
    private int timeout = 60;
    // Hand-off buffer between the producer thread and the consumer.
    private ArrayBlockingQueue<QueueElement> queue = null;
    // Element currently exposed as the top entry; starts as an empty element (null key and value).
    private QueueElement currentElement = new QueueElement();
    // The producer task; it also serves as the lock that seek() takes around the source.
    private ProducerThread thread = null;
    // The thread that runs the producer task.
    private Thread t = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/calrissian/accumulorecipes/commons/iterators/ReadAheadIterator$ProducerThread.class */
    public class ProducerThread extends ReentrantLock implements Runnable {
        private static final long serialVersionUID = 1;
        private Exception e = null;
        private SortedKeyValueIterator<Key, Value> sourceIter;
        private int waitTime;

        public ProducerThread(SortedKeyValueIterator<Key, Value> sortedKeyValueIterator) {
            this.sourceIter = null;
            this.waitTime = ReadAheadIterator.this.timeout;
            this.sourceIter = sortedKeyValueIterator;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = true;
            while (true) {
                if (!z && ReadAheadIterator.this.queue.size() <= 0) {
                    break;
                }
                try {
                    lock();
                    z = this.sourceIter.hasTop();
                    if (z) {
                        try {
                            try {
                                if (!ReadAheadIterator.this.queue.offer(new QueueElement(this.sourceIter.getTopKey(), this.sourceIter.getTopValue()), this.waitTime, TimeUnit.SECONDS)) {
                                    this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
                                    unlock();
                                    break;
                                } else {
                                    this.sourceIter.next();
                                    unlock();
                                }
                            } catch (InterruptedException e) {
                                this.e = e;
                                unlock();
                            }
                        } catch (Exception e2) {
                            this.e = e2;
                            ReadAheadIterator.log.error("Error calling next on source iterator", e2);
                            unlock();
                        }
                    } else {
                        unlock();
                    }
                } catch (Throwable th) {
                    unlock();
                    throw th;
                }
            }
            if (hasError()) {
                return;
            }
            try {
                ReadAheadIterator.this.queue.put(ReadAheadIterator.noMoreDataElement);
            } catch (InterruptedException e3) {
                this.e = e3;
                ReadAheadIterator.log.error("Error putting End of Data marker onto queue");
            }
        }

        public boolean hasError() {
            return this.e != null;
        }

        public Exception getError() {
            return this.e;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/calrissian/accumulorecipes/commons/iterators/ReadAheadIterator$QueueElement.class */
    /**
     * Key/value pair carried on the read-ahead queue. An element built with the no-argument
     * constructor has a null key and a null value.
     */
    public static class QueueElement {
        Key key;
        Value value;

        /** Creates an element whose key and value are both null. */
        public QueueElement() {
            // Both fields keep their default null.
        }

        /**
         * Creates an element holding a new {@code Key} built from {@code key} and a new
         * {@code Value} built from {@code value}'s bytes.
         *
         * @param key   key to copy
         * @param value value whose bytes are used
         */
        public QueueElement(Key key, Value value) {
            final Key keyCopy = new Key(key);
            // NOTE(review): the 'true' flag presumably asks Value to copy the byte array - confirm.
            final Value valueCopy = new Value(value.get(), true);
            this.key = keyCopy;
            this.value = valueCopy;
        }

        /** @return the stored key, possibly null */
        public Key getKey() {
            return this.key;
        }

        /** @return the stored value, possibly null */
        public Value getValue() {
            return this.value;
        }
    }

    /**
     * Copy constructor backing {@link #deepCopy(IteratorEnvironment)}.
     *
     * <p>Deep-copies the other iterator's source, carries over its queue size and timeout,
     * and creates the queue and producer thread in the same way as {@code init}. Without the
     * queue and thread, the first {@code seek()} or {@code next()} on the copy would fail
     * with a NullPointerException.
     *
     * @param readAheadIterator   iterator to copy
     * @param iteratorEnvironment environment handed to the source's {@code deepCopy}
     */
    protected ReadAheadIterator(ReadAheadIterator readAheadIterator, IteratorEnvironment iteratorEnvironment) {
        this.source = readAheadIterator.source.deepCopy(iteratorEnvironment);
        this.queueSize = readAheadIterator.queueSize;
        // Must be set before the ProducerThread is built: its constructor captures the timeout.
        this.timeout = readAheadIterator.timeout;
        this.queue = new ArrayBlockingQueue<>(this.queueSize);
        this.thread = new ProducerThread(this.source);
        this.t = new Thread(this.thread, "ReadAheadIterator-SourceThread");
        this.t.start();
    }

    /**
     * Creates an iterator with the default queue size and timeout and no source, queue or
     * producer thread; those are set up by {@code init}.
     */
    public ReadAheadIterator() {
    }

    /**
     * Builds a new iterator from this one through the copy constructor.
     *
     * @param iteratorEnvironment environment passed through to the copy constructor
     * @return the newly constructed iterator
     */
    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment iteratorEnvironment) {
        final ReadAheadIterator copy = new ReadAheadIterator(this, iteratorEnvironment);
        return copy;
    }

    /* renamed from: getTopKey, reason: merged with bridge method [inline-methods] */
    /**
     * @return the key of the current element; null when the current element is the initial
     *         empty element or the end-of-data marker
     */
    public Key m25getTopKey() {
        final QueueElement top = this.currentElement;
        return top.getKey();
    }

    /* renamed from: getTopValue, reason: merged with bridge method [inline-methods] */
    /**
     * @return the value of the current element; null when the current element is the initial
     *         empty element or the end-of-data marker
     */
    public Value m24getTopValue() {
        final QueueElement top = this.currentElement;
        return top.getValue();
    }

    /**
     * Reports whether an entry is available.
     *
     * @return false when the current element is the end-of-data marker; otherwise true if
     *         there is a current element, the queue is non-empty, or the source has a top entry
     */
    public boolean hasTop() {
        final QueueElement top = this.currentElement;
        if (top == noMoreDataElement) {
            return false;
        }
        if (top != null) {
            return true;
        }
        if (this.queue.size() > 0) {
            return true;
        }
        return this.source.hasTop();
    }

    /**
     * Applies the options, stores the source, allocates the queue and starts the producer
     * thread that reads from the source.
     *
     * @param sortedKeyValueIterator source iterator to read ahead of
     * @param map                    options; see {@link #QUEUE_SIZE} and {@link #TIMEOUT}
     * @param iteratorEnvironment    not used by this method
     * @throws IOException declared by the signature; not thrown by this body directly
     */
    public void init(SortedKeyValueIterator<Key, Value> sortedKeyValueIterator, Map<String, String> map, IteratorEnvironment iteratorEnvironment) throws IOException {
        // Options are applied first: the queue capacity and producer timeout are read below.
        validateOptions(map);
        this.source = sortedKeyValueIterator;
        this.queue = new ArrayBlockingQueue<>(this.queueSize);
        final ProducerThread producer = new ProducerThread(sortedKeyValueIterator);
        final Thread worker = new Thread(producer, "ReadAheadIterator-SourceThread");
        this.thread = producer;
        this.t = worker;
        worker.start();
    }

    /**
     * Advances to the next element by taking it from the queue, waiting for the producer
     * when the queue is empty.
     *
     * <p>An interrupt received while waiting does not abort the wait, but the thread's
     * interrupt status is restored before this method returns or throws. If the producer
     * thread has exited and the queue is empty, the wait ends with an exception instead of
     * polling forever.
     *
     * @throws IOException if the producer recorded an error, or exited without leaving
     *                     anything on the queue
     */
    public void next() throws IOException {
        boolean interrupted = false;
        try {
            // Wait until the producer thread has actually been started.
            while (this.t.getState() == Thread.State.NEW) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException ignored) {
                    // Remember the interrupt; it is re-asserted in the finally block.
                    interrupted = true;
                }
            }
            if (this.t.getState() == Thread.State.TERMINATED && this.thread.hasError()) {
                throw new IOException("Background thread has died", this.thread.getError());
            }
            try {
                if (this.thread.hasError()) {
                    throw new IOException("background thread has error", this.thread.getError());
                }
                QueueElement queueElement = null;
                while (queueElement == null) {
                    try {
                        queueElement = this.queue.poll(1L, TimeUnit.SECONDS);
                    } catch (InterruptedException ignored) {
                        interrupted = true;
                    }
                    if (queueElement != null) {
                        break;
                    }
                    if (this.thread.hasError()) {
                        throw new IOException("background thread has error", this.thread.getError());
                    }
                    if (!this.t.isAlive()) {
                        // The producer is gone, so nothing more can arrive. Pick up anything it
                        // enqueued just before exiting; otherwise fail rather than wait forever.
                        queueElement = this.queue.poll();
                        if (queueElement == null) {
                            throw new IOException("Background thread has died", this.thread.getError());
                        }
                    }
                }
                this.currentElement = queueElement;
            } catch (IOException e3) {
                throw new IOException("Error getting element from source iterator", e3);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Repositions the source and loads the first element of the new range.
     *
     * <p>While holding the producer's lock, the queue is emptied, the current element is
     * dropped and the source is repositioned. The lock is then released exactly once and
     * {@code next()} is called outside it, so a failure in {@code next()} is reported as-is
     * rather than being replaced by an IllegalMonitorStateException from a second unlock.
     *
     * @param range      range to seek the source to
     * @param collection column families passed through to the source
     * @param z          inclusive flag passed through to the source
     * @throws IOException if the producer thread is no longer alive or has recorded an error,
     *                     or if the source's seek or the following {@code next()} fails
     */
    public void seek(Range range, Collection<ByteSequence> collection, boolean z) throws IOException {
        if (!this.t.isAlive()) {
            throw new IOException("source iterator thread has died.");
        }
        if (this.thread.hasError()) {
            throw new IOException("background thread has error", this.thread.getError());
        }
        // Acquire before the try so the finally never unlocks a lock that was not taken.
        this.thread.lock();
        try {
            this.queue.clear();
            this.currentElement = null;
            this.source.seek(range, collection, z);
        } finally {
            this.thread.unlock();
        }
        next();
    }

    /**
     * Describes the options this iterator accepts: {@link #QUEUE_SIZE} and {@link #TIMEOUT}.
     *
     * @return the option description, named after this class's simple name, with no unnamed
     *         options
     */
    public OptionDescriber.IteratorOptions describeOptions() {
        // Typed map and typed null replace the raw HashMap / raw List cast.
        Map<String, String> namedOptions = new HashMap<>();
        namedOptions.put(QUEUE_SIZE, "read ahead queue size");
        namedOptions.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
        return new OptionDescriber.IteratorOptions(getClass().getSimpleName(), "Iterator that puts the source in another thread", namedOptions, (List<String>) null);
    }

    /**
     * Parses and applies the {@link #QUEUE_SIZE} and {@link #TIMEOUT} options when present.
     *
     * <p>Both values are parsed before either field is changed, so a bad option leaves the
     * current settings untouched. A queue size below 1 is rejected here because
     * {@code ArrayBlockingQueue} does not accept such a capacity. The timeout is not
     * range-checked.
     *
     * @param map options to validate and apply
     * @return always true when no exception is thrown
     * @throws NumberFormatException    if a present option is not a parsable integer
     * @throws IllegalArgumentException if the queue size is less than 1
     */
    public boolean validateOptions(Map<String, String> map) {
        int newQueueSize = this.queueSize;
        int newTimeout = this.timeout;
        if (map.containsKey(QUEUE_SIZE)) {
            newQueueSize = Integer.parseInt(map.get(QUEUE_SIZE));
            if (newQueueSize < 1) {
                throw new IllegalArgumentException(QUEUE_SIZE + " must be a positive integer but was " + newQueueSize);
            }
        }
        if (map.containsKey(TIMEOUT)) {
            newTimeout = Integer.parseInt(map.get(TIMEOUT));
        }
        // Commit only after everything has parsed and validated.
        this.queueSize = newQueueSize;
        this.timeout = newTimeout;
        return true;
    }
}
