/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.consistency.checking.full;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.neo4j.consistency.checking.full.RecordCheckWorker;
import org.neo4j.consistency.checking.full.RecordProcessor;
import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.Workers;

/**
 * Spreads the records of an {@link Iterable} over a number of worker threads, each fed
 * through its own bounded queue, while processing the very first and the very last
 * record on the calling thread.
 */
public class RecordDistributor {
    /**
     * Processes every record from {@code records} with {@code processor}.
     *
     * <p>The first record is processed on the calling thread before any worker is started,
     * and the final record is processed on the calling thread after all workers have been
     * awaited. Every record in between is handed to the workers round-robin, one bounded
     * queue per worker. {@code progress} is advanced by one for each record that has been
     * processed on the calling thread or successfully queued.
     *
     * <p>If the calling thread is interrupted while queueing, distribution stops, the
     * interrupt flag is restored and the record that was being queued is neither processed
     * nor counted.
     *
     * @param numberOfThreads number of workers (and queues) to start; must be positive
     *                        whenever {@code records} holds more than two records, since the
     *                        queue index is computed modulo this value
     * @param workerNames     name handed to the {@link Workers} group
     * @param queueSize       capacity of each worker's queue
     * @param records         the records to process; an empty iterable is a no-op
     * @param progress        listener notified once per handled record
     * @param processor       processor invoked for every record, both on the calling thread
     *                        and from the workers
     * @throws RuntimeException if the calling thread is interrupted while awaiting the
     *                          workers (the {@link InterruptedException} is the cause), or
     *                          as raised by {@code Workers.awaitAndThrowOnError}
     */
    public static <RECORD> void distributeRecords(int numberOfThreads, String workerNames, int queueSize, Iterable<RECORD> records, ProgressListener progress, RecordProcessor<RECORD> processor) {
        Iterator<RECORD> iterator = records.iterator();
        if (!iterator.hasNext()) {
            return;
        }
        // The first record is handled in isolation on the calling thread, before any worker exists.
        processor.process(iterator.next());
        progress.add(1L);

        // Generic array creation is not expressible in Java; every slot is only ever
        // assigned an ArrayBlockingQueue<RECORD> below, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        ArrayBlockingQueue<RECORD>[] recordQ = new ArrayBlockingQueue[numberOfThreads];
        Workers<Worker<RECORD>> workers = new Workers<Worker<RECORD>>(workerNames);
        // Stays null unless the loop below reaches the final record of the iterator.
        RECORD last = null;
        try {
            for (int threadId = 0; threadId < numberOfThreads; ++threadId) {
                recordQ[threadId] = new ArrayBlockingQueue<RECORD>(queueSize);
                workers.start(new Worker<RECORD>(recordQ[threadId], processor));
            }
            int qIndex = 0;
            while (iterator.hasNext()) {
                try {
                    RECORD record = iterator.next();
                    if (!iterator.hasNext()) {
                        // Hold back the final record; it is processed after the workers finish.
                        last = record;
                        break;
                    }
                    // Round-robin over the queues; put() blocks while the chosen queue is full.
                    qIndex = (qIndex + 1) % numberOfThreads;
                    recordQ[qIndex].put(record);
                }
                catch (InterruptedException e) {
                    // Restore the flag and stop distributing; the await below reports it.
                    Thread.currentThread().interrupt();
                    break;
                }
                progress.add(1L);
            }
        }
        finally {
            // Always signal the workers, also when distribution fails with an exception,
            // so that already started workers are not left waiting for more input.
            // NOTE(review): assumes done() is safe to call on every started worker and
            // lets it finish its queued records - confirm against RecordCheckWorker.
            for (Worker<RECORD> worker : workers) {
                worker.done();
            }
        }
        try {
            workers.awaitAndThrowOnError(RuntimeException.class);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Was interrupted while awaiting completion", e);
        }
        // The final record is processed on the calling thread once the workers have been awaited.
        if (last != null) {
            processor.process(last);
            progress.add(1L);
        }
    }

    /**
     * Queue-fed worker that forwards every record it receives to a shared
     * {@link RecordProcessor}.
     */
    private static class Worker<RECORD>
    extends RecordCheckWorker<RECORD> {
        private final RecordProcessor<RECORD> processor;

        /**
         * @param recordsQ  queue this worker takes its records from (passed to the superclass)
         * @param processor processor each record is delegated to
         */
        Worker(BlockingQueue<RECORD> recordsQ, RecordProcessor<RECORD> processor) {
            super(recordsQ);
            this.processor = processor;
        }

        /** Delegates the record to the wrapped processor. */
        @Override
        protected void process(RECORD record) {
            this.processor.process(record);
        }
    }
}

