/*
 * Decompiled with CFR 0.152.
 */
package org.rdfhdt.hdt.util.concurrent;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.rdfhdt.hdt.iterator.utils.AsyncIteratorFetcher;
import org.rdfhdt.hdt.util.concurrent.ExceptionThread;
import org.rdfhdt.hdt.util.concurrent.HeightTree;
import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;

/**
 * Multi-threaded k-way merger working on files stored under a work directory.
 *
 * <p>Worker threads repeatedly ask {@link #getTask()} for something to do: either
 * read a new chunk from the shared input ({@code GetTask}) or merge already
 * written chunks into a bigger one ({@code MergeTask}). The way chunks are
 * created and merged is delegated to a {@link KWayMergerImpl}.
 *
 * <p>An instance is single-use: call {@link #start()} once, then
 * {@link #waitResult()} to obtain the final merged file, if any.
 *
 * @param <E> type of the elements read from the input
 * @param <S> type of the bounded supplier ("stop flux") handed to
 *            {@link KWayMergerImpl#createChunk(Supplier, CloseSuppressPath)}
 */
public class KWayMerger<E, S extends Supplier<E>> {
    /** Counter used to give each merger's worker threads a distinct name. */
    private static final AtomicInteger ID = new AtomicInteger();

    /** Maximum number of chunks requested from the tree for one merge task. */
    private final int k;
    /** Shared input, read concurrently by the workers. */
    private final AsyncIteratorFetcher<E> iteratorFetcher;
    /** Strategy creating and merging the chunk files. */
    private final KWayMergerImpl<E, S> impl;
    private final Worker[] workers;
    /** Counter used to name the files created in {@link #workLocation}. */
    private final AtomicLong pathId = new AtomicLong();
    private final CloseSuppressPath workLocation;
    /** Guards {@link #end}, {@link #chunks} and the registration of {@link #throwable}. */
    private final Lock dataLock = new ReentrantLock();
    private boolean started;
    /** Set once a chunk creation reported that the input is exhausted. */
    private boolean end;
    /** Chunks waiting to be merged, indexed by their height. */
    private final HeightTree<Chunk> chunks = new HeightTree<>();
    /** First failure reported by a worker; later failures are attached as suppressed. */
    private Throwable throwable;

    /**
     * Creates the merger and its (not yet started) worker threads.
     *
     * @param workLocation directory receiving the chunk files, created if required
     * @param syncSupplier shared input the chunks are read from
     * @param impl         chunk creation/merge implementation
     * @param workers      number of worker threads to create
     * @param k            maximum number of chunks merged by one merge task
     * @throws KWayMergerException if the work directory can't be created
     */
    public KWayMerger(CloseSuppressPath workLocation, AsyncIteratorFetcher<E> syncSupplier, KWayMergerImpl<E, S> impl, int workers, int k) throws KWayMergerException {
        this.workLocation = workLocation;
        this.iteratorFetcher = syncSupplier;
        this.impl = impl;
        this.k = k;
        try {
            workLocation.mkdirs();
        } catch (IOException e) {
            throw new KWayMergerException("Can't create workLocation directory!", e);
        }
        this.workers = new Worker[workers];
        int id = ID.incrementAndGet();
        for (int i = 0; i < workers; ++i) {
            this.workers[i] = new Worker("KWayMerger#" + id + "Worker#" + i, this);
        }
    }

    /**
     * Starts every worker thread.
     *
     * @throws IllegalArgumentException if the merger was already started
     */
    public void start() {
        if (started) {
            throw new IllegalArgumentException("The KWayMerger was already started and can't be reused!");
        }
        started = true;
        for (Worker w : workers) {
            w.start();
        }
    }

