/*
 * Decompiled with CFR 0.152.
 */
package org.rdfhdt.hdt.hdt.impl.diskimport;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.rdfhdt.hdt.hdt.impl.diskimport.CompressionResult;
import org.rdfhdt.hdt.hdt.impl.diskimport.CompressionResultEmpty;
import org.rdfhdt.hdt.hdt.impl.diskimport.CompressionResultFile;
import org.rdfhdt.hdt.hdt.impl.diskimport.CompressionResultPartial;
import org.rdfhdt.hdt.iterator.utils.AsyncIteratorFetcher;
import org.rdfhdt.hdt.iterator.utils.ExceptionIterator;
import org.rdfhdt.hdt.iterator.utils.SizeFetcher;
import org.rdfhdt.hdt.listener.MultiThreadListener;
import org.rdfhdt.hdt.listener.ProgressListener;
import org.rdfhdt.hdt.triples.IndexedNode;
import org.rdfhdt.hdt.triples.TripleString;
import org.rdfhdt.hdt.util.ParallelSortableArrayList;
import org.rdfhdt.hdt.util.concurrent.ExceptionFunction;
import org.rdfhdt.hdt.util.concurrent.ExceptionSupplier;
import org.rdfhdt.hdt.util.concurrent.ExceptionThread;
import org.rdfhdt.hdt.util.concurrent.KWayMerger;
import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;
import org.rdfhdt.hdt.util.io.compress.CompressNodeMergeIterator;
import org.rdfhdt.hdt.util.io.compress.CompressNodeReader;
import org.rdfhdt.hdt.util.io.compress.CompressUtil;
import org.rdfhdt.hdt.util.listener.IntermediateListener;
import org.rdfhdt.hdt.util.string.ByteString;
import org.rdfhdt.hdt.util.string.CompactString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads triples from an {@link AsyncIteratorFetcher}, splits them into chunks and writes,
 * for every chunk, three sorted compressed section files (subject, predicate, object).
 * Chunks can then be merged together through a {@link KWayMerger}, for which this class
 * provides the {@link KWayMerger.KWayMergerImpl} callbacks.
 *
 * <p>Every triple read receives a sequential identifier (starting at 1) that is attached to
 * the {@link IndexedNode} created for each of its three components.
 */
