/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

@FunctionalInterface
public interface TimestampPolicyFactory<KeyT, ValueT>
extends Serializable {
    /**
     * Creates the {@link TimestampPolicy} to use for a Kafka topic partition.
     *
     * <p>The parameter names are decompiler-generated; the factory lambdas in this file refer to
     * them as {@code tp} and {@code previousWatermark}.
     *
     * @param var1 the topic partition the policy is created for
     * @param var2 a previously known watermark, if any; {@code TimestampFnPolicy} and
     *     {@code LogAppendTimePolicy} in this file fall back to
     *     {@code BoundedWindow.TIMESTAMP_MIN_VALUE} when it is empty
     * @return the timestamp policy for the given partition
     */
    public TimestampPolicy<KeyT, ValueT> createTimestampPolicy(TopicPartition var1, Optional<Instant> var2);

    /**
     * Returns a factory whose policies assign processing time to records.
     *
     * <p>Each invocation of the returned factory creates a fresh {@link ProcessingTimePolicy}; the
     * topic partition and previous watermark handed to the factory are not used.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     * @return a factory producing {@link ProcessingTimePolicy} instances
     */
    public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
        // Diamond rather than the raw type, so the policy is type-checked as TimestampPolicy<K, V>.
        return (tp, prev) -> new ProcessingTimePolicy<>();
    }

    /**
     * Returns a factory whose policies use the Kafka log-append time of each record.
     *
     * <p>Each invocation of the returned factory creates a {@link LogAppendTimePolicy} seeded with
     * the previous watermark handed to the factory; the topic partition argument is not used.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     * @return a factory producing {@link LogAppendTimePolicy} instances
     */
    public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
        // Diamond rather than the raw type, so the policy is type-checked as TimestampPolicy<K, V>.
        return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
    }

    public static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) {
        SerializableFunction & Serializable timestampFunction = (SerializableFunction & Serializable)record -> {
            Preconditions.checkArgument((record.getTimestampType() == KafkaTimestampType.CREATE_TIME ? 1 : 0) != 0, (String)"Kafka record's timestamp is not 'CREATE_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", (Object)record.getTopic(), (Object)record.getPartition(), (Object)record.getOffset(), (Object)((Object)record.getTimestampType()));
            return new Instant(record.getTimestamp());
        };
        return (tp, previousWatermark) -> new CustomTimestampPolicyWithLimitedDelay(timestampFunction, maxDelay, previousWatermark);
    }

    /**
     * Returns a factory whose policies take record timestamps from a caller-supplied function.
     *
     * <p>Each invocation of the returned factory creates a {@link TimestampFnPolicy} seeded with
     * the previous watermark handed to the factory; the topic partition argument is not used.
     *
     * @param timestampFn function mapping a Kafka record to its timestamp
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     * @return a factory producing {@link TimestampFnPolicy} instances
     * @deprecated marked deprecated in this file; the policy it creates reports the timestamp of
     *     the last record as the watermark. The other factory methods on this interface are not
     *     deprecated.
     */
    @Deprecated
    public static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
        // Diamond rather than the raw type, so the policy is type-checked as TimestampPolicy<K, V>.
        return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark);
    }

    /**
     * Timestamp policy that delegates timestamp assignment to a supplied function and reports the
     * most recently assigned timestamp as the watermark.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     */
    public static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
        /** Function applied to every record to obtain its timestamp. */
        final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;

        /** Timestamp returned for the latest record; doubles as the current watermark. */
        Instant lastRecordTimestamp;

        /**
         * @param timestampFn function mapping a record to its timestamp
         * @param previousWatermark initial watermark; when empty,
         *     {@code BoundedWindow.TIMESTAMP_MIN_VALUE} is used
         */
        TimestampFnPolicy(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn, Optional<Instant> previousWatermark) {
            Instant initialWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
            this.timestampFn = timestampFn;
            this.lastRecordTimestamp = initialWatermark;
        }

        /**
         * Applies the timestamp function to {@code record}, remembers the result and returns it.
         * The {@code context} argument is not consulted.
         */
        @Override
        public Instant getTimestampForRecord(TimestampPolicy.PartitionContext context, KafkaRecord<K, V> record) {
            Instant assigned = timestampFn.apply(record);
            lastRecordTimestamp = assigned;
            return assigned;
        }

        /** Returns the timestamp of the last record seen, or the initial watermark if none yet. */
        @Override
        public Instant getWatermark(TimestampPolicy.PartitionContext context) {
            return lastRecordTimestamp;
        }
    }

    /**
     * Timestamp policy that uses the Kafka log-append time of records as both the record timestamp
     * and the watermark, and lets the watermark advance while the partition has no backlog.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     */
    public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
        /** How far behind the backlog check time the watermark is held when there is no backlog. */
        private static final Duration IDLE_WATERMARK_DELTA = Duration.standardSeconds(2);

        /** Latest watermark; also the timestamp handed out for records. */
        protected Instant currentWatermark;

        /**
         * @param previousWatermark initial watermark; when empty,
         *     {@code BoundedWindow.TIMESTAMP_MIN_VALUE} is used
         */
        public LogAppendTimePolicy(Optional<Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /**
         * Returns the timestamp for {@code record} and updates the watermark.
         *
         * <p>For a record of type {@code LOG_APPEND_TIME} the watermark is set to the record's
         * timestamp. For any other type the current watermark is returned unchanged, unless the
         * watermark is still {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
         *
         * @throws IllegalStateException if the record is not of type {@code LOG_APPEND_TIME} and
         *     the watermark is still {@code BoundedWindow.TIMESTAMP_MIN_VALUE}
         */
        @Override
        public Instant getTimestampForRecord(TimestampPolicy.PartitionContext context, KafkaRecord<K, V> record) {
            if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) {
                currentWatermark = new Instant(record.getTimestamp());
            } else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
                // No usable watermark yet and the record lacks a log-append timestamp.
                // The message previously claimed the type "is LogAppendTime", which is the
                // opposite of the condition that leads here.
                throw new IllegalStateException(String.format(
                    "LogAppendTimePolicy policy is enabled in reader, but Kafka record's timestamp type is not LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. Actual timestamp type is '%s'.",
                    record.getTopic(), record.getTimestampType()));
            }
            return currentWatermark;
        }

        /**
         * Returns the current watermark, first advancing it when the partition has no backlog.
         *
         * <p>When the message backlog is zero, the watermark moves to the backlog check time minus
         * {@link #IDLE_WATERMARK_DELTA}, but only if that is later than the current watermark.
         */
        @Override
        public Instant getWatermark(TimestampPolicy.PartitionContext context) {
            if (context.getMessageBacklog() == 0L) {
                Instant idleWatermark = context.getBacklogCheckTime().minus(IDLE_WATERMARK_DELTA);
                if (idleWatermark.isAfter(currentWatermark)) {
                    currentWatermark = idleWatermark;
                }
            }
            return currentWatermark;
        }
    }

    /**
     * Timestamp policy driven by the clock alone: record timestamps and the watermark are both the
     * instant at which they are requested.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     */
    public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
        /** Returns the current time; neither {@code context} nor {@code record} is consulted. */
        @Override
        public Instant getTimestampForRecord(TimestampPolicy.PartitionContext context, KafkaRecord<K, V> record) {
            return currentTime();
        }

        /** Returns the current time; {@code context} is not consulted. */
        @Override
        public Instant getWatermark(TimestampPolicy.PartitionContext context) {
            return currentTime();
        }

        /** Single place where this policy reads the clock. */
        private static Instant currentTime() {
            return Instant.now();
        }
    }
}