    /**
     * Records a worker failure and interrupts every worker so they stop
     * picking new tasks.
     *
     * <p>Several workers can fail at the same time, so the registration is done
     * under {@link #dataLock}; otherwise two concurrent first failures could
     * overwrite each other and one of them would be lost.
     *
     * @param t the failure to record
     */
    private void exception(Throwable t) {
        dataLock.lock();
        try {
            if (throwable == null) {
                throwable = t;
            } else if (throwable != t) {
                // Throwable#addSuppressed rejects self-suppression, hence the identity check
                throwable.addSuppressed(t);
            }
        } finally {
            dataLock.unlock();
        }
        for (Worker w : workers) {
            w.interrupt();
        }
    }

    /**
     * Waits for all the workers to finish and returns the path of the final chunk.
     *
     * @return the path of the last remaining chunk, or empty if no chunk is left
     * @throws InterruptedException     if the calling thread is interrupted while joining a worker
     * @throws KWayMergerException      if a worker failed with a checked exception, or if more
     *                                  than one chunk remains after all the workers ended
     * @throws IllegalArgumentException if the merger wasn't started
     */
    public Optional<CloseSuppressPath> waitResult() throws InterruptedException, KWayMergerException {
        if (!started) {
            throw new IllegalArgumentException("The KWayMerger hasn't been started!");
        }
        for (Worker w : workers) {
            w.join();
        }
        // every worker is joined, their writes to throwable/chunks are visible from here
        Throwable failure = throwable;
        if (failure != null) {
            // rethrow unchecked failures and merger exceptions as is, wrap anything else
            if (failure instanceof Error) {
                throw (Error) failure;
            }
            if (failure instanceof RuntimeException) {
                throw (RuntimeException) failure;
            }
            if (failure instanceof KWayMergerException) {
                throw (KWayMergerException) failure;
            }
            throw new KWayMergerException(failure);
        }
        if (chunks.size() > 1) {
            throw new KWayMergerException("Chunk size is above 1! " + chunks.size());
        }
        List<Chunk> all = chunks.getAll(1);
        return all.isEmpty() ? Optional.empty() : Optional.of(all.get(0).getPath());
    }

    /**
     * @return a new path inside the work location, named {@code f-<counter>}
     */
    private CloseSuppressPath getPath() {
        return workLocation.resolve("f-" + pathId.incrementAndGet());
    }

    /**
     * Selects the next task for a worker.
     *
     * <ul>
     * <li>input ended and at most one chunk is waiting: {@code null}, the worker stops;</li>
     * <li>input ended and several chunks are waiting: merge up to {@link #k} of them;</li>
     * <li>input not ended and {@code chunks.getMax(k)} returns a list: merge that list;</li>
     * <li>otherwise: read a new chunk from the input.</li>
     * </ul>
     *
     * @return the task to run, or {@code null} if the worker has nothing left to do
     */
    private KWayMergerRunnable getTask() {
        dataLock.lock();
        try {
            if (end) {
                if (chunks.size() <= 1) {
                    return null;
                }
                return new MergeTask(chunks.getAll(k));
            }
            List<Chunk> chunkList = chunks.getMax(k);
            if (chunkList != null) {
                return new MergeTask(chunkList);
            }
            return new GetTask();
        } finally {
            dataLock.unlock();
        }
    }

    /**
     * Checked exception describing a failure of the merge process.
     */
    public static class KWayMergerException extends Exception {
        public KWayMergerException(String message) {
            super(message);
        }

        public KWayMergerException(String message, Throwable cause) {
            super(message, cause);
        }

        public KWayMergerException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Thread running the tasks given by its parent merger until no task is
     * left or the thread is interrupted.
     */
    private static class Worker extends ExceptionThread {
        private final KWayMerger<?, ?> parent;

        public Worker(String name, KWayMerger<?, ?> parent) {
            super(name);
            this.parent = parent;
        }

        @Override
        public void runException() throws Exception {
            try {
                KWayMergerRunnable task;
                while (!isInterrupted() && (task = parent.getTask()) != null) {
                    task.run();
                }
            } catch (Throwable t) {
                // report to the merger (which interrupts the other workers), then propagate
                parent.exception(t);
                throw t;
            }
        }
    }

