/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class WatermarkCallbackExecutor {
    private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> callbacks = new ConcurrentHashMap();
    private final Executor executor;

    public static WatermarkCallbackExecutor create(Executor executor) {
        return new WatermarkCallbackExecutor(executor);
    }

    private WatermarkCallbackExecutor(Executor executor) {
        this.executor = executor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void callOnGuaranteedFiring(AppliedPTransform<?, ?, ?> step, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        WatermarkCallback callback = WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null && this.callbacks.putIfAbsent(step, callbackQueue = new PriorityQueue(11, new CallbackOrdering())) != null) {
            callbackQueue = (PriorityQueue)this.callbacks.get(step);
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            callbackQueue.offer(callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void callOnWindowExpiration(AppliedPTransform<?, ?, ?> step, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        WatermarkCallback callback = WatermarkCallback.afterWindowExpiration(window, windowingStrategy, runnable);
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null && this.callbacks.putIfAbsent(step, callbackQueue = new PriorityQueue(11, new CallbackOrdering())) != null) {
            callbackQueue = (PriorityQueue)this.callbacks.get(step);
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            callbackQueue.offer(callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) throws InterruptedException {
        PriorityQueue callbackQueue = (PriorityQueue)this.callbacks.get(step);
        if (callbackQueue == null) {
            return;
        }
        PriorityQueue priorityQueue = callbackQueue;
        synchronized (priorityQueue) {
            ArrayList<Runnable> toFire = new ArrayList<Runnable>();
            while (!callbackQueue.isEmpty() && ((WatermarkCallback)callbackQueue.peek()).shouldFire(watermark)) {
                toFire.add(((WatermarkCallback)callbackQueue.poll()).getCallback());
            }
            if (!toFire.isEmpty()) {
                CountDownLatch latch = new CountDownLatch(toFire.size());
                toFire.forEach(r -> this.executor.execute(() -> {
                    try {
                        r.run();
                    }
                    finally {
                        latch.countDown();
                    }
                }));
                latch.await();
            }
        }
    }

    private static class CallbackOrdering
    extends Ordering<WatermarkCallback>
    implements Serializable {
        private CallbackOrdering() {
        }

        @SuppressFBWarnings(value={"NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION"}, justification="https://github.com/google/guava/issues/920")
        public int compare(@Nonnull WatermarkCallback left, @Nonnull WatermarkCallback right) {
            return ComparisonChain.start().compare((Comparable)left.fireAfter, (Comparable)right.fireAfter).compare((Object)left.callback, (Object)right.callback, (Comparator)Ordering.arbitrary()).result();
        }
    }

    private static class WatermarkCallback {
        private final Instant fireAfter;
        private final Runnable callback;

        public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
            Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring(window);
            return new WatermarkCallback(firingAfter, callback);
        }

        public static <W extends BoundedWindow> WatermarkCallback afterWindowExpiration(BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
            Instant firingAfter = window.maxTimestamp().plus((ReadableDuration)strategy.getAllowedLateness()).plus(1L);
            return new WatermarkCallback(firingAfter, callback);
        }

        private WatermarkCallback(Instant fireAfter, Runnable callback) {
            this.fireAfter = fireAfter;
            this.callback = callback;
        }

        public boolean shouldFire(Instant currentWatermark) {
            return currentWatermark.isAfter((ReadableInstant)this.fireAfter) || currentWatermark.equals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
        }

        public Runnable getCallback() {
            return this.callback;
        }
    }
}

