/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.TimestampHistory;
import com.hazelcast.util.Preconditions;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public abstract class WatermarkCoalescer {
    /**
     * Sentinel meaning "no watermark to forward". It equals
     * {@code Long.MIN_VALUE}, which is the value the implementations in this
     * file return when they have nothing to report.
     */
    public static final long NO_NEW_WM = Long.MIN_VALUE;
    /**
     * Watermark carrying the timestamp {@code Long.MAX_VALUE}. The standard
     * implementation treats an observed watermark with this timestamp as an
     * "input is idle" marker instead of a regular watermark, and returns this
     * timestamp when it wants to report that all inputs are idle.
     */
    public static final Watermark IDLE_MESSAGE = new Watermark(Long.MAX_VALUE);

    /**
     * Private so that only the nested implementations declared in this file
     * can extend the class; instances are obtained through the static
     * {@code create} factory.
     */
    private WatermarkCoalescer() {
    }

    /**
     * Records that the given input queue is done.
     *
     * @param var1 index of the queue (named {@code queueIndex} in the overrides)
     * @return a watermark value to forward, the timestamp of
     *         {@link #IDLE_MESSAGE}, or {@code Long.MIN_VALUE} when there is
     *         nothing to forward
     * @throws UnsupportedOperationException in the zero-input implementation
     */
    public abstract long queueDone(int var1);

    /**
     * Records that an event (not a watermark) was seen on the given queue.
     * The standard implementation uses this to clear the queue's idle flag.
     *
     * @param var1 index of the queue (named {@code queueIndex} in the overrides)
     * @throws UnsupportedOperationException in the zero-input implementation
     */
    public abstract void observeEvent(int var1);

    /**
     * Records a watermark received on the given queue.
     *
     * @param var1 system time of the observation (named {@code systemTime} in
     *             the overrides); the standard implementation passes it to its
     *             watermark history
     * @param var3 index of the queue (named {@code queueIndex} in the overrides)
     * @param var4 the watermark value (named {@code wmValue} in the overrides);
     *             the timestamp of {@link #IDLE_MESSAGE} marks the queue idle
     * @return a watermark value to forward, the timestamp of
     *         {@link #IDLE_MESSAGE}, or {@code Long.MIN_VALUE} when there is
     *         nothing to forward
     * @throws JetException in the standard implementation, when the value is
     *         not greater than the one previously stored for the queue
     * @throws UnsupportedOperationException in the zero-input implementation
     */
    public abstract long observeWm(long var1, int var3, long var4);

    /**
     * Checks for a watermark that can be forwarded without a new watermark
     * having been observed: a pending idle message or a value obtained from
     * the watermark history.
     *
     * @param var1 system time of the check (named {@code systemTime} in the
     *             overrides)
     * @return a watermark value to forward, the timestamp of
     *         {@link #IDLE_MESSAGE}, or {@code Long.MIN_VALUE} when there is
     *         nothing to forward
     */
    public abstract long checkWmHistory(long var1);

    /**
     * Returns the last regular watermark value this coalescer reported, or
     * {@code Long.MIN_VALUE} if none was reported yet (the zero-input
     * implementation always returns {@code Long.MIN_VALUE}).
     */
    public abstract long lastEmittedWm();

    /**
     * Returns {@code System.nanoTime()} when the implementation keeps a
     * watermark history and {@code -1} otherwise.
     * NOTE(review): presumably this is the value callers pass as the system
     * time argument of {@code observeWm}/{@code checkWmHistory} - confirm
     * against the callers.
     */
    abstract long getTime();

    /**
     * Creates a coalescer suited to the given number of input queues.
     *
     * <p>With zero queues a stub implementation is returned whose
     * {@code observe*}/{@code queueDone} methods are unsupported; any other
     * count yields the standard implementation.
     *
     * @param maxWatermarkRetainMillis retention passed to the standard
     *        implementation; it only enables the watermark history when it is
     *        non-negative and there is more than one queue
     * @param queueCount number of input queues, must not be negative
     * @return a new coalescer instance
     */
    public static WatermarkCoalescer create(int maxWatermarkRetainMillis, int queueCount) {
        Preconditions.checkNotNegative(queueCount, "queueCount must be >= 0, but is " + queueCount);
        return queueCount == 0
                ? new ZeroInputImpl()
                : new StandardImpl(maxWatermarkRetainMillis, queueCount);
    }

    /**
     * Coalescer for one or more input queues.
     *
     * <p>It tracks the last watermark seen on each queue together with an
     * idle flag per queue, and reports the minimum watermark over all
     * non-idle queues whenever that minimum advances past the last reported
     * value. A queue that is done is represented by {@code Long.MAX_VALUE} in
     * {@link #queueWms}, so it never holds the minimum back.
     */
    private static final class StandardImpl extends WatermarkCoalescer {
        /** History of the top observed watermark; {@code null} when disabled. */
        private final TimestampHistory watermarkHistory;
        /**
         * Last watermark stored per queue: {@code Long.MIN_VALUE} until the
         * first watermark arrives, {@code Long.MAX_VALUE} once the queue is done.
         */
        private final long[] queueWms;
        /** Per-queue flag set when the queue delivered the idle message. */
        private final boolean[] isIdle;
        /**
         * Last regular watermark value returned to the caller.
         * NOTE(review): the AtomicLong with lazySet suggests the value is read
         * from a thread other than the one updating it - confirm.
         */
        private final AtomicLong lastEmittedWm = new AtomicLong(Long.MIN_VALUE);
        /** Highest regular watermark observed on any queue so far. */
        private long topObservedWm = Long.MIN_VALUE;
        /** Set once every queue was found to be idle or done. */
        private boolean allInputsAreIdle;
        /** When set, the next {@link #checkWmHistory} call returns the idle message. */
        private boolean idleMessagePending;

        /**
         * @param maxWatermarkRetainMillis retention for the watermark history
         *        in milliseconds; the history is only created when this is
         *        non-negative and there is more than one queue
         * @param queueCount number of input queues
         */
        StandardImpl(int maxWatermarkRetainMillis, int queueCount) {
            isIdle = new boolean[queueCount];
            queueWms = new long[queueCount];
            Arrays.fill(queueWms, Long.MIN_VALUE);
            boolean historyEnabled = maxWatermarkRetainMillis >= 0 && queueCount > 1;
            watermarkHistory = historyEnabled
                    ? new TimestampHistory(TimeUnit.MILLISECONDS.toNanos(maxWatermarkRetainMillis))
                    : null;
        }

        /**
         * Marks the queue as done and re-evaluates the coalesced watermark.
         *
         * @param queueIndex index of the finished queue
         * @return the value produced by re-evaluating the observed watermarks
         */
        @Override
        public long queueDone(int queueIndex) {
            assert queueWms[queueIndex] < Long.MAX_VALUE : "Duplicate DONE call";
            // MAX_VALUE makes the queue irrelevant for the minimum computation.
            queueWms[queueIndex] = Long.MAX_VALUE;
            return checkObservedWms();
        }

        /**
         * Clears the idle state of a queue that delivered an event.
         * NOTE(review): {@code idleMessagePending} is left untouched here, so
         * a pending idle message is still returned by the next
         * {@link #checkWmHistory} call - confirm this is intended.
         *
         * @param queueIndex index of the queue the event came from
         */
        @Override
        public void observeEvent(int queueIndex) {
            if (isIdle[queueIndex]) {
                isIdle[queueIndex] = false;
                allInputsAreIdle = false;
            }
        }

        /**
         * Records a watermark (or the idle message) received on a queue and
         * re-evaluates the coalesced watermark.
         *
         * @param systemTime time of the observation, forwarded to the history
         * @param queueIndex index of the queue the watermark came from
         * @param wmValue the watermark value; the timestamp of
         *        {@code IDLE_MESSAGE} marks the queue idle
         * @return a watermark value to forward, the idle-message timestamp,
         *         or {@code NO_NEW_WM}
         * @throws JetException if {@code wmValue} is not greater than the
         *         value currently stored for the queue
         */
        @Override
        public long observeWm(long systemTime, int queueIndex, long wmValue) {
            if (queueWms[queueIndex] >= wmValue) {
                throw new JetException("Watermarks not monotonically increasing on queue: last one=" + queueWms[queueIndex] + ", new one=" + wmValue);
            }
            if (wmValue == IDLE_MESSAGE.timestamp()) {
                // The idle message only flips the flag; the stored watermark stays.
                isIdle[queueIndex] = true;
                return checkObservedWms();
            }
            isIdle[queueIndex] = false;
            allInputsAreIdle = false;
            queueWms[queueIndex] = wmValue;
            if (wmValue > topObservedWm) {
                topObservedWm = wmValue;
                if (watermarkHistory != null) {
                    watermarkHistory.sample(systemTime, topObservedWm);
                }
            }
            return checkObservedWms();
        }

        /**
         * Computes the minimum watermark over all non-idle queues and decides
         * what, if anything, should be reported.
         *
         * @return the new minimum if it advanced, the top observed watermark
         *         or the idle-message timestamp when all queues just became
         *         idle or done, otherwise {@code NO_NEW_WM}
         */
        private long checkObservedWms() {
            if (allInputsAreIdle) {
                // The all-idle state was already handled; nothing new to report.
                return NO_NEW_WM;
            }
            long min = Long.MAX_VALUE;
            int notDoneInputCount = 0;
            for (int i = 0; i < queueWms.length; i++) {
                long queueWm = queueWms[i];
                if (queueWm < Long.MAX_VALUE) {
                    notDoneInputCount++;
                }
                if (!isIdle[i] && queueWm < min) {
                    min = queueWm;
                }
            }
            if (min == Long.MAX_VALUE) {
                // Every queue is either idle or done.
                allInputsAreIdle = true;
                if (topObservedWm > lastEmittedWm.get()) {
                    // Report the top watermark first; the idle message follows
                    // from checkWmHistory, but only if some queue is not done.
                    idleMessagePending = notDoneInputCount != 0;
                    lastEmittedWm.lazySet(topObservedWm);
                    return topObservedWm;
                }
                return notDoneInputCount != 0 ? IDLE_MESSAGE.timestamp() : NO_NEW_WM;
            }
            if (min > lastEmittedWm.get()) {
                lastEmittedWm.lazySet(min);
                return min;
            }
            return NO_NEW_WM;
        }

        /**
         * Returns a pending idle message if there is one; otherwise consults
         * the watermark history, when enabled.
         *
         * @param systemTime current time, forwarded to the history
         * @return the idle-message timestamp, a watermark obtained from the
         *         history that is newer than the last reported one, or
         *         {@code NO_NEW_WM}
         */
        @Override
        public long checkWmHistory(long systemTime) {
            if (idleMessagePending) {
                idleMessagePending = false;
                return IDLE_MESSAGE.timestamp();
            }
            if (watermarkHistory == null) {
                return NO_NEW_WM;
            }
            // NOTE(review): presumably sample() returns the top watermark that
            // is older than the retention period - confirm in TimestampHistory.
            long historicWm = watermarkHistory.sample(systemTime, topObservedWm);
            if (historicWm > lastEmittedWm.get()) {
                lastEmittedWm.lazySet(historicWm);
                return historicWm;
            }
            return NO_NEW_WM;
        }

        /** Returns the last regular watermark value reported by this coalescer. */
        @Override
        public long lastEmittedWm() {
            return lastEmittedWm.get();
        }

        /**
         * Returns {@code System.nanoTime()} when the watermark history is
         * enabled and {@code -1} otherwise.
         */
        @Override
        public long getTime() {
            return watermarkHistory != null ? System.nanoTime() : -1L;
        }
    }

    /**
     * Coalescer used when there are no input queues. The queue-related
     * operations are unsupported, and the query methods always report that
     * there is no watermark.
     */
    private static final class ZeroInputImpl extends WatermarkCoalescer {
        private ZeroInputImpl() {
        }

        // --- operations that need an input queue: always rejected ---

        /** Not supported: there is no queue that could be done. */
        @Override
        public long queueDone(int queueIndex) {
            throw new UnsupportedOperationException();
        }

        /** Not supported: there is no queue an event could come from. */
        @Override
        public void observeEvent(int queueIndex) {
            throw new UnsupportedOperationException();
        }

        /** Not supported: there is no queue a watermark could come from. */
        @Override
        public long observeWm(long systemTime, int queueIndex, long wmValue) {
            throw new UnsupportedOperationException();
        }

        // --- queries: constant answers ---

        /** Always reports that there is no watermark to forward. */
        @Override
        public long checkWmHistory(long systemTime) {
            return NO_NEW_WM;
        }

        /** Always {@code Long.MIN_VALUE}: nothing is ever emitted. */
        @Override
        public long lastEmittedWm() {
            return Long.MIN_VALUE;
        }

        /** Always {@code -1}: no watermark history is kept. */
        @Override
        public long getTime() {
            return -1L;
        }
    }
}