    /**
     * A chunk file with its height: 1 for a chunk read from the input, the
     * highest height of the merged chunks plus one for a merge result.
     */
    private static class Chunk {
        private final int height;
        private final CloseSuppressPath path;

        public Chunk(int height, CloseSuppressPath path) {
            this.height = height;
            this.path = path;
        }

        public int getHeight() {
            return height;
        }

        public CloseSuppressPath getPath() {
            return path;
        }
    }

    /**
     * Task reading a part of the input into a new chunk of height 1.
     */
    private class GetTask implements KWayMergerRunnable {
        private GetTask() {
        }

        @Override
        public void run() throws KWayMergerException {
            CloseSuppressPath chunk = getPath();
            // typed as S (not Object) so it can be passed to createChunk(S, ...)
            S flux = impl.newStopFlux(iteratorFetcher);
            impl.createChunk(flux, chunk);
            dataLock.lock();
            try {
                end = iteratorFetcher.isEnd();
                Chunk newChunk = new Chunk(1, chunk);
                chunks.addElement(newChunk, newChunk.getHeight());
            } finally {
                dataLock.unlock();
            }
        }
    }

    /**
     * Task merging a list of chunks into a new chunk and closing the merged
     * chunk paths afterwards.
     */
    private class MergeTask implements KWayMergerRunnable {
        private final List<Chunk> chunks;

        public MergeTask(List<Chunk> chunks) {
            assert (!chunks.isEmpty()) : "empty chunks";
            this.chunks = chunks;
        }

        @Override
        public void run() throws KWayMergerException {
            // the merged chunk sits one level above the highest of its inputs
            int height = chunks.stream().mapToInt(Chunk::getHeight).max().orElseThrow() + 1;
            CloseSuppressPath mergec = getPath();
            List<CloseSuppressPath> paths = chunks.stream().map(Chunk::getPath).collect(Collectors.toUnmodifiableList());
            impl.mergeChunks(paths, mergec);
            try {
                IOUtil.closeAll(paths);
            } catch (IOException e) {
                throw new KWayMergerException("Can't close end merge files", e);
            }
            dataLock.lock();
            try {
                KWayMerger.this.chunks.addElement(new Chunk(height, mergec), height);
            } finally {
                dataLock.unlock();
            }
        }
    }

    /**
     * Strategy used by the merger to create and merge chunk files.
     *
     * @param <E> type of the elements read from the input
     * @param <S> type of the supplier given to {@link #createChunk(Supplier, CloseSuppressPath)}
     */
    public interface KWayMergerImpl<E, S extends Supplier<E>> {
        /**
         * Writes a chunk from the given flux.
         *
         * @param flux   supplier previously returned by {@link #newStopFlux(Supplier)}
         * @param output file to write the chunk into
         * @throws KWayMergerException if the chunk can't be created
         */
        void createChunk(S flux, CloseSuppressPath output) throws KWayMergerException;

        /**
         * Merges chunks into a new chunk; the merger closes the input paths
         * after this method returns.
         *
         * @param inputs chunks to merge
         * @param output file to write the merged chunk into
         * @throws KWayMergerException if the chunks can't be merged
         */
        void mergeChunks(List<CloseSuppressPath> inputs, CloseSuppressPath output) throws KWayMergerException;

        /**
         * Wraps the shared input into the supplier used to create one chunk.
         *
         * @param flux the shared input of the merger
         * @return the supplier to pass to {@link #createChunk(Supplier, CloseSuppressPath)}
         * @throws KWayMergerException if the flux can't be created
         */
        S newStopFlux(Supplier<E> flux) throws KWayMergerException;
    }

    /**
     * Unit of work executed by a worker.
     */
    @FunctionalInterface
    private interface KWayMergerRunnable {
        void run() throws KWayMergerException;
    }
}