public class SectionCompressor
implements KWayMerger.KWayMergerImpl<TripleString, SizeFetcher<TripleString>> {
    private static final Logger log = LoggerFactory.getLogger(SectionCompressor.class);
    /**
     * Maximum number of nodes buffered per section list before a chunk is force-closed
     * ({@code 0x7FFFFFF9}). NOTE(review): presumably chosen to stay below the largest array
     * the JVM can allocate - confirm.
     */
    private static final int MAX_CHUNK_NODES = Integer.MAX_VALUE - 6;
    private final CloseSuppressPath baseFileName;
    private final AsyncIteratorFetcher<TripleString> source;
    private final MultiThreadListener listener;
    /** Number of triples read so far; also the source of triple identifiers. */
    private final AtomicLong triples = new AtomicLong();
    /** Sum of the sizes reported by every {@link SizeFetcher} consumed by {@link #createChunk}. */
    private final AtomicLong ntRawSize = new AtomicLong();
    private final int bufferSize;
    private final long chunkSize;
    private final int k;
    private final boolean debugSleepKwayDict;

    /**
     * @param baseFileName       working location under which chunk and merge files are created
     * @param source             triples to compress
     * @param listener           progress listener
     * @param bufferSize         buffer size passed when opening section streams
     * @param chunkSize          size limit handed to {@link SizeFetcher#ofTripleString} for each chunk
     * @param k                  value forwarded to the {@link KWayMerger} constructor
     * @param debugSleepKwayDict when {@code true}, sleeps 25 ms before handling each triple (debug aid)
     */
    public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher<TripleString> source, MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict) {
        this.source = source;
        this.listener = listener;
        this.baseFileName = baseFileName;
        this.bufferSize = bufferSize;
        this.chunkSize = chunkSize;
        this.k = k;
        this.debugSleepKwayDict = debugSleepKwayDict;
    }

    /**
     * Converts a subject read from the source into the {@link ByteString} stored in the section.
     * Subclasses may override to change the representation.
     */
    protected ByteString convertSubject(CharSequence seq) {
        return new CompactString(seq);
    }

    /**
     * Converts a predicate read from the source into the {@link ByteString} stored in the section.
     * Subclasses may override to change the representation.
     */
    protected ByteString convertPredicate(CharSequence seq) {
        return new CompactString(seq);
    }

    /**
     * Converts an object read from the source into the {@link ByteString} stored in the section.
     * Subclasses may override to change the representation.
     */
    protected ByteString convertObject(CharSequence seq) {
        return new CompactString(seq);
    }

    /**
     * Compresses the whole source through a {@link KWayMerger} started with
     * {@code max(1, workers - 1)} workers.
     *
     * @param workers number of workers requested by the caller
     * @return a {@link CompressionResultEmpty} when the merger yields no result, otherwise a
     *         {@link CompressionResultFile} over the merged sections
     */
    public CompressionResult compressToFile(int workers) throws IOException, InterruptedException, KWayMerger.KWayMergerException {
        KWayMerger<TripleString, SizeFetcher<TripleString>> merger =
                new KWayMerger<>(this.baseFileName, this.source, this, Math.max(1, workers - 1), this.k);
        merger.start();
        Optional<CloseSuppressPath> sections = merger.waitResult();
        if (sections.isEmpty()) {
            return new CompressionResultEmpty();
        }
        return new CompressionResultFile(this.triples.get(), this.ntRawSize.get(), new TripleFile(sections.get(), false));
    }

    /**
     * Compresses the source into a sequence of independent chunks ({@code chunk#0},
     * {@code chunk#1}, ...) created under the base path, without merging them.
     *
     * <p>If anything fails while chunks are being created, the chunks created so far and the
     * base path are closed before the failure is rethrown. A failure raised by that cleanup is
     * attached to the original exception as suppressed so that the root cause is not lost.
     *
     * @return a {@link CompressionResultPartial} over the created chunks
     */
    public CompressionResult compressPartial() throws IOException, KWayMerger.KWayMergerException {
        List<TripleFile> files = new ArrayList<>();
        this.baseFileName.closeWithDeleteRecurse();
        try {
            this.baseFileName.mkdirs();
            long chunkId = 0L;
            while (!this.source.isEnd()) {
                TripleFile file = new TripleFile(this.baseFileName.resolve("chunk#" + chunkId++), true);
                this.createChunk(this.newStopFlux(this.source), file.root);
                files.add(file);
            }
        } catch (Throwable e) {
            try {
                try {
                    IOUtil.closeAll(files);
                } finally {
                    this.baseFileName.close();
                }
            } catch (Throwable cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        return new CompressionResultPartial(files, this.triples.get(), this.ntRawSize.get());
    }

    /**
     * Dispatches to {@link #compressToFile(int)} or {@link #compressPartial()} depending on
     * {@code mode}.
     *
     * @param workers number of workers, only used by the complete mode
     * @param mode    {@code null}, {@code ""} or {@code "compressionComplete"} for a complete
     *                compression, {@code "compressionPartial"} for a partial one
     * @throws IllegalArgumentException if {@code mode} is none of the values above
     */
    public CompressionResult compress(int workers, String mode) throws KWayMerger.KWayMergerException, IOException, InterruptedException {
        if (mode == null) {
            mode = "";
        }
        switch (mode) {
            case "":
            case "compressionComplete":
                return this.compressToFile(workers);
            case "compressionPartial":
                return this.compressPartial();
            default:
                throw new IllegalArgumentException("Unknown compression mode: " + mode);
        }
    }

    /**
     * Drains {@code fetcher} into three in-memory node lists, then sorts each list and writes
     * it as a compressed section under {@code output}.
     *
     * <p>Reading stops when the fetcher returns {@code null} or when the lists reach
     * {@link #MAX_CHUNK_NODES} entries. The lists are cleared and a completion message is sent
     * to the listener whether or not writing succeeds.
     *
     * @param fetcher triples of this chunk
     * @param output  location of the chunk to create
     * @throws KWayMerger.KWayMergerException wrapping any {@link IOException} raised while writing
     */
    @Override
    public void createChunk(SizeFetcher<TripleString> fetcher, CloseSuppressPath output) throws KWayMerger.KWayMergerException {
        this.listener.notifyProgress(0.0f, "start reading triples");
        ParallelSortableArrayList<IndexedNode> subjects = new ParallelSortableArrayList<>(IndexedNode[].class);
        ParallelSortableArrayList<IndexedNode> predicates = new ParallelSortableArrayList<>(IndexedNode[].class);
        ParallelSortableArrayList<IndexedNode> objects = new ParallelSortableArrayList<>(IndexedNode[].class);
        this.listener.notifyProgress(10.0f, "reading triples " + this.triples.get());
        TripleString next;
        while ((next = fetcher.get()) != null) {
            if (this.debugSleepKwayDict) {
                try {
                    Thread.sleep(25L);
                } catch (InterruptedException e) {
                    // keep the interruption visible to whoever handles the runtime exception
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            long tripleID = this.triples.incrementAndGet();
            subjects.add(new IndexedNode(this.convertSubject(next.getSubject()), tripleID));
            predicates.add(new IndexedNode(this.convertPredicate(next.getPredicate()), tripleID));
            objects.add(new IndexedNode(this.convertObject(next.getObject()), tripleID));
            if (tripleID % 100000L == 0L) {
                this.listener.notifyProgress(10.0f, "reading triples " + tripleID);
            }
            // the three lists grow in lockstep, so checking one of them is enough
            if (subjects.size() == MAX_CHUNK_NODES) {
                break;
            }
        }
        this.ntRawSize.addAndGet(fetcher.getSize());
        try {
            TripleFile sections = new TripleFile(output, true);
            try {
                IntermediateListener il = new IntermediateListener(this.listener);
                this.writeSortedSection("subjects", 70.0f, 80.0f, subjects, sections, sections::openWSubject, il);
                this.writeSortedSection("predicates", 80.0f, 90.0f, predicates, sections, sections::openWPredicate, il);
                this.writeSortedSection("objects", 90.0f, 100.0f, objects, sections, sections::openWObject, il);
            } finally {
                subjects.clear();
                predicates.clear();
                objects.clear();
                this.listener.notifyProgress(100.0f, "section completed" + sections.root.getFileName().toString());
            }
        } catch (IOException e) {
            throw new KWayMerger.KWayMergerException(e);
        }
    }

    /**
     * Sorts {@code nodes} and writes them as one compressed section, reporting progress in the
     * {@code [rangeStart, rangeEnd]} range of {@code il}. The stream is opened before sorting
     * and always closed afterwards.
     */
    private void writeSortedSection(String sectionName, float rangeStart, float rangeEnd, ParallelSortableArrayList<IndexedNode> nodes, TripleFile sections, ExceptionSupplier<OutputStream, IOException> open, IntermediateListener il) throws IOException {
        il.setRange(rangeStart, rangeEnd);
        il.setPrefix("creating " + sectionName + " section " + sections.root.getFileName() + ": ");
        il.notifyProgress(0.0f, "sorting");
        try (OutputStream stream = open.get()) {
            nodes.parallelSort(IndexedNode::compareTo);
            CompressUtil.writeCompressedSection(nodes, stream, il);
        }
    }

    /**
     * Merges the section files found under every path of {@code inputs} into {@code output},
     * then closes the inputs. The inputs are only closed when the merge succeeds.
     *
     * @throws KWayMerger.KWayMergerException wrapping any {@link IOException} or
     *                                        {@link InterruptedException} raised by the merge
     */
    @Override
    public void mergeChunks(List<CloseSuppressPath> inputs, CloseSuppressPath output) throws KWayMerger.KWayMergerException {
        try {
            TripleFile sections = new TripleFile(output, true);
            List<TripleFile> tripleFiles = new ArrayList<>();
            for (CloseSuppressPath in : inputs) {
                tripleFiles.add(new TripleFile(in, false));
            }
            sections.compute(tripleFiles, false);
            this.listener.notifyProgress(100.0f, "sections merged " + sections.root.getFileName());
            IOUtil.closeAll(inputs);
        } catch (IOException | InterruptedException e) {
            throw new KWayMerger.KWayMergerException(e);
        }
    }

    /**
     * Wraps {@code flux} into a {@link SizeFetcher} configured with this compressor's chunk size.
     */
    @Override
    public SizeFetcher<TripleString> newStopFlux(Supplier<TripleString> flux) {
        return SizeFetcher.ofTripleString(flux, this.chunkSize);
    }

    /**
     * The three section files ({@code subject}, {@code predicate}, {@code object}) stored
     * under one root path. Closing a {@code TripleFile} closes its root path.
     */
    public class TripleFile
    implements Closeable {
        private final CloseSuppressPath root;
        private final CloseSuppressPath s;
        private final CloseSuppressPath p;
        private final CloseSuppressPath o;

        /**
         * @param root  root path of the three section files;
         *              {@code closeWithDeleteRecurse()} is invoked on it
         * @param mkdir whether to create the root directory
         */
        private TripleFile(CloseSuppressPath root, boolean mkdir) throws IOException {
            this.root = root;
            this.s = root.resolve("subject");
            this.p = root.resolve("predicate");
            this.o = root.resolve("object");
            root.closeWithDeleteRecurse();
            if (mkdir) {
                root.mkdirs();
            }
        }

        /** Same as {@link #delete()}. */
        @Override
        public void close() throws IOException {
            this.delete();
        }

        /** Closes the root path of this triple file. */
        public void delete() throws IOException {
            this.root.close();
        }

        /** Opens the subject section for writing. */
        public OutputStream openWSubject() throws IOException {
            return this.s.openOutputStream(SectionCompressor.this.bufferSize, new OpenOption[0]);
        }

        /** Opens the predicate section for writing. */
        public OutputStream openWPredicate() throws IOException {
            return this.p.openOutputStream(SectionCompressor.this.bufferSize, new OpenOption[0]);
        }

        /** Opens the object section for writing. */
        public OutputStream openWObject() throws IOException {
            return this.o.openOutputStream(SectionCompressor.this.bufferSize, new OpenOption[0]);
        }

        /** Opens the subject section for reading. */
        public InputStream openRSubject() throws IOException {
            return this.s.openInputStream(SectionCompressor.this.bufferSize, new OpenOption[0]);
        }

        /** Opens the predicate section for reading. */
        public InputStream openRPredicate() throws IOException {
            return this.p.openInputStream(SectionCompressor.this.bufferSize, new OpenOption[0]);
        }

        /** Opens the object section for reading. */
        public InputStream openRObject() throws IOException {
            return this.o.openInputStream(SectionCompressor.this.bufferSize, new OpenOption[0]);
        }

        public CloseSuppressPath getSubjectPath() {
            return this.s;
        }

        public CloseSuppressPath getPredicatePath() {
            return this.p;
        }

        public CloseSuppressPath getObjectPath() {
            return this.o;
        }

        /**
         * Merges the sections of {@code triples} into this triple file.
         *
         * @param triples triple files to merge
         * @param async   {@code false} to merge subject, predicate and object one after the
         *                other on the calling thread; {@code true} to hand the three merges to
         *                {@link ExceptionThread#async} and wait for them
         */
        public void compute(List<TripleFile> triples, boolean async) throws IOException, InterruptedException {
            if (!async) {
                this.computeSubject(triples, false);
                this.computePredicate(triples, false);
                this.computeObject(triples, false);
            } else {
                ExceptionThread.async("SectionMerger" + this.root.getFileName(), () -> this.computeSubject(triples, true), () -> this.computePredicate(triples, true), () -> this.computeObject(triples, true)).joinAndCrashIfRequired();
            }
        }

        private void computeSubject(List<TripleFile> triples, boolean async) throws IOException {
            this.computeSection(triples, "subject", 0, 33, this::openWSubject, TripleFile::openRSubject, TripleFile::getSubjectPath, async);
        }

        private void computePredicate(List<TripleFile> triples, boolean async) throws IOException {
            this.computeSection(triples, "predicate", 33, 66, this::openWPredicate, TripleFile::openRPredicate, TripleFile::getPredicatePath, async);
        }

        private void computeObject(List<TripleFile> triples, boolean async) throws IOException {
            this.computeSection(triples, "object", 66, 100, this::openWObject, TripleFile::openRObject, TripleFile::getObjectPath, async);
        }

        /**
         * Merges one section of every element of {@code triples} into the matching section of
         * this triple file.
         *
         * <p>All readers and all objects returned by {@code fileDelete} are closed at the end,
         * whether the merge succeeds or not.
         *
         * @param triples    triple files to merge
         * @param section    section name, used in progress messages only
         * @param start      start of the progress range (ignored when {@code async})
         * @param end        end of the progress range (ignored when {@code async})
         * @param openW      opens the output section of this triple file
         * @param openR      opens the input section of one triple file
         * @param fileDelete returns, for one triple file, the resource to close once merged
         * @param async      whether the calling thread must be registered to / unregistered
         *                   from the listener around the merge
         */
        private void computeSection(List<TripleFile> triples, String section, int start, int end, ExceptionSupplier<OutputStream, IOException> openW, ExceptionFunction<TripleFile, InputStream, IOException> openR, Function<TripleFile, Closeable> fileDelete, boolean async) throws IOException {
            IntermediateListener il = new IntermediateListener(SectionCompressor.this.listener);
            if (async) {
                SectionCompressor.this.listener.registerThread(Thread.currentThread().getName());
            } else {
                il.setRange(start, end);
            }
            il.setPrefix("merging " + section + " section " + this.root.getFileName() + ": ");
            il.notifyProgress(0.0f, "merging section");
            CompressNodeReader[] readers = new CompressNodeReader[triples.size()];
            Closeable[] fileDeletes = new Closeable[triples.size()];
            try {
                long size = 0L;
                for (int i = 0; i < triples.size(); ++i) {
                    TripleFile input = triples.get(i);
                    CompressNodeReader reader = new CompressNodeReader(openR.apply(input));
                    size += reader.getSize();
                    readers[i] = reader;
                    fileDeletes[i] = fileDelete.apply(input);
                }
                try (OutputStream output = openW.get()) {
                    CompressUtil.writeCompressedSection(CompressNodeMergeIterator.buildOfTree(readers), size, output, il);
                }
            } finally {
                if (async) {
                    SectionCompressor.this.listener.unregisterThread(Thread.currentThread().getName());
                }
                try {
                    IOUtil.closeAll(readers);
                } finally {
                    IOUtil.closeAll(fileDeletes);
                }
            }
        }
    }
}

