/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.wordcount;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class WordCount {
    private static final String BOOK_LINES = "bookLines";
    private static final String COUNTS = "counts";
    private JetInstance jet;

    private static Pipeline buildPipeline() {
        Pattern delimiter = Pattern.compile("\\W+");
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.map((String)BOOK_LINES)).flatMap((FunctionEx & Serializable)e -> Traversers.traverseArray((Object[])delimiter.split(((String)e.getValue()).toLowerCase()))).filter((PredicateEx & Serializable)word -> !word.isEmpty()).groupingKey(Functions.wholeItem()).aggregate(AggregateOperations.counting()).writeTo(Sinks.observable((String)COUNTS));
        return p;
    }

    public static void main(String[] args) throws Exception {
        new WordCount().go();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void go() throws Exception {
        try {
            this.setup();
            System.out.println("\nCounting words... ");
            long start = System.nanoTime();
            Pipeline p = WordCount.buildPipeline();
            Observable observable = this.jet.getObservable(COUNTS);
            CompletableFuture f = observable.toFuture(s -> s.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            this.jet.newJob(p).join();
            System.out.println("done in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " milliseconds.");
            Map results = (Map)f.get();
            WordCount.checkResults(results);
            WordCount.printResults(results);
        }
        finally {
            Jet.shutdownAll();
        }
    }

    private void setup() {
        this.jet = Jet.bootstrappedInstance();
        System.out.println("Loading The Complete Works of William Shakespeare");
        try {
            long[] lineNum = new long[]{0L};
            HashMap bookLines = new HashMap();
            InputStream stream = this.getClass().getResourceAsStream("/books/shakespeare-complete-works.txt");
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream));){
                reader.lines().forEach(line -> {
                    lineNum[0] = lineNum[0] + 1L;
                    bookLines.put(lineNum[0], line);
                });
            }
            this.jet.getMap(BOOK_LINES).putAll(bookLines);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static Map<String, Long> checkResults(Map<String, Long> counts) {
        if (counts.get("the") != 27843L) {
            throw new AssertionError((Object)"Wrong count of 'the'");
        }
        System.out.println("Count of 'the' is valid");
        return counts;
    }

    private static void printResults(Map<String, Long> counts) {
        int limit = 100;
        StringBuilder sb = new StringBuilder(String.format(" Top %d entries are:%n", 100));
        sb.append("/-------+---------\\\n");
        sb.append("| Count | Word    |\n");
        sb.append("|-------+---------|\n");
        counts.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue).reversed()).limit(100L).forEach(e -> sb.append(String.format("|%6d | %-8s|%n", e.getValue(), e.getKey())));
        sb.append("\\-------+---------/\n");
        System.out.println(sb.toString());
    }
}

