/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.processor.AbstractAsyncTransformUsingServiceP;
import com.hazelcast.jet.impl.processor.ProcessorSupplierWithService;
import com.hazelcast.jet.pipeline.ServiceFactory;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Processor that submits one asynchronous call per input item and emits the
 * mapped results in the same order in which the items were submitted.
 * <p>
 * Submitted items are kept in a FIFO queue together with their futures.
 * Watermarks are placed in the same queue, so a watermark is emitted only
 * after the results of all items queued before it.
 *
 * @param <C> service context type
 * @param <S> service type
 * @param <T> input item type
 * @param <IR> type of the value the future returned by {@code callAsyncFn} completes with
 * @param <R> type of the items in the traverser produced by {@code mapResultFn}
 */
public class AsyncTransformUsingServiceOrderedP<C, S, T, IR, R>
extends AbstractAsyncTransformUsingServiceP<C, S> {
    private final BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<IR>> callAsyncFn;
    private final BiFunctionEx<? super T, ? super IR, ? extends Traverser<? extends R>> mapResultFn;
    // Holds both Watermark instances and Tuple2<T, CompletableFuture<IR>> entries, in arrival order.
    private ArrayDeque<Object> queue;
    // Number of Watermark instances currently in the queue.
    private int queuedWmCount;
    // Output of the entry most recently taken from the queue; may be only partially emitted.
    private Traverser<?> currentTraverser = Traversers.empty();
    private final ResettableSingletonTraverser<Watermark> watermarkTraverser = new ResettableSingletonTraverser<>();
    @Probe(name="numInFlightOps")
    private final Counter asyncOpsCounterMetric = SwCounter.newSwCounter();

    /**
     * Creates the processor.
     *
     * @param serviceFactory factory passed through to the superclass
     * @param serviceContext service context passed through to the superclass, may be null
     * @param maxConcurrentOps maximum number of non-watermark entries allowed in the queue
     * @param callAsyncFn function that starts the async call for an item; a {@code null}
     *                    future means the item produces no output
     * @param mapResultFn function mapping the original item and the async result to the
     *                    traverser of output items; a {@code null} result is treated as empty
     */
    public AsyncTransformUsingServiceOrderedP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nullable C serviceContext, int maxConcurrentOps, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<IR>> callAsyncFn, @Nonnull BiFunctionEx<? super T, ? super IR, ? extends Traverser<? extends R>> mapResultFn) {
        // NOTE(review): the meaning of the trailing `true` is defined by the superclass
        // constructor, which is not visible here - presumably it selects ordered mode; confirm.
        super(serviceFactory, serviceContext, maxConcurrentOps, true);
        this.callAsyncFn = callAsyncFn;
        this.mapResultFn = mapResultFn;
    }

    /**
     * Runs the superclass initialization and then allocates the queue.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        super.init(context);
        // Twice maxConcurrentOps as the initial capacity: room for the items plus
        // watermarks queued between them.
        queue = new ArrayDeque<>(maxConcurrentOps * 2);
    }

    /**
     * Accepts the item only if the queue has (or can be made to have) room for it.
     *
     * @return {@code false} if the queue is still full after a flush attempt,
     *         otherwise the result of {@link #tryProcessInt}
     */
    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        if (!makeRoomInQueue()) {
            return false;
        }
        // Items arrive untyped; the cast to T is unchecked and cannot fail at this point.
        return tryProcessInt((T) item);
    }

    /**
     * Starts the async call for the item and queues the item with its future.
     * If {@code callAsyncFn} returns {@code null}, nothing is queued.
     *
     * @return always {@code true}
     */
    protected boolean tryProcessInt(T item) {
        CompletableFuture<IR> future = callAsyncFn.apply(service, item);
        if (future != null) {
            queue.add(Tuple2.tuple2(item, future));
        }
        return true;
    }

    /**
     * Ensures the queue can take another item, flushing it first if it is full.
     *
     * @return {@code true} if the queue is not full when this method returns
     */
    protected boolean makeRoomInQueue() {
        if (!isQueueFull()) {
            return true;
        }
        tryFlushQueue();
        return !isQueueFull();
    }

    /**
     * Tells whether the number of queued non-watermark entries equals {@code maxConcurrentOps}.
     */
    boolean isQueueFull() {
        int queuedItems = queue.size() - queuedWmCount;
        return queuedItems == maxConcurrentOps;
    }

    /**
     * Queues the watermark behind the already queued entries. If the last queued
     * entry is a watermark with the same key, it is replaced by the new one.
     *
     * @return always {@code true}
     */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        Object tail = queue.peekLast();
        boolean replacesTail = tail instanceof Watermark && watermark.key() == ((Watermark)tail).key();
        if (replacesTail) {
            // One watermark replaces another, so queuedWmCount stays the same.
            queue.removeLast();
        } else {
            ++queuedWmCount;
        }
        queue.add(watermark);
        return true;
    }

    /**
     * Attempts to flush the queue and refreshes the {@code numInFlightOps} metric.
     *
     * @return always {@code true}
     */
    @Override
    public boolean tryProcess() {
        tryFlushQueue();
        // NOTE(review): queue.size() also counts queued watermarks, so the metric can
        // exceed the number of pending async calls - confirm whether that is intended.
        asyncOpsCounterMetric.set(queue.size());
        return true;
    }

    /**
     * @return {@code true} once the queue has been fully flushed
     */
    @Override
    public boolean complete() {
        return tryFlushQueue();
    }

    /**
     * Saves nothing itself; reports completion only once the queue has been fully flushed.
     *
     * @return {@code true} once the queue has been fully flushed
     */
    @Override
    public boolean saveToSnapshot() {
        return tryFlushQueue();
    }

    /**
     * Emits queued entries in order, starting with whatever is left in the
     * current traverser. Stops when {@code emitFromTraverser} returns
     * {@code false} or when the future at the head of the queue is not done.
     *
     * @return {@code true} if the current traverser was fully emitted and the
     *         queue is empty, {@code false} otherwise
     * @throws JetException if obtaining the async result or applying
     *         {@code mapResultFn} throws
     */
    boolean tryFlushQueue() {
        while (emitFromTraverser(currentTraverser)) {
            Object head = queue.peek();
            if (head == null) {
                return true;
            }
            if (head instanceof Watermark) {
                watermarkTraverser.accept((Watermark)head);
                currentTraverser = watermarkTraverser;
                --queuedWmCount;
            } else {
                // Non-watermark entries are only ever added by tryProcessInt, as (item, future) pairs.
                @SuppressWarnings("unchecked")
                Tuple2<T, CompletableFuture<IR>> entry = (Tuple2<T, CompletableFuture<IR>>)head;
                T item = entry.f0();
                CompletableFuture<IR> future = entry.f1();
                assert (future != null);
                if (!future.isDone()) {
                    // Entries are checked strictly in submission order; later ones wait for the head.
                    return false;
                }
                try {
                    // The future is done, so get() does not block here.
                    currentTraverser = mapResultFn.apply(item, future.get());
                    if (currentTraverser == null) {
                        currentTraverser = Traversers.empty();
                    }
                }
                catch (Throwable e) {
                    throw new JetException("Async operation completed exceptionally: " + e, e);
                }
            }
            // The head is removed only after its traverser has been installed.
            queue.remove();
        }
        return false;
    }

    /**
     * Returns a supplier of this processor for the case where the async call
     * itself yields the traverser of output items, so the result is emitted as is.
     *
     * @param serviceFactory factory of the service
     * @param maxConcurrentOps maximum number of non-watermark entries allowed in the queue
     * @param callAsyncFn function that starts the async call for an item
     */
    public static <C, S, T, R> ProcessorSupplier supplier(@Nonnull ServiceFactory<C, S> serviceFactory, int maxConcurrentOps, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> callAsyncFn) {
        return ProcessorSupplierWithService.supplierWithService(serviceFactory, (serviceFn, context) -> new AsyncTransformUsingServiceOrderedP<C, S, T, Traverser<R>, R>(serviceFn, context, maxConcurrentOps, callAsyncFn, (i, r) -> r));
    }
}

