/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.RateController$;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.apache.spark.streaming.scheduler.rate.RateEstimator$;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/**
 * An {@link InputDStream} that, on every batch, produces a {@link SourceRDD.Unbounded} backed by
 * a freshly created {@link MicrobatchSource} wrapping the configured {@link UnboundedSource}.
 *
 * <p>Each micro-batch read is bounded by a read duration (derived from the batch duration, the
 * configured read-time percentage and the configured minimum read time) and by a maximum number
 * of records. The record bound is taken from the constructor argument when it is positive,
 * otherwise from the rate controller when back pressure is enabled, otherwise it is
 * {@link Long#MAX_VALUE}.
 *
 * @param <T> element type produced by the wrapped source
 * @param <CheckpointMarkT> checkpoint mark type of the wrapped source
 */
class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
        extends InputDStream<Tuple2<Source<T>, CheckpointMarkT>> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);

    /** Multiplier applied to the batch interval to obtain the reader cache interval. */
    private static final double READER_CACHE_INTERVAL_FACTOR = 1.5;

    private static final long MILLIS_PER_SECOND = 1000L;

    private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
    private final SerializablePipelineOptions options;
    private final Duration boundReadDuration;
    private final double readerCacheInterval;
    private final int numPartitions;
    private final int initialParallelism;
    private final long boundMaxRecords;
    // Initialized before the constructor body runs, so it is already available when the
    // constructor builds its first MicrobatchSource to compute the number of partitions.
    private final RateController rateController =
            new SourceRateController(
                    id(), RateEstimator$.MODULE$.create(ssc().conf(), ssc().graph().batchDuration()));

    /**
     * Creates the stream and eagerly splits a micro-batch source once to fix the partition count.
     *
     * @param ssc the streaming context this stream is attached to
     * @param unboundedSource the source to read from
     * @param options pipeline options; viewed as {@link SparkPipelineOptions}
     * @param boundMaxRecords configured per-batch record limit; only values greater than zero are
     *     honored, and {@code null} is treated as "no configured bound"
     * @throws IllegalArgumentException if the default parallelism is not greater than zero
     * @throws RuntimeException if splitting the micro-batch source fails
     */
    SourceDStream(
            StreamingContext ssc,
            UnboundedSource<T, CheckpointMarkT> unboundedSource,
            SerializablePipelineOptions options,
            Long boundMaxRecords) {
        super(ssc, JavaSparkContext$.MODULE$.fakeClassTag());
        this.unboundedSource = unboundedSource;
        this.options = options;

        SparkPipelineOptions sparkOptions = options.get().as(SparkPipelineOptions.class);
        this.readerCacheInterval =
                READER_CACHE_INTERVAL_FACTOR * (double) sparkOptions.getBatchIntervalMillis().longValue();
        this.boundReadDuration =
                boundReadDuration(
                        sparkOptions.getReadTimePercentage(), sparkOptions.getMinReadTimeMillis());

        this.initialParallelism = ssc().sparkContext().defaultParallelism();
        Preconditions.checkArgument(
                this.initialParallelism > 0, "Number of partitions must be greater than zero.");

        // Avoid an unexplained NullPointerException from auto-unboxing: a missing value simply
        // means no limit was configured (any non-positive value is ignored later on).
        this.boundMaxRecords = boundMaxRecords == null ? -1L : boundMaxRecords.longValue();

        // Must come last: createMicrobatchSource() reads the fields assigned above.
        try {
            this.numPartitions = createMicrobatchSource().split(sparkOptions).size();
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to split the micro-batch source to determine the number of partitions.", e);
        }
    }

    /**
     * Builds the RDD for one batch from a newly created {@link MicrobatchSource}.
     *
     * @param validTime the batch time; not used by this implementation
     * @return a defined option holding the RDD for this batch
     */
    @Override
    public Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
        RDD<Tuple2<Source<T>, CheckpointMarkT>> rdd =
                new SourceRDD.Unbounded<T, CheckpointMarkT>(
                        ssc().sparkContext(), options, createMicrobatchSource(), numPartitions);
        return Option.apply(rdd);
    }

    /** Creates a micro-batch source using the current read bounds. */
    private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
        // NOTE(review): the -1 argument is presumably a placeholder split id for the unsplit
        // source; confirm against the MicrobatchSource constructor.
        return new MicrobatchSource<T, CheckpointMarkT>(
                unboundedSource,
                boundReadDuration,
                initialParallelism,
                computeReadMaxRecords(),
                -1,
                id(),
                readerCacheInterval);
    }

    /**
     * Resolves the per-batch record limit: the configured bound if positive, else the rate
     * controller's advice if available, else {@link Long#MAX_VALUE}.
     */
    private long computeReadMaxRecords() {
        if (boundMaxRecords > 0L) {
            LOG.info(
                    "Max records per batch has been set to {}, as configured in the PipelineOptions.",
                    boundMaxRecords);
            return boundMaxRecords;
        }
        Option<Long> rateControlledMax = rateControlledMaxRecords();
        if (rateControlledMax.isDefined()) {
            LOG.info(
                    "Max records per batch has been set to {}, as advised by the rate controller.",
                    rateControlledMax.get());
            return rateControlledMax.get();
        }
        LOG.info(
                "Max records per batch has not been limited by neither configuration nor the rate controller, and will remain unlimited for the current batch ({}).",
                Long.MAX_VALUE);
        return Long.MAX_VALUE;
    }

    /** No-op: this stream has nothing to start. */
    @Override
    public void start() {
    }

    /** No-op: this stream has nothing to stop. */
    @Override
    public void stop() {
    }

    /** Returns a display name that includes this stream's id. */
    @Override
    public String name() {
        return "Beam UnboundedSource [" + id() + "]";
    }

    /** Returns the partition count computed at construction time. */
    int getNumPartitions() {
        return numPartitions;
    }

    /**
     * Computes the per-batch read duration as the longer of a proportion of the batch duration and
     * a fixed lower bound.
     *
     * @param readTimePercentage factor applied to the batch duration
     * @param minReadTimeMillis lower bound for the read duration, in milliseconds
     * @return the chosen read duration
     */
    private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
        long batchDurationMillis = ssc().graph().batchDuration().milliseconds();
        Duration proportionalDuration =
                new Duration(Math.round((double) batchDurationMillis * readTimePercentage));
        Duration lowerBoundDuration = new Duration(minReadTimeMillis);
        Duration readDuration =
                proportionalDuration.isLongerThan(lowerBoundDuration)
                        ? proportionalDuration
                        : lowerBoundDuration;
        LOG.info("Read duration set to: {}", readDuration);
        return readDuration;
    }

    /**
     * Returns the per-batch record limit advised by the rate controller, or an empty option when
     * back pressure is disabled or the controller has no positive rate yet.
     */
    private Option<Long> rateControlledMaxRecords() {
        Option<RateController> controller = rateController();
        if (!controller.isDefined()) {
            return Option.empty();
        }
        long rateLimitPerSec = controller.get().getLatestRate();
        if (rateLimitPerSec <= 0L) {
            return Option.empty();
        }
        long batchDurationMillis = ssc().graph().batchDuration().milliseconds();
        return Option.apply(Long.valueOf(recordsPerBatch(rateLimitPerSec, batchDurationMillis)));
    }

    /**
     * Scales a per-second rate to a batch of the given length.
     *
     * <p>The batch length is kept in milliseconds so that sub-second and fractional-second batch
     * intervals are not truncated to whole seconds (which would turn a 500 ms batch into a limit
     * of zero records). The result is clamped to at least one record so that a positive rate never
     * produces a zero limit.
     */
    private static long recordsPerBatch(long ratePerSec, long batchDurationMillis) {
        long wholeSeconds = batchDurationMillis / MILLIS_PER_SECOND;
        long remainderMillis = batchDurationMillis % MILLIS_PER_SECOND;
        long limit = ratePerSec * wholeSeconds + ratePerSec * remainderMillis / MILLIS_PER_SECOND;
        return Math.max(1L, limit);
    }

    /**
     * Exposes the rate controller only when back pressure is enabled in the Spark configuration.
     *
     * @return the controller wrapped in an option, or an empty option when back pressure is off
     */
    @Override
    public Option<RateController> rateController() {
        if (RateController$.MODULE$.isBackPressureEnabled(ssc().conf())) {
            return Option.apply(rateController);
        }
        return Option.empty();
    }

    /** A {@link RateController} whose {@code publish} does nothing; the rate is polled instead. */
    private static class SourceRateController extends RateController {
        private SourceRateController(int id, RateEstimator rateEstimator) {
            super(id, rateEstimator);
        }

        /** Intentionally empty: the enclosing stream reads the latest rate on demand. */
        @Override
        public void publish(long rate) {
        }
    }
}

