/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * Timestamp policy that takes each record's timestamp from a user-supplied function and
 * reports a watermark that trails the largest timestamp seen so far by a fixed delay.
 *
 * <p>The instance is stateful: {@link #getTimestampForRecord} updates the largest observed
 * timestamp, and the watermark methods read it.
 *
 * @param <K> key type of the Kafka records
 * @param <V> value type of the Kafka records
 */
public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {
    /** Fixed amount subtracted from a reference instant to obtain the watermark. */
    private final Duration maxDelay;
    /** Extracts the timestamp from a record. */
    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;
    /** Largest timestamp observed so far (seeded from the previous watermark). */
    private Instant maxEventTimestamp;

    /**
     * Creates the policy.
     *
     * @param timestampFunction function applied to every record to obtain its timestamp
     * @param maxDelay delay subtracted when computing the watermark
     * @param previousWatermark watermark to resume from; when empty,
     *     {@link BoundedWindow#TIMESTAMP_MIN_VALUE} is used instead
     */
    public CustomTimestampPolicyWithLimitedDelay(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction, Duration maxDelay, Optional<Instant> previousWatermark) {
        this.timestampFunction = timestampFunction;
        this.maxDelay = maxDelay;
        // Seed the maximum at (starting watermark + maxDelay) so that
        // "maxEventTimestamp - maxDelay" equals the starting watermark until a
        // later timestamp is observed.
        Instant startingWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        this.maxEventTimestamp = startingWatermark.plus(maxDelay);
    }

    /**
     * Applies the timestamp function to {@code record} and raises the tracked maximum
     * if the result is strictly later than it.
     *
     * @param ctx partition context (not consulted here)
     * @param record record whose timestamp is requested
     * @return the timestamp produced by the timestamp function
     */
    @Override
    public Instant getTimestampForRecord(TimestampPolicy.PartitionContext ctx, KafkaRecord<K, V> record) {
        Instant eventTime = timestampFunction.apply(record);
        if (eventTime.isAfter(maxEventTimestamp)) {
            maxEventTimestamp = eventTime;
        }
        return eventTime;
    }

    /**
     * Computes the watermark relative to the current wall-clock time.
     *
     * @param ctx partition context supplying backlog information
     * @return the watermark as computed by {@link #getWatermark(TimestampPolicy.PartitionContext, Instant)}
     */
    @Override
    public Instant getWatermark(TimestampPolicy.PartitionContext ctx) {
        return getWatermark(ctx, Instant.now());
    }

    /**
     * Computes the watermark for an explicitly supplied "now".
     *
     * <ol>
     *   <li>If the tracked maximum is after {@code now}, the result is {@code now - maxDelay}.
     *   <li>Otherwise, if the context reports no backlog and its backlog check time minus
     *       {@code maxDelay} is past a tracked maximum with positive millis, the result is
     *       that backlog check time minus {@code maxDelay}.
     *   <li>Otherwise the result is the tracked maximum minus {@code maxDelay}.
     * </ol>
     *
     * @param ctx partition context supplying backlog size and backlog check time
     * @param now instant to treat as the current time
     * @return the computed watermark
     */
    @VisibleForTesting
    Instant getWatermark(TimestampPolicy.PartitionContext ctx, Instant now) {
        // Tracked maximum lies beyond "now": base the watermark on "now" instead.
        if (maxEventTimestamp.isAfter(now)) {
            return now.minus(maxDelay);
        }
        // Nothing pending and the backlog check has moved past the tracked maximum.
        if (idleWithPositiveTimestamp(ctx)) {
            return ctx.getBacklogCheckTime().minus(maxDelay);
        }
        return maxEventTimestamp.minus(maxDelay);
    }

    /**
     * Returns whether the context reports zero backlog, its backlog check time minus
     * {@code maxDelay} is strictly after the tracked maximum, and the tracked maximum has
     * a positive millisecond value.
     */
    private boolean idleWithPositiveTimestamp(TimestampPolicy.PartitionContext ctx) {
        if (ctx.getMessageBacklog() != 0L) {
            return false;
        }
        Instant delayedCheckTime = ctx.getBacklogCheckTime().minus(maxDelay);
        // NOTE(review): the millis > 0 test presumably excludes the initial seeded
        // state where no record has been read yet — confirm against upstream intent.
        return delayedCheckTime.isAfter(maxEventTimestamp) && maxEventTimestamp.getMillis() > 0L;
    }
}

