/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.diskstorage.keycolumnvalue.scan;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.KCVSUtil;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.scan.RowsCollector;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScannerExecutor;
import org.janusgraph.diskstorage.util.EntryArrayList;
import org.janusgraph.diskstorage.util.RecordIterator;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MultiThreadsRowsCollector
extends RowsCollector {
    private static final int MAX_KEY_LENGTH = 128;
    private static final Logger log = LoggerFactory.getLogger(MultiThreadsRowsCollector.class);
    private final StoreFeatures storeFeatures;
    private final StoreTransaction storeTx;
    private final List<SliceQuery> queries;
    private final Predicate<StaticBuffer> keyFilter;
    private final Configuration graphConfiguration;
    private final DataPuller[] pullThreads;
    private final BlockingQueue<SliceResult>[] dataQueues;
    private boolean interrupted = false;

    MultiThreadsRowsCollector(KeyColumnValueStore store, StoreFeatures storeFeatures, StoreTransaction storeTx, List<SliceQuery> queries, Predicate<StaticBuffer> keyFilter, BlockingQueue<StandardScannerExecutor.Row> rowQueue, Configuration graphConfiguration) throws BackendException {
        super(store, rowQueue);
        this.storeFeatures = storeFeatures;
        this.storeTx = storeTx;
        this.queries = queries;
        this.keyFilter = keyFilter;
        this.graphConfiguration = graphConfiguration;
        this.dataQueues = new BlockingQueue[queries.size()];
        this.pullThreads = new DataPuller[queries.size()];
        this.setUp(queries);
    }

    private void setUp(List<SliceQuery> queries) throws BackendException {
        int pos = 0;
        for (SliceQuery sliceQuery : queries) {
            this.addDataPuller(sliceQuery, this.storeTx, pos);
            ++pos;
        }
    }

    @Override
    void run() throws InterruptedException, TemporaryBackendException {
        int numQueries = this.queries.size();
        SliceResult[] currentResults = new SliceResult[numQueries];
        while (!this.interrupted) {
            this.collectDataFromPullers(currentResults, numQueries);
            SliceResult conditionQuery = currentResults[0];
            if (conditionQuery == null) break;
            StaticBuffer key = conditionQuery.key;
            StandardScannerExecutor.Row e = this.buildRow(numQueries, currentResults, key);
            this.rowQueue.put(e);
        }
    }

    private void collectDataFromPullers(SliceResult[] currentResults, int numQueries) throws InterruptedException, TemporaryBackendException {
        for (int i = 0; i < numQueries; ++i) {
            if (currentResults[i] != null) continue;
            BlockingQueue<SliceResult> queue = this.dataQueues[i];
            SliceResult qr = queue.poll(10L, TimeUnit.MILLISECONDS);
            if (qr == null) {
                DataPuller dataPuller = this.pullThreads[i];
                if (dataPuller.isFinished()) continue;
                while (!dataPuller.isFinished() && qr == null) {
                    qr = queue.poll(10L, TimeUnit.MILLISECONDS);
                }
            }
            currentResults[i] = qr;
        }
    }

    private StandardScannerExecutor.Row buildRow(int numQueries, SliceResult[] currentResults, StaticBuffer key) {
        HashMap<SliceQuery, EntryList> queryResults = new HashMap<SliceQuery, EntryList>(numQueries);
        for (int i = 0; i < currentResults.length; ++i) {
            SliceQuery query = this.queries.get(i);
            EntryList entries = EntryList.EMPTY_LIST;
            if (currentResults[i] != null && currentResults[i].key.equals(key)) {
                assert (query.equals(currentResults[i].query));
                entries = currentResults[i].entries;
                currentResults[i] = null;
            }
            queryResults.put(query, entries);
        }
        return new StandardScannerExecutor.Row(key, queryResults);
    }

    @Override
    void join() throws InterruptedException {
        int i = 0;
        for (DataPuller dataPuller : this.pullThreads) {
            dataPuller.join(10L);
            if (dataPuller.isAlive()) {
                log.warn("Data pulling thread [{}] did not terminate. Forcing termination", (Object)i);
                if (this.storeFeatures.supportsInterruption()) {
                    dataPuller.interrupt();
                } else {
                    log.warn("Store does not support interruption, so data pulling thread [{}] cannot be interrupted", (Object)i);
                    dataPuller.finished = true;
                }
            }
            ++i;
        }
    }

    @Override
    void interrupt() {
        this.interrupted = true;
    }

    @Override
    void cleanup() {
        if (this.pullThreads != null) {
            for (DataPuller pullThread : this.pullThreads) {
                if (!pullThread.isAlive()) continue;
                if (this.storeFeatures.supportsInterruption()) {
                    pullThread.interrupt();
                    continue;
                }
                log.warn("Store does not support interruption, so data pulling thread cannot be interrupted");
                pullThread.finished = true;
            }
        }
    }

    private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws BackendException {
        DataPuller dp;
        LinkedBlockingQueue<SliceResult> queue = new LinkedBlockingQueue<SliceResult>(this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE, new String[0]));
        this.dataQueues[pos] = queue;
        this.pullThreads[pos] = dp = new DataPuller(sq, queue, KCVSUtil.getKeys(this.store, sq, this.storeFeatures, 128, stx), this.keyFilter);
        dp.setName("data-puller-" + pos);
        dp.start();
    }

    private static class SliceResult {
        final SliceQuery query;
        final StaticBuffer key;
        final EntryList entries;

        private SliceResult(SliceQuery query, StaticBuffer key, EntryList entries) {
            this.query = query;
            this.key = key;
            this.entries = entries;
        }
    }

    private static class DataPuller
    extends Thread {
        private final BlockingQueue<SliceResult> queue;
        private final KeyIterator keyIterator;
        private final SliceQuery query;
        private final Predicate<StaticBuffer> keyFilter;
        private volatile boolean finished;

        private DataPuller(SliceQuery query, BlockingQueue<SliceResult> queue, KeyIterator keyIterator, Predicate<StaticBuffer> keyFilter) {
            this.query = query;
            this.queue = queue;
            this.keyIterator = keyIterator;
            this.keyFilter = keyFilter;
            this.finished = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (this.keyIterator.hasNext()) {
                    StaticBuffer key = (StaticBuffer)this.keyIterator.next();
                    RecordIterator<Entry> entries = this.keyIterator.getEntries();
                    if (!this.keyFilter.test(key)) continue;
                    EntryArrayList entryList = EntryArrayList.of(entries);
                    this.queue.put(new SliceResult(this.query, key, entryList));
                }
            }
            catch (InterruptedException e) {
                log.error("Data-pulling thread interrupted while waiting on queue or data", (Throwable)e);
            }
            catch (Throwable e) {
                log.error("Could not load data from storage", e);
            }
            finally {
                try {
                    this.keyIterator.close();
                }
                catch (IOException e) {
                    log.warn("Could not close storage iterator ", (Throwable)e);
                }
                this.finished = true;
            }
        }

        public boolean isFinished() {
            return this.finished;
        }
    }
}

