/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta1.Storage;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class BigQueryStorageStreamSource<T>
extends OffsetBasedSource<T> {
    private final Storage.ReadSession readSession;
    private final Storage.Stream stream;
    private final String jsonTableSchema;
    private final SerializableFunction<SchemaAndRecord, T> parseFn;
    private final Coder<T> outputCoder;
    private final BigQueryServices bqServices;

    public static <T> BigQueryStorageStreamSource<T> create(Storage.ReadSession readSession, Storage.Stream stream, TableSchema tableSchema, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, BigQueryServices bqServices) {
        return new BigQueryStorageStreamSource<T>(readSession, stream, 0L, Long.MAX_VALUE, 1L, BigQueryHelpers.toJsonString(Preconditions.checkNotNull((Object)tableSchema, (Object)"tableSchema")), parseFn, outputCoder, bqServices);
    }

    private BigQueryStorageStreamSource(Storage.ReadSession readSession, Storage.Stream stream, long startOffset, long stopOffset, long minBundleSize, String jsonTableSchema, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, BigQueryServices bqServices) {
        super(startOffset, stopOffset, minBundleSize);
        this.readSession = (Storage.ReadSession)Preconditions.checkNotNull((Object)readSession, (Object)"readSession");
        this.stream = (Storage.Stream)Preconditions.checkNotNull((Object)stream, (Object)"stream");
        this.jsonTableSchema = (String)Preconditions.checkNotNull((Object)jsonTableSchema, (Object)"jsonTableSchema");
        this.parseFn = (SerializableFunction)Preconditions.checkNotNull(parseFn, (Object)"parseFn");
        this.outputCoder = (Coder)Preconditions.checkNotNull(outputCoder, (Object)"outputCoder");
        this.bqServices = (BigQueryServices)Preconditions.checkNotNull((Object)bqServices, (Object)"bqServices");
    }

    public Coder<T> getOutputCoder() {
        return this.outputCoder;
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.addIfNotNull(DisplayData.item((String)"table", (String)BigQueryHelpers.toTableSpec(this.readSession.getTableReference())).withLabel("Table")).add(DisplayData.item((String)"readSession", (String)this.readSession.getName()).withLabel("Read session")).add(DisplayData.item((String)"stream", (String)this.stream.getName()).withLabel("Stream"));
    }

    public long getEstimatedSizeBytes(PipelineOptions options) {
        return 0L;
    }

    public List<? extends OffsetBasedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options) {
        return ImmutableList.of((Object)((Object)this));
    }

    public long getMaxEndOffset(PipelineOptions options) {
        throw new UnsupportedOperationException("Not implemented");
    }

    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
        throw new UnsupportedOperationException("Not implemented");
    }

    public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) throws IOException {
        return new BigQueryStorageStreamReader(this, (BigQueryOptions)options.as(BigQueryOptions.class));
    }

    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    public static class BigQueryStorageStreamReader<T>
    extends OffsetBasedSource.OffsetBasedReader<T> {
        private final DatumReader<GenericRecord> datumReader;
        private final SerializableFunction<SchemaAndRecord, T> parseFn;
        private final BigQueryServices.StorageClient storageClient;
        private final TableSchema tableSchema;
        private Iterator<Storage.ReadRowsResponse> responseIterator;
        private BinaryDecoder decoder;
        private GenericRecord record;
        private T current;
        private long currentOffset;

        private BigQueryStorageStreamReader(BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
            super(source);
            this.datumReader = new GenericDatumReader(new Schema.Parser().parse(((BigQueryStorageStreamSource)source).readSession.getAvroSchema().getSchema()));
            this.parseFn = ((BigQueryStorageStreamSource)source).parseFn;
            this.storageClient = ((BigQueryStorageStreamSource)source).bqServices.getStorageClient(options);
            this.tableSchema = BigQueryHelpers.fromJsonString(((BigQueryStorageStreamSource)source).jsonTableSchema, TableSchema.class);
        }

        protected boolean startImpl() throws IOException {
            OffsetBasedSource source = this.getCurrentSource();
            this.currentOffset = source.getStartOffset();
            Storage.ReadRowsRequest request = Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(((BigQueryStorageStreamSource)source).stream).setOffset(this.currentOffset)).build();
            this.responseIterator = this.storageClient.readRows(request).iterator();
            return this.readNextRecord();
        }

        protected boolean advanceImpl() throws IOException {
            ++this.currentOffset;
            return this.readNextRecord();
        }

        private boolean readNextRecord() throws IOException {
            while (this.decoder == null || this.decoder.isEnd()) {
                if (!this.responseIterator.hasNext()) {
                    return false;
                }
                Storage.ReadRowsResponse nextResponse = this.responseIterator.next();
                this.decoder = DecoderFactory.get().binaryDecoder(nextResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), this.decoder);
            }
            this.record = (GenericRecord)this.datumReader.read((Object)this.record, (Decoder)this.decoder);
            this.current = this.parseFn.apply((Object)new SchemaAndRecord(this.record, this.tableSchema));
            return true;
        }

        public T getCurrent() throws NoSuchElementException {
            return this.current;
        }

        protected long getCurrentOffset() throws NoSuchElementException {
            return this.currentOffset;
        }

        public void close() {
            this.storageClient.close();
        }

        public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
            return (BigQueryStorageStreamSource)super.getCurrentSource();
        }

        public boolean allowsDynamicSplitting() {
            return false;
        }
    }
}

