/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.function.ObjLongBiFunction;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class EventTimeMapper<T> {
    public static final long NO_NATIVE_TIME = Long.MIN_VALUE;
    private static final WatermarkPolicy[] EMPTY_WATERMARK_POLICIES = new WatermarkPolicy[0];
    private static final long[] EMPTY_LONGS = new long[0];
    private final long idleTimeoutNanos;
    @Nullable
    private final ToLongFunction<? super T> timestampFn;
    private final Supplier<? extends WatermarkPolicy> newWmPolicyFn;
    private final ObjLongBiFunction<? super T, ?> wrapFn;
    @Nullable
    private final SlidingWindowPolicy watermarkThrottlingFrame;
    private final AppendableTraverser<Object> traverser = new AppendableTraverser(2);
    private WatermarkPolicy[] wmPolicies = EMPTY_WATERMARK_POLICIES;
    private long[] watermarks = EMPTY_LONGS;
    private long[] markIdleAt = EMPTY_LONGS;
    private long lastEmittedWm = Long.MIN_VALUE;
    private long topObservedWm = Long.MIN_VALUE;
    private boolean allAreIdle;

    public EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy) {
        this.idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(eventTimePolicy.idleTimeoutMillis());
        this.timestampFn = eventTimePolicy.timestampFn();
        this.wrapFn = eventTimePolicy.wrapFn();
        this.newWmPolicyFn = eventTimePolicy.newWmPolicyFn();
        this.watermarkThrottlingFrame = eventTimePolicy.watermarkThrottlingFrameSize() != 0L ? SlidingWindowPolicy.tumblingWinPolicy(eventTimePolicy.watermarkThrottlingFrameSize()).withOffset(eventTimePolicy.watermarkThrottlingFrameOffset()) : null;
    }

    @Nonnull
    public Traverser<Object> flatMapEvent(T event, int partitionIndex, long nativeEventTime) {
        return this.flatMapEvent(System.nanoTime(), event, partitionIndex, nativeEventTime);
    }

    @Nonnull
    public Traverser<Object> flatMapIdle() {
        return this.flatMapEvent(System.nanoTime(), null, -1, Long.MIN_VALUE);
    }

    Traverser<Object> flatMapEvent(long now, @Nullable T event, int partitionIndex, long nativeEventTime) {
        long eventTime;
        assert (this.traverser.isEmpty()) : "the traverser returned previously not yet drained: remove all items from the traverser before you call this method again.";
        if (event == null) {
            this.handleNoEventInternal(now);
            return this.traverser;
        }
        if (this.timestampFn != null) {
            eventTime = this.timestampFn.applyAsLong(event);
        } else {
            eventTime = nativeEventTime;
            if (eventTime == Long.MIN_VALUE) {
                throw new JetException("Neither timestampFn nor nativeEventTime specified");
            }
        }
        this.handleEventInternal(now, partitionIndex, eventTime);
        return this.traverser.append(this.wrapFn.apply(event, eventTime));
    }

    private void handleEventInternal(long now, int partitionIndex, long eventTime) {
        this.wmPolicies[partitionIndex].reportEvent(eventTime);
        this.markIdleAt[partitionIndex] = now + this.idleTimeoutNanos;
        this.allAreIdle = false;
        this.handleNoEventInternal(now);
    }

    private void handleNoEventInternal(long now) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < this.watermarks.length; ++i) {
            if (this.idleTimeoutNanos > 0L && this.markIdleAt[i] <= now) continue;
            this.watermarks[i] = Math.max(this.watermarks[i], this.wmPolicies[i].getCurrentWatermark());
            this.topObservedWm = Math.max(this.topObservedWm, this.watermarks[i]);
            min = Math.min(min, this.watermarks[i]);
        }
        if (min == Long.MAX_VALUE) {
            if (this.allAreIdle) {
                return;
            }
            min = this.topObservedWm;
            this.allAreIdle = true;
        } else {
            this.allAreIdle = false;
        }
        if (min > this.lastEmittedWm) {
            long newWm;
            long l = newWm = this.watermarkThrottlingFrame != null ? this.watermarkThrottlingFrame.floorFrameTs(min) : Long.MIN_VALUE;
            if (newWm > this.lastEmittedWm) {
                this.traverser.append((Object)new Watermark(newWm));
                this.lastEmittedWm = newWm;
            }
        }
        if (this.allAreIdle) {
            this.traverser.append((Object)WatermarkCoalescer.IDLE_MESSAGE);
        }
    }

    public void increasePartitionCount(int newPartitionCount) {
        this.increasePartitionCount(System.nanoTime(), newPartitionCount);
    }

    void increasePartitionCount(long now, int newPartitionCount) {
        int oldPartitionCount = this.wmPolicies.length;
        if (newPartitionCount < oldPartitionCount) {
            throw new IllegalArgumentException("partition count must increase. Old count=" + oldPartitionCount + ", new count=" + newPartitionCount);
        }
        this.wmPolicies = Arrays.copyOf(this.wmPolicies, newPartitionCount);
        this.watermarks = Arrays.copyOf(this.watermarks, newPartitionCount);
        this.markIdleAt = Arrays.copyOf(this.markIdleAt, newPartitionCount);
        for (int i = oldPartitionCount; i < newPartitionCount; ++i) {
            this.wmPolicies[i] = this.newWmPolicyFn.get();
            this.watermarks[i] = Long.MIN_VALUE;
            this.markIdleAt[i] = now + this.idleTimeoutNanos;
        }
    }

    public long getWatermark(int partitionIndex) {
        return this.watermarks[partitionIndex];
    }

    public void restoreWatermark(int partitionIndex, long wm) {
        this.watermarks[partitionIndex] = wm;
        this.lastEmittedWm = Long.MAX_VALUE;
        for (long watermark : this.watermarks) {
            this.lastEmittedWm = Math.min(watermark, this.lastEmittedWm);
        }
    }
}

