/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryStorageStreamSource<@UnknownKeyFor T>
extends BoundedSource<T> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BigQueryStorageStreamSource.class);
    private final @UnknownKeyFor @NonNull @Initialized ReadSession readSession;
    private final @UnknownKeyFor @NonNull @Initialized ReadStream readStream;
    private final @UnknownKeyFor @NonNull @Initialized String jsonTableSchema;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SchemaAndRecord, T> parseFn;
    private final @UnknownKeyFor @NonNull @Initialized Coder<T> outputCoder;
    private final @UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices;

    public static <T> @UnknownKeyFor @NonNull @Initialized BigQueryStorageStreamSource<T> create(@UnknownKeyFor @NonNull @Initialized ReadSession readSession, @UnknownKeyFor @NonNull @Initialized ReadStream readStream, @UnknownKeyFor @NonNull @Initialized TableSchema tableSchema, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SchemaAndRecord, T> parseFn, @UnknownKeyFor @NonNull @Initialized Coder<T> outputCoder, @UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices) {
        return new BigQueryStorageStreamSource<T>(readSession, readStream, BigQueryHelpers.toJsonString(Preconditions.checkNotNull((Object)tableSchema, (Object)"tableSchema")), parseFn, outputCoder, bqServices);
    }

    public @UnknownKeyFor @NonNull @Initialized BigQueryStorageStreamSource<T> fromExisting(@UnknownKeyFor @NonNull @Initialized ReadStream newReadStream) {
        return new BigQueryStorageStreamSource<T>(this.readSession, newReadStream, this.jsonTableSchema, this.parseFn, this.outputCoder, this.bqServices);
    }

    private BigQueryStorageStreamSource(@UnknownKeyFor @NonNull @Initialized ReadSession readSession, @UnknownKeyFor @NonNull @Initialized ReadStream readStream, @UnknownKeyFor @NonNull @Initialized String jsonTableSchema, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SchemaAndRecord, T> parseFn, @UnknownKeyFor @NonNull @Initialized Coder<T> outputCoder, @UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices) {
        this.readSession = (ReadSession)Preconditions.checkNotNull((Object)readSession, (Object)"readSession");
        this.readStream = (ReadStream)Preconditions.checkNotNull((Object)readStream, (Object)"stream");
        this.jsonTableSchema = (String)Preconditions.checkNotNull((Object)jsonTableSchema, (Object)"jsonTableSchema");
        this.parseFn = (SerializableFunction)Preconditions.checkNotNull(parseFn, (Object)"parseFn");
        this.outputCoder = (Coder)Preconditions.checkNotNull(outputCoder, (Object)"outputCoder");
        this.bqServices = (BigQueryServices)Preconditions.checkNotNull((Object)bqServices, (Object)"bqServices");
    }

    public @UnknownKeyFor @NonNull @Initialized Coder<T> getOutputCoder() {
        return this.outputCoder;
    }

    public void populateDisplayData(// Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item((String)"table", (String)this.readSession.getTable()).withLabel("Table")).add(DisplayData.item((String)"readSession", (String)this.readSession.getName()).withLabel("Read session")).add(DisplayData.item((String)"stream", (String)this.readStream.getName()).withLabel("Stream"));
    }

    public @UnknownKeyFor @NonNull @Initialized long getEstimatedSizeBytes(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        return 0L;
    }

    public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedSource<T>> split(@UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        return ImmutableList.of((Object)((Object)this));
    }

    public @UnknownKeyFor @NonNull @Initialized BigQueryStorageStreamReader<T> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized IOException {
        return new BigQueryStorageStreamReader(this, (BigQueryOptions)options.as(BigQueryOptions.class));
    }

    @SideEffectFree
    public @UnknownKeyFor @NonNull @Initialized String toString() {
        return this.readStream.toString();
    }

    public static class BigQueryStorageStreamReader<@UnknownKeyFor T>
    extends BoundedSource.BoundedReader<T> {
        private final @UnknownKeyFor @NonNull @Initialized DatumReader<@UnknownKeyFor @NonNull @Initialized GenericRecord> datumReader;
        private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SchemaAndRecord, T> parseFn;
        private final @UnknownKeyFor @NonNull @Initialized BigQueryServices.StorageClient storageClient;
        private final @UnknownKeyFor @NonNull @Initialized TableSchema tableSchema;
        private @UnknownKeyFor @NonNull @Initialized BigQueryStorageStreamSource<T> source;
        private @UnknownKeyFor @NonNull @Initialized BigQueryServices.BigQueryServerStream<@UnknownKeyFor @NonNull @Initialized ReadRowsResponse> responseStream;
        private @UnknownKeyFor @NonNull @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized ReadRowsResponse> responseIterator;
        private @UnknownKeyFor @NonNull @Initialized BinaryDecoder decoder;
        private @UnknownKeyFor @NonNull @Initialized GenericRecord record;
        private T current;
        private @UnknownKeyFor @NonNull @Initialized long currentOffset;
        private @UnknownKeyFor @NonNull @Initialized double fractionConsumed;
        private @UnknownKeyFor @NonNull @Initialized double progressAtResponseStart;
        private @UnknownKeyFor @NonNull @Initialized double progressAtResponseEnd;
        private @UnknownKeyFor @NonNull @Initialized long rowsConsumedFromCurrentResponse;
        private @UnknownKeyFor @NonNull @Initialized long totalRowsInCurrentResponse;

        private BigQueryStorageStreamReader(@UnknownKeyFor @NonNull @Initialized BigQueryStorageStreamSource<T> source, @UnknownKeyFor @NonNull @Initialized BigQueryOptions options) throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.source = source;
            this.datumReader = new GenericDatumReader(new Schema.Parser().parse(((BigQueryStorageStreamSource)source).readSession.getAvroSchema().getSchema()));
            this.parseFn = ((BigQueryStorageStreamSource)source).parseFn;
            this.storageClient = ((BigQueryStorageStreamSource)source).bqServices.getStorageClient(options);
            this.tableSchema = BigQueryHelpers.fromJsonString(((BigQueryStorageStreamSource)source).jsonTableSchema, TableSchema.class);
            this.fractionConsumed = 0.0;
            this.progressAtResponseStart = 0.0;
            this.progressAtResponseEnd = 0.0;
            this.rowsConsumedFromCurrentResponse = 0L;
            this.totalRowsInCurrentResponse = 0L;
        }

        public synchronized @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
            BoundedSource source = this.getCurrentSource();
            ReadRowsRequest request = ReadRowsRequest.newBuilder().setReadStream(((BigQueryStorageStreamSource)source).readStream.getName()).setOffset(this.currentOffset).build();
            this.responseStream = this.storageClient.readRows(request);
            this.responseIterator = this.responseStream.iterator();
            LOG.info("Started BigQuery Storage API read from stream {}.", (Object)((BigQueryStorageStreamSource)source).readStream.getName());
            return this.readNextRecord();
        }

        public synchronized @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
            ++this.currentOffset;
            return this.readNextRecord();
        }

        private synchronized @UnknownKeyFor @NonNull @Initialized boolean readNextRecord() throws @UnknownKeyFor @NonNull @Initialized IOException {
            while (this.decoder == null || this.decoder.isEnd()) {
                if (!this.responseIterator.hasNext()) {
                    this.fractionConsumed = 1.0;
                    return false;
                }
                ReadRowsResponse response = this.responseIterator.next();
                this.progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
                this.progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
                this.totalRowsInCurrentResponse = response.getRowCount();
                this.rowsConsumedFromCurrentResponse = 0L;
                Preconditions.checkArgument((this.totalRowsInCurrentResponse >= 0L ? 1 : 0) != 0, (String)"Row count from current response (%s) must be non-negative.", (long)this.totalRowsInCurrentResponse);
                Preconditions.checkArgument((0.0 <= this.progressAtResponseStart && this.progressAtResponseStart <= 1.0 ? 1 : 0) != 0, (String)"Progress at response start (%s) is not in the range [0.0, 1.0].", (Object)this.progressAtResponseStart);
                Preconditions.checkArgument((0.0 <= this.progressAtResponseEnd && this.progressAtResponseEnd <= 1.0 ? 1 : 0) != 0, (String)"Progress at response end (%s) is not in the range [0.0, 1.0].", (Object)this.progressAtResponseEnd);
                this.decoder = DecoderFactory.get().binaryDecoder(response.getAvroRows().getSerializedBinaryRows().toByteArray(), this.decoder);
            }
            this.record = (GenericRecord)this.datumReader.read((Object)this.record, (Decoder)this.decoder);
            this.current = this.parseFn.apply((Object)new SchemaAndRecord(this.record, this.tableSchema));
            ++this.rowsConsumedFromCurrentResponse;
            this.fractionConsumed = this.progressAtResponseStart + (this.progressAtResponseEnd - this.progressAtResponseStart) * (double)this.rowsConsumedFromCurrentResponse * 1.0 / (double)this.totalRowsInCurrentResponse;
            return true;
        }

        public T getCurrent() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
            return this.current;
        }

        public synchronized void close() {
            this.storageClient.close();
        }

        public synchronized @UnknownKeyFor @NonNull @Initialized BigQueryStorageStreamSource<T> getCurrentSource() {
            return this.source;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public @UnknownKeyFor @NonNull @Initialized BoundedSource<T> splitAtFraction(@UnknownKeyFor @NonNull @Initialized double fraction) {
            Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls").inc();
            LOG.debug("Received BigQuery Storage API split request for stream {} at fraction {}.", (Object)((BigQueryStorageStreamSource)this.source).readStream.getName(), (Object)fraction);
            if (fraction <= 0.0 || fraction >= 1.0) {
                LOG.info("BigQuery Storage API does not support splitting at fraction {}", (Object)fraction);
                return null;
            }
            SplitReadStreamRequest splitRequest = SplitReadStreamRequest.newBuilder().setName(((BigQueryStorageStreamSource)this.source).readStream.getName()).setFraction((double)((float)fraction)).build();
            SplitReadStreamResponse splitResponse = this.storageClient.splitReadStream(splitRequest);
            if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
                Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-impossible-split-point").inc();
                LOG.info("BigQuery Storage API stream {} cannot be split at {}.", (Object)((BigQueryStorageStreamSource)this.source).readStream.getName(), (Object)fraction);
                return null;
            }
            BigQueryStorageStreamReader bigQueryStorageStreamReader = this;
            synchronized (bigQueryStorageStreamReader) {
                Iterator newResponseIterator;
                BigQueryServices.BigQueryServerStream<ReadRowsResponse> newResponseStream;
                try {
                    newResponseStream = this.storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream(splitResponse.getPrimaryStream().getName()).setOffset(this.currentOffset + 1L).build());
                    newResponseIterator = newResponseStream.iterator();
                    newResponseIterator.hasNext();
                }
                catch (FailedPreconditionException e) {
                    Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-bad-split-point").inc();
                    LOG.info("BigQuery Storage API split of stream {} abandoned because the primary stream is to the left of the split fraction {}.", (Object)((BigQueryStorageStreamSource)this.source).readStream.getName(), (Object)fraction);
                    return null;
                }
                catch (Exception e) {
                    Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-other-reasons").inc();
                    LOG.error("BigQuery Storage API stream split failed.", (Throwable)e);
                    return null;
                }
                this.responseStream.cancel();
                this.source = this.source.fromExisting(splitResponse.getPrimaryStream());
                this.responseStream = newResponseStream;
                this.responseIterator = newResponseIterator;
                this.decoder = null;
            }
            Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-successful").inc();
            LOG.info("Successfully split BigQuery Storage API stream at {}. Split response: {}", (Object)fraction, (Object)splitResponse);
            return this.source.fromExisting(splitResponse.getRemainderStream());
        }

        public synchronized @UnknownKeyFor @NonNull @Initialized Double getFractionConsumed() {
            return this.fractionConsumed;
        }
    }
}

