/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.grpc;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.examples.grpc.BrokerInfoRequest;
import com.hazelcast.jet.examples.grpc.BrokerServiceGrpc;
import com.hazelcast.jet.examples.grpc.BrokerServiceImpl;
import com.hazelcast.jet.examples.grpc.EventGenerator;
import com.hazelcast.jet.examples.grpc.ProductInfoRequest;
import com.hazelcast.jet.examples.grpc.ProductServiceGrpc;
import com.hazelcast.jet.examples.grpc.ProductServiceImpl;
import com.hazelcast.jet.examples.grpc.datamodel.Broker;
import com.hazelcast.jet.examples.grpc.datamodel.Product;
import com.hazelcast.jet.examples.grpc.datamodel.Trade;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.Functions;
import com.hazelcast.jet.pipeline.ContextFactory;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamStage;
import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder;
import io.grpc.stub.AbstractStub;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

public final class GRPCEnrichment {
    private static final String TRADES = "trades";
    private static final int PORT = 50051;
    private final JetInstance jet;

    private GRPCEnrichment(JetInstance jet) {
        this.jet = jet;
    }

    private static Pipeline enrichUsingGRPC() throws Exception {
        Map<Integer, Product> productMap = GRPCEnrichment.readLines("products.txt").collect(Collectors.toMap(Map.Entry::getKey, e -> new Product((Integer)e.getKey(), (String)e.getValue())));
        Map<Integer, Broker> brokerMap = GRPCEnrichment.readLines("brokers.txt").collect(Collectors.toMap(Map.Entry::getKey, e -> new Broker((Integer)e.getKey(), (String)e.getValue())));
        ServerBuilder.forPort((int)50051).addService((BindableService)new ProductServiceImpl(productMap)).addService((BindableService)new BrokerServiceImpl(brokerMap)).build().start();
        System.out.println("*** Server started, listening on 50051");
        Pipeline p = Pipeline.create();
        StreamStage trades = p.drawFrom(Sources.mapJournal((String)TRADES, (JournalInitialPosition)JournalInitialPosition.START_FROM_CURRENT)).withoutTimestamps().map(Functions.entryValue());
        ContextFactory productService = ContextFactory.withCreateFn((FunctionEx & Serializable)x -> ProductServiceGrpc.newFutureStub((Channel)GRPCEnrichment.getLocalChannel())).withDestroyFn((ConsumerEx & Serializable)stub -> GRPCEnrichment.shutdownClient(stub));
        ContextFactory brokerService = ContextFactory.withCreateFn((FunctionEx & Serializable)x -> BrokerServiceGrpc.newFutureStub((Channel)GRPCEnrichment.getLocalChannel())).withDestroyFn((ConsumerEx & Serializable)stub -> GRPCEnrichment.shutdownClient(stub));
        trades.mapUsingContextAsync(productService, (BiFunctionEx & Serializable)(service, trade) -> {
            ProductInfoRequest request = ProductInfoRequest.newBuilder().setId(trade.productId()).build();
            return GRPCEnrichment.toCompletableFuture(service.productInfo(request)).thenApply(productReply -> Tuple2.tuple2((Object)trade, (Object)productReply.getProductName()));
        }).mapUsingContextAsync(brokerService, (BiFunctionEx & Serializable)(stub, t) -> {
            BrokerInfoRequest request = BrokerInfoRequest.newBuilder().setId(((Trade)t.f0()).brokerId()).build();
            return GRPCEnrichment.toCompletableFuture(stub.brokerInfo(request)).thenApply(brokerReply -> Tuple3.tuple3((Object)t.f0(), (Object)t.f1(), (Object)brokerReply.getBrokerName()));
        }).drainTo(Sinks.logger());
        return p;
    }

    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.newJetInstance();
        new GRPCEnrichment(jet).go();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void go() throws Exception {
        EventGenerator eventGenerator = new EventGenerator((IMap<Object, Trade>)this.jet.getMap(TRADES));
        eventGenerator.start();
        try {
            Pipeline p = GRPCEnrichment.enrichUsingGRPC();
            Job job = this.jet.newJob(p);
            eventGenerator.generateEventsForFiveSeconds();
            job.cancel();
            try {
                job.join();
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
        }
        finally {
            eventGenerator.shutdown();
            Jet.shutdownAll();
        }
    }

    private static void shutdownClient(AbstractStub stub) throws InterruptedException {
        ManagedChannel managedChannel = (ManagedChannel)stub.getChannel();
        managedChannel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
    }

    private static ManagedChannel getLocalChannel() {
        return ManagedChannelBuilder.forAddress((String)"localhost", (int)50051).usePlaintext().build();
    }

    private static Stream<Map.Entry<Integer, String>> readLines(String file) {
        try {
            InputStream stream = GRPCEnrichment.class.getResourceAsStream("/" + file);
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
            return reader.lines().map(GRPCEnrichment::splitLine);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static Map.Entry<Integer, String> splitLine(String e) {
        int commaPos = e.indexOf(44);
        return Util.entry((Object)Integer.valueOf(e.substring(0, commaPos)), (Object)e.substring(commaPos + 1));
    }

    private static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf) {
        final CompletableFuture f = new CompletableFuture();
        Futures.addCallback(lf, (FutureCallback)new FutureCallback<T>(){

            public void onSuccess(@NullableDecl T result) {
                f.complete(result);
            }

            public void onFailure(Throwable t) {
                f.completeExceptionally(t);
            }
        });
        return f;
    }
}

