/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageReader;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageReaderFactory;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BigQueryStorageStreamSource<T>
extends BoundedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageStreamSource.class);
    private final ReadSession readSession;
    private final ReadStream readStream;
    private final String jsonTableSchema;
    private final SerializableFunction<SchemaAndRecord, T> parseFn;
    private final Coder<T> outputCoder;
    private final BigQueryServices bqServices;

    public static <T> BigQueryStorageStreamSource<T> create(ReadSession readSession, ReadStream readStream, TableSchema tableSchema, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, BigQueryServices bqServices) {
        return new BigQueryStorageStreamSource<T>(readSession, readStream, BigQueryHelpers.toJsonString(Preconditions.checkNotNull((Object)tableSchema, (Object)"tableSchema")), parseFn, outputCoder, bqServices);
    }

    public BigQueryStorageStreamSource<T> fromExisting(ReadStream newReadStream) {
        return new BigQueryStorageStreamSource<T>(this.readSession, newReadStream, this.jsonTableSchema, this.parseFn, this.outputCoder, this.bqServices);
    }

    private BigQueryStorageStreamSource(ReadSession readSession, ReadStream readStream, String jsonTableSchema, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, BigQueryServices bqServices) {
        this.readSession = (ReadSession)Preconditions.checkNotNull((Object)readSession, (Object)"readSession");
        this.readStream = (ReadStream)Preconditions.checkNotNull((Object)readStream, (Object)"stream");
        this.jsonTableSchema = (String)Preconditions.checkNotNull((Object)jsonTableSchema, (Object)"jsonTableSchema");
        this.parseFn = (SerializableFunction)Preconditions.checkNotNull(parseFn, (Object)"parseFn");
        this.outputCoder = (Coder)Preconditions.checkNotNull(outputCoder, (Object)"outputCoder");
        this.bqServices = (BigQueryServices)Preconditions.checkNotNull((Object)bqServices, (Object)"bqServices");
    }

    public Coder<T> getOutputCoder() {
        return this.outputCoder;
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item((String)"table", (String)this.readSession.getTable()).withLabel("Table")).add(DisplayData.item((String)"readSession", (String)this.readSession.getName()).withLabel("Read session")).add(DisplayData.item((String)"stream", (String)this.readStream.getName()).withLabel("Stream"));
    }

    public long getEstimatedSizeBytes(PipelineOptions options) {
        return 0L;
    }

    public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options) {
        return ImmutableList.of((Object)((Object)this));
    }

    public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) throws IOException {
        return new BigQueryStorageStreamReader(this, (BigQueryOptions)options.as(BigQueryOptions.class));
    }

    public String toString() {
        return this.readStream.toString();
    }

    public static class BigQueryStorageStreamReader<T>
    extends BoundedSource.BoundedReader<T> {
        private final BigQueryStorageReader reader;
        private final SerializableFunction<SchemaAndRecord, T> parseFn;
        private final BigQueryServices.StorageClient storageClient;
        private final TableSchema tableSchema;
        private BigQueryStorageStreamSource<T> source;
        private BigQueryServices.BigQueryServerStream<ReadRowsResponse> responseStream;
        private Iterator<ReadRowsResponse> responseIterator;
        private T current;
        private long currentOffset;
        private boolean splitPossible = true;
        private double fractionConsumed;
        private double progressAtResponseStart;
        private double progressAtResponseEnd;
        private long rowsConsumedFromCurrentResponse;
        private long totalRowsInCurrentResponse;
        private TableReference tableReference;
        private ServiceCallMetric serviceCallMetric;

        private BigQueryStorageStreamReader(BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
            this.source = source;
            this.reader = BigQueryStorageReaderFactory.getReader(((BigQueryStorageStreamSource)source).readSession);
            this.parseFn = ((BigQueryStorageStreamSource)source).parseFn;
            this.storageClient = ((BigQueryStorageStreamSource)source).bqServices.getStorageClient(options);
            this.tableSchema = BigQueryHelpers.fromJsonString(((BigQueryStorageStreamSource)source).jsonTableSchema, TableSchema.class);
            this.fractionConsumed = 0.0;
            this.progressAtResponseStart = 0.0;
            this.progressAtResponseEnd = 0.0;
            this.rowsConsumedFromCurrentResponse = 0L;
            this.totalRowsInCurrentResponse = 0L;
        }

        public synchronized boolean start() throws IOException {
            BoundedSource source = this.getCurrentSource();
            ReadRowsRequest request = ReadRowsRequest.newBuilder().setReadStream(((BigQueryStorageStreamSource)source).readStream.getName()).setOffset(this.currentOffset).build();
            this.tableReference = BigQueryUtils.toTableReference(((BigQueryStorageStreamSource)source).readSession.getTable());
            this.serviceCallMetric = BigQueryUtils.readCallMetric(this.tableReference);
            this.responseStream = this.storageClient.readRows(request, ((BigQueryStorageStreamSource)source).readSession.getTable());
            this.responseIterator = this.responseStream.iterator();
            LOG.info("Started BigQuery Storage API read from stream {}.", (Object)((BigQueryStorageStreamSource)source).readStream.getName());
            return this.readNextRecord();
        }

        public synchronized boolean advance() throws IOException {
            ++this.currentOffset;
            return this.readNextRecord();
        }

        private synchronized boolean readNextRecord() throws IOException {
            while (this.reader.readyForNextReadResponse()) {
                ReadRowsResponse response;
                if (!this.responseIterator.hasNext()) {
                    this.fractionConsumed = 1.0;
                    return false;
                }
                try {
                    response = this.responseIterator.next();
                    if (this.serviceCallMetric != null) {
                        this.serviceCallMetric.call("ok");
                    }
                }
                catch (ApiException e) {
                    if (this.serviceCallMetric != null) {
                        this.serviceCallMetric.call(e.getStatusCode().getCode().name());
                    }
                    throw e;
                }
                this.progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
                this.progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
                this.totalRowsInCurrentResponse = response.getRowCount();
                this.rowsConsumedFromCurrentResponse = 0L;
                Preconditions.checkArgument((this.totalRowsInCurrentResponse >= 0L ? 1 : 0) != 0, (String)"Row count from current response (%s) must be non-negative.", (long)this.totalRowsInCurrentResponse);
                Preconditions.checkArgument((0.0 <= this.progressAtResponseStart && this.progressAtResponseStart <= 1.0 ? 1 : 0) != 0, (String)"Progress at response start (%s) is not in the range [0.0, 1.0].", (Object)this.progressAtResponseStart);
                Preconditions.checkArgument((0.0 <= this.progressAtResponseEnd && this.progressAtResponseEnd <= 1.0 ? 1 : 0) != 0, (String)"Progress at response end (%s) is not in the range [0.0, 1.0].", (Object)this.progressAtResponseEnd);
                this.reader.processReadRowsResponse(response);
            }
            SchemaAndRecord schemaAndRecord = new SchemaAndRecord(this.reader.readSingleRecord(), this.tableSchema);
            this.current = this.parseFn.apply((Object)schemaAndRecord);
            ++this.rowsConsumedFromCurrentResponse;
            this.fractionConsumed = this.progressAtResponseStart + (this.progressAtResponseEnd - this.progressAtResponseStart) * (double)this.rowsConsumedFromCurrentResponse * 1.0 / (double)this.totalRowsInCurrentResponse;
            return true;
        }

        public T getCurrent() throws NoSuchElementException {
            return this.current;
        }

        public synchronized void close() {
            this.storageClient.close();
            this.reader.close();
        }

        public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
            return this.source;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public BoundedSource<T> splitAtFraction(double fraction) {
            Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls").inc();
            LOG.debug("Received BigQuery Storage API split request for stream {} at fraction {}.", (Object)((BigQueryStorageStreamSource)this.source).readStream.getName(), (Object)fraction);
            if (fraction <= 0.0 || fraction >= 1.0) {
                LOG.info("BigQuery Storage API does not support splitting at fraction {}", (Object)fraction);
                return null;
            }
            if (!this.splitPossible) {
                return null;
            }
            SplitReadStreamRequest splitRequest = SplitReadStreamRequest.newBuilder().setName(((BigQueryStorageStreamSource)this.source).readStream.getName()).setFraction((double)((float)fraction)).build();
            SplitReadStreamResponse splitResponse = this.storageClient.splitReadStream(splitRequest);
            if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
                Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-impossible-split-point").inc();
                LOG.info("BigQuery Storage API stream {} cannot be split at {}.", (Object)((BigQueryStorageStreamSource)this.source).readStream.getName(), (Object)fraction);
                this.splitPossible = false;
                return null;
            }
            BigQueryStorageStreamReader bigQueryStorageStreamReader = this;
            synchronized (bigQueryStorageStreamReader) {
                Iterator newResponseIterator;
                BigQueryServices.BigQueryServerStream<ReadRowsResponse> newResponseStream;
                try {
                    newResponseStream = this.storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream(splitResponse.getPrimaryStream().getName()).setOffset(this.currentOffset + 1L).build(), ((BigQueryStorageStreamSource)this.source).readSession.getTable());
                    newResponseIterator = newResponseStream.iterator();
                    newResponseIterator.hasNext();
                }
                catch (FailedPreconditionException e) {
                    Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-bad-split-point").inc();
                    LOG.info("BigQuery Storage API split of stream {} abandoned because the primary stream is to the left of the split fraction {}.", (Object)((BigQueryStorageStreamSource)this.source).readStream.getName(), (Object)fraction);
                    return null;
                }
                catch (Exception e) {
                    Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-other-reasons").inc();
                    LOG.error("BigQuery Storage API stream split failed.", (Throwable)e);
                    return null;
                }
                this.responseStream.cancel();
                this.source = this.source.fromExisting(splitResponse.getPrimaryStream());
                this.responseStream = newResponseStream;
                this.responseIterator = newResponseIterator;
                this.reader.resetBuffer();
            }
            Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-successful").inc();
            LOG.info("Successfully split BigQuery Storage API stream at {}. Split response: {}", (Object)fraction, (Object)splitResponse);
            return this.source.fromExisting(splitResponse.getRemainderStream());
        }

        public synchronized Double getFractionConsumed() {
            return this.fractionConsumed;
        }
    }
}

