/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.AppendClientInfo;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.SplittingIterable;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritePayload;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto;
import org.apache.beam.sdk.io.gcp.bigquery.TwoLevelMessageConverterCache;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageApiWritesShardedRecords<@UnknownKeyFor DestinationT, @UnknownKeyFor ElementT>
extends PTransform<PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>>, PCollectionTuple> {
    // Class-wide logger.
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
    // Idle period (one hour) after which the per-key idle timer fires and the stream is finalized.
    private static final @UnknownKeyFor @NonNull @Initialized Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours((long)1L);
    // Resolves destinations to tables and supplies the side inputs for the write DoFn.
    private final @UnknownKeyFor @NonNull @Initialized StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    // Stored from the constructor; not read anywhere in the code visible in this file.
    private final @UnknownKeyFor @NonNull @Initialized BigQueryIO.Write.CreateDisposition createDisposition;
    // Stored from the constructor; not read anywhere in the code visible in this file.
    private final @UnknownKeyFor @NonNull @Initialized String kmsKey;
    // Factory for the dataset service used by the write DoFn and by the flush/finalize DoFn.
    private final @UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices;
    // Stored from the constructor; not read anywhere in the code visible in this file.
    private final @UnknownKeyFor @NonNull @Initialized Coder<DestinationT> destinationCoder;
    // Coder set on the failed-rows output in expand().
    private final @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized BigQueryStorageApiInsertError> failedRowsCoder;
    // Idle time handed to every WriteRecordsDoFn; always the default in this class.
    private final @UnknownKeyFor @NonNull @Initialized Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;
    // Output tag for rows that could not be written.
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized BigQueryStorageApiInsertError> failedRowsTag;
    // Main output tag: (stream name, flush/finalize operation) pairs consumed by the flush step.
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized StorageApiFlushAndFinalizeDoFn.Operation>> flushTag = new TupleTag("flushTag");
    // Shared pool on which client close()/unpin() calls are run off the processing thread.
    private static final @UnknownKeyFor @NonNull @Initialized ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
    // Process-wide cache of append clients keyed by sharded destination key. Entries expire
    // five minutes after their last access, and the removal listener closes the evicted
    // AppendClientInfo (when the removed value is non-null).
    private static final /*
     * Decompiler note: issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized ShardedKey<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized AppendClientInfo> APPEND_CLIENTS = CacheBuilder.newBuilder().expireAfterAccess(5L, TimeUnit.MINUTES).removalListener(removal -> {
        @Nullable AppendClientInfo appendClientInfo = (AppendClientInfo)removal.getValue();
        if (appendClientInfo != null) {
            appendClientInfo.close();
        }
    }).build();

    /**
     * Invalidates every entry of the shared {@code APPEND_CLIENTS} cache. The cache's removal
     * listener closes each evicted {@link AppendClientInfo}.
     */
    static void clearCache() {
        APPEND_CLIENTS.invalidateAll();
    }

    /**
     * Runs {@code task} on {@code executor} without waiting for it and without propagating any
     * exception it throws.
     *
     * <p>This is deliberately best-effort: callers use it for cleanup work (closing or unpinning
     * append clients) that must not block or fail the processing thread. A failure is therefore
     * not rethrown, but it is logged so that it can still be diagnosed.
     *
     * @param executor executor on which the task is submitted
     * @param task the work to run; any {@link Exception} it throws is logged and dropped
     */
    private static void runAsyncIgnoreFailure(@UnknownKeyFor @NonNull @Initialized ExecutorService executor, @UnknownKeyFor @NonNull @Initialized ThrowingRunnable task) {
        executor.submit(() -> {
            try {
                task.run();
            }
            catch (Exception e) {
                // Intentionally not rethrown (best-effort cleanup). Logged at debug level so the
                // failure is no longer invisible, without adding noise for routine cleanup races.
                LOG.debug("Ignoring failure of asynchronous best-effort task", e);
            }
        });
    }

    /**
     * Creates the transform. Every argument is stored as-is in the field of the same name.
     *
     * @param dynamicDestinations resolves destinations to tables and provides side inputs
     * @param createDisposition create disposition for the write
     * @param kmsKey KMS key setting for the write
     * @param bqServices BigQuery service factory
     * @param destinationCoder coder for destination values
     * @param failedRowsCoder coder applied to the failed-rows output
     * @param failedRowsTag output tag for rows that could not be written
     */
    public StorageApiWritesShardedRecords(@UnknownKeyFor @NonNull @Initialized StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations, @UnknownKeyFor @NonNull @Initialized BigQueryIO.Write.CreateDisposition createDisposition, @UnknownKeyFor @NonNull @Initialized String kmsKey, @UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices, @UnknownKeyFor @NonNull @Initialized Coder<DestinationT> destinationCoder, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized BigQueryStorageApiInsertError> failedRowsCoder, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized BigQueryStorageApiInsertError> failedRowsTag) {
        // Destination resolution and service access.
        this.bqServices = bqServices;
        this.dynamicDestinations = dynamicDestinations;
        this.destinationCoder = destinationCoder;

        // Table-creation settings.
        this.kmsKey = kmsKey;
        this.createDisposition = createDisposition;

        // Failed-rows output wiring.
        this.failedRowsTag = failedRowsTag;
        this.failedRowsCoder = failedRowsCoder;
    }

    /**
     * Builds the write pipeline for already-sharded records.
     *
     * <p>The input is written by {@link WriteRecordsDoFn}, which emits flush/finalize operations
     * on {@code flushTag} and unwritable rows on {@code failedRowsTag}. The flush operations are
     * re-windowed with a repeating one-second processing-time trigger, reduced per stream name
     * to their maximum, and handed to {@link StorageApiFlushAndFinalizeDoFn}.
     *
     * @param input grouped payloads keyed by sharded destination
     * @return the tuple produced by the write step; its failed-rows output has
     *     {@code failedRowsCoder} set
     * @throws RuntimeException if no schema is registered for
     *     {@link StorageApiFlushAndFinalizeDoFn.Operation}
     */
    @Override
    public @UnknownKeyFor @NonNull @Initialized PCollectionTuple expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT>, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized StorageApiWritePayload>>> input) {
        BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
        long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes().intValue();
        long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
        String operationName = input.getName() + "/" + this.getName();

        // Append records to the Storage API streams.
        PCollectionTuple writeRecordsResult = input.apply(
            "Write Records",
            ParDo.of(new WriteRecordsDoFn(operationName, this.streamIdleTime, splitSize, maxRequestSize))
                .withSideInputs(this.dynamicDestinations.getSideInputs())
                .withOutputTags(this.flushTag, TupleTagList.of(this.failedRowsTag)));

        // The flush operations need an explicit schema-based coder.
        SchemaCoder<StorageApiFlushAndFinalizeDoFn.Operation> operationCoder;
        try {
            SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
            operationCoder = SchemaCoder.of(
                schemaRegistry.getSchema(StorageApiFlushAndFinalizeDoFn.Operation.class),
                TypeDescriptor.of(StorageApiFlushAndFinalizeDoFn.Operation.class),
                schemaRegistry.getToRowFunction(StorageApiFlushAndFinalizeDoFn.Operation.class),
                schemaRegistry.getFromRowFunction(StorageApiFlushAndFinalizeDoFn.Operation.class));
        }
        catch (NoSuchSchemaException e) {
            throw new RuntimeException(e);
        }

        // Reduce the pending operations per stream to the maximum one, then flush/finalize.
        writeRecordsResult
            .get(this.flushTag)
            .setCoder(KvCoder.of(StringUtf8Coder.of(), operationCoder))
            .apply(
                Window.<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>>configure()
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(1L))))
                    .discardingFiredPanes())
            .apply(
                "maxFlushPosition",
                Combine.perKey(Max.naturalOrder(new StorageApiFlushAndFinalizeDoFn.Operation(-1L, false))))
            .apply(
                "Flush and finalize writes",
                ParDo.of(new StorageApiFlushAndFinalizeDoFn(this.bqServices)));

        writeRecordsResult.get(this.failedRowsTag).setCoder(this.failedRowsCoder);
        return writeRecordsResult;
    }

    class WriteRecordsDoFn
    extends DoFn<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>, KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> {
        // Rows handed to the retry manager for appending (incremented per request by its row count).
        private final @UnknownKeyFor @NonNull @Initialized Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, (String)"recordsAppended");
        // Write streams created by getOrCreateStream.
        private final @UnknownKeyFor @NonNull @Initialized Counter streamsCreated = Metrics.counter(WriteRecordsDoFn.class, (String)"streamsCreated");
        // Idle-timer firings (reported under the metric name "idleStreamsFinalized").
        private final @UnknownKeyFor @NonNull @Initialized Counter streamsIdle = Metrics.counter(WriteRecordsDoFn.class, (String)"idleStreamsFinalized");
        // Append failures other than per-row serialization errors.
        private final @UnknownKeyFor @NonNull @Initialized Counter appendFailures = Metrics.counter(WriteRecordsDoFn.class, (String)"appendFailures");
        // Append failures classified as offset mismatch or missing/finalized stream.
        private final @UnknownKeyFor @NonNull @Initialized Counter appendOffsetFailures = Metrics.counter(WriteRecordsDoFn.class, (String)"appendOffsetFailures");
        // Rows for which a flush operation was emitted after a successful append.
        private final @UnknownKeyFor @NonNull @Initialized Counter flushesScheduled = Metrics.counter(WriteRecordsDoFn.class, (String)"flushesScheduled");
        // Wall-clock milliseconds spent appending one element.
        private final @UnknownKeyFor @NonNull @Initialized Distribution appendLatencyDistribution = Metrics.distribution(WriteRecordsDoFn.class, (String)"appendLatencyDistributionMs");
        // Updated with the row count of each append request.
        private final @UnknownKeyFor @NonNull @Initialized Distribution appendSizeDistribution = Metrics.distribution(WriteRecordsDoFn.class, (String)"appendSizeDistribution");
        // Number of append requests one element was split into.
        private final @UnknownKeyFor @NonNull @Initialized Distribution appendSplitDistribution = Metrics.distribution(WriteRecordsDoFn.class, (String)"appendSplitDistribution");
        // Rows redirected to the failed-rows output after a serialization error.
        private final @UnknownKeyFor @NonNull @Initialized Counter rowsSentToFailedRowsCollection = Metrics.counter(WriteRecordsDoFn.class, (String)"rowsSentToFailedRowsCollection");
        // Source of the table schema used when an append client is first created for a key.
        private @UnknownKeyFor @NonNull @Initialized TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
        // Destination -> table lookup cache; replaced with an empty map at the start of every bundle.
        private @UnknownKeyFor @NonNull @Initialized Map<DestinationT, @UnknownKeyFor @NonNull @Initialized TableDestination> destinations = Maps.newHashMap();
        // Lazily created by getDatasetService and closed in onTeardown.
        private transient @Nullable @UnknownKeyFor @Initialized BigQueryServices.DatasetService datasetServiceInternal = null;
        // Per-key state: name of the write stream currently in use (empty/absent means none).
        @DoFn.StateId(value="streamName")
        private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String>> streamNameSpec = StateSpecs.value();
        // Per-key state: offset at which the next append to the stream will be written.
        @DoFn.StateId(value="streamOffset")
        private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long>> streamOffsetSpec = StateSpecs.value();
        // Processing-time timer that finalizes the stream once the key has been idle.
        @DoFn.TimerId(value="idleTimer")
        private final @UnknownKeyFor @NonNull @Initialized TimerSpec idleTimer = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);
        // Offset applied to the idle timer each time it is (re)set.
        private final @UnknownKeyFor @NonNull @Initialized Duration streamIdleTime;
        // Size threshold passed to SplittingIterable when batching rows into requests.
        private final @UnknownKeyFor @NonNull @Initialized long splitSize;
        // Requests whose serialized size is at least this value are routed to the failed-rows output.
        private final @UnknownKeyFor @NonNull @Initialized long maxRequestSize;

        /**
         * Creates the DoFn.
         *
         * @param operationName name used to create the message-converter cache
         * @param streamIdleTime offset applied to the idle timer
         * @param splitSize size threshold used when splitting rows into append requests
         * @param maxRequestSize serialized-size limit for a single append request
         */
        public WriteRecordsDoFn(@UnknownKeyFor @NonNull @Initialized String operationName, @UnknownKeyFor @NonNull @Initialized Duration streamIdleTime, @UnknownKeyFor @NonNull @Initialized long splitSize, long maxRequestSize) {
            // Sizing and timing parameters.
            this.maxRequestSize = maxRequestSize;
            this.splitSize = splitSize;
            this.streamIdleTime = streamIdleTime;
            // Converter cache scoped to this operation.
            this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
        }

        /**
         * Resets the destination-to-table lookup cache so that each bundle resolves its
         * destinations afresh.
         */
        @DoFn.StartBundle
        public void startBundle() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.destinations = Maps.newHashMap();
        }

        /**
         * Returns the write stream recorded in {@code streamName}, creating a new
         * {@code BUFFERED} stream on {@code tableId} when none is recorded.
         *
         * <p>When a stream is created, its name is stored in {@code streamName}, the offset state
         * is reset to zero and the {@code streamsCreated} counter is incremented. In both cases
         * the idle timer is re-armed relative to now.
         *
         * @param tableId table on which a new stream is created if needed
         * @param streamName state holding the current stream name
         * @param streamOffset state holding the next append offset
         * @param streamIdleTimer idle timer to re-arm
         * @param datasetService service used to create the stream
         * @return the name of the stream to append to
         * @throws RuntimeException wrapping any exception raised along the way
         */
        @UnknownKeyFor @NonNull @Initialized String getOrCreateStream(@UnknownKeyFor @NonNull @Initialized String tableId, @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> streamName, @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> streamOffset, @UnknownKeyFor @NonNull @Initialized Timer streamIdleTimer, @UnknownKeyFor @NonNull @Initialized BigQueryServices.DatasetService datasetService) {
            try {
                String recorded = streamName.read();
                final String activeStream;
                if (!Strings.isNullOrEmpty(recorded)) {
                    activeStream = recorded;
                } else {
                    // No usable stream for this key: create one and start counting from offset 0.
                    activeStream = datasetService.createWriteStream(tableId, WriteStream.Type.BUFFERED).getName();
                    streamName.write(activeStream);
                    streamOffset.write(0L);
                    this.streamsCreated.inc();
                }
                // Reset the idle timer.
                streamIdleTimer.offset(this.streamIdleTime).withNoOutputTimestamp().setRelative();
                return activeStream;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Returns the dataset service for this DoFn instance, creating and caching it on first
         * use.
         *
         * @param pipelineOptions options viewed as {@link BigQueryOptions} when the service is created
         * @return the cached dataset service
         * @throws IOException declared by this method's signature
         */
        private @UnknownKeyFor @NonNull @Initialized BigQueryServices.DatasetService getDatasetService(@UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions) throws @UnknownKeyFor @NonNull @Initialized IOException {
            BigQueryServices.DatasetService service = this.datasetServiceInternal;
            if (service == null) {
                service = StorageApiWritesShardedRecords.this.bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
                this.datasetServiceInternal = service;
            }
            return service;
        }

        /**
         * Closes the cached dataset service, if one was created, and forgets it.
         *
         * @throws RuntimeException wrapping any exception thrown while closing; in that case the
         *     cached reference is left in place
         */
        @DoFn.Teardown
        public void onTeardown() {
            if (this.datasetServiceInternal == null) {
                // Nothing was ever created for this instance.
                return;
            }
            try {
                this.datasetServiceInternal.close();
                this.datasetServiceInternal = null;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Appends all payloads of one sharded key to that key's write stream.
         *
         * <p>Steps:
         * <ol>
         *   <li>Resolve the destination table (cached per bundle) and obtain the shared append
         *       client for the key from {@code APPEND_CLIENTS}.
         *   <li>Split the payloads into append requests. A request whose serialized size is at
         *       least {@code maxRequestSize} is not sent; its rows go to the failed-rows output.
         *   <li>Assign each request a stream name and offset, pin the client once per request,
         *       and run all requests through a {@link RetryManager}.
         *   <li>On success, emit a flush operation for the last offset of the request.
         *   <li>On a per-row serialization error, send the offending rows to the failed-rows
         *       output, rebuild the request without them, renumber offsets and retry.
         *   <li>On any other error, drop the append client; if the error indicates an offset
         *       mismatch or a missing/finalized stream, finalize the old stream and move all
         *       pending requests to a new one. All operations are retried in every case.
         *   <li>Finally unpin every client (also when the retry manager throws), record metrics
         *       and re-arm the idle timer.
         * </ol>
         *
         * @param c process context, used to give the dynamic destinations access to side inputs
         * @param pipelineOptions pipeline options, used to create the dataset service
         * @param element sharded key and the payloads to write for it
         * @param streamName state holding the current stream name for the key
         * @param streamOffset state holding the next append offset for the key
         * @param idleTimer timer that finalizes the stream after the idle period
         * @param o receiver for the flush-operation and failed-rows outputs
         * @throws Exception if client creation, row decoding or the retry manager fails
         */
        @DoFn.ProcessElement
        public void process(
                @UnknownKeyFor @NonNull @Initialized ProcessContext c,
                @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions,
                @DoFn.Element @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT>, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized StorageApiWritePayload>> element,
                @DoFn.AlwaysFetched @DoFn.StateId(value="streamName") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> streamName,
                @DoFn.AlwaysFetched @DoFn.StateId(value="streamOffset") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> streamOffset,
                @DoFn.TimerId(value="idleTimer") @UnknownKeyFor @NonNull @Initialized Timer idleTimer,
                DoFn.@UnknownKeyFor @NonNull @Initialized MultiOutputReceiver o) throws @UnknownKeyFor @NonNull @Initialized Exception {
            StorageApiWritesShardedRecords.this.dynamicDestinations.setSideInputAccessorFromProcessContext(c);
            TableDestination tableDestination = this.destinations.computeIfAbsent(element.getKey().getKey(), dest -> {
                TableDestination resolved = StorageApiWritesShardedRecords.this.dynamicDestinations.getTable(dest);
                Preconditions.checkArgument(resolved != null, "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s", StorageApiWritesShardedRecords.this.dynamicDestinations, dest);
                return resolved;
            });
            String tableId = tableDestination.getTableUrn();
            BigQueryServices.DatasetService datasetService = this.getDatasetService(pipelineOptions);
            Supplier<String> getOrCreateStream = () -> this.getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
            AppendClientInfo appendClientInfo = APPEND_CLIENTS.get(element.getKey(), () -> {
                @Nullable TableSchema tableSchema = this.messageConverters.get(element.getKey().getKey(), StorageApiWritesShardedRecords.this.dynamicDestinations, datasetService).getTableSchema();
                // The close callback runs on the shared executor so that closing never blocks
                // the thread that triggers it.
                return new AppendClientInfo(tableSchema, client -> StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, client::close)).createAppendClient(datasetService, getOrCreateStream, false);
            });
            SplittingIterable messages = new SplittingIterable(element.getValue(), this.splitSize);

            // Assigns stream name and offsets to every pending request. Called once up front and
            // again whenever the requests have to move to a new stream after a failure.
            BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts = (pending, isFailure) -> {
                try {
                    if (isFailure) {
                        // Clearing the name forces getOrCreateStream to create a new stream.
                        streamName.write("");
                    }
                    appendClientInfo.createAppendClient(datasetService, getOrCreateStream, false);
                    BigQueryServices.StreamAppendClient streamAppendClient = org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(appendClientInfo.streamAppendClient);
                    for (AppendRowsContext ctx : pending) {
                        ctx.streamName = streamName.read();
                        streamAppendClient.pin();
                        ctx.client = appendClientInfo.streamAppendClient;
                        ctx.offset = streamOffset.read();
                        ++ctx.tryIteration;
                        streamOffset.write(ctx.offset + (long)ctx.protoRows.getSerializedRowsCount());
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };

            // Drops the shared append client and releases the pin held by each request.
            Consumer<Iterable<AppendRowsContext>> clearClients = pending -> {
                appendClientInfo.clearAppendClient();
                for (AppendRowsContext ctx : pending) {
                    if (ctx.client == null) {
                        continue;
                    }
                    // Unpin on the shared executor rather than on the processing thread.
                    StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, ctx.client::unpin);
                    ctx.client = null;
                }
            };

            // Sends one request. A request left with no rows completes immediately.
            Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation = ctx -> {
                if (ctx.protoRows.getSerializedRowsCount() == 0) {
                    return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
                }
                try {
                    appendClientInfo.createAppendClient(datasetService, getOrCreateStream, false);
                    return org.apache.beam.sdk.util.Preconditions.checkStateNotNull(appendClientInfo.streamAppendClient).appendRows(ctx.offset, ctx.protoRows);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };

            Function<Iterable<AppendRowsContext>, RetryManager.RetryType> onError = failedContexts -> {
                // The first context of the batch is treated as the one that failed.
                AppendRowsContext failedContext = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));

                // instanceof is false for null, so no separate null check is needed.
                if (failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
                    Exceptions.AppendSerializtionError serializationError = (Exceptions.AppendSerializtionError)failedContext.getError();
                    Set<Integer> failedRowIndices = serializationError.getRowIndexToErrorMessage().keySet();
                    for (int failedIndex : failedRowIndices) {
                        // Decode the rejected row and route it to the failed-rows output.
                        ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex);
                        try {
                            TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage(DynamicMessage.parseFrom(appendClientInfo.descriptor, protoBytes));
                            o.get(StorageApiWritesShardedRecords.this.failedRowsTag).output(new BigQueryStorageApiInsertError(failedRow, serializationError.getRowIndexToErrorMessage().get(failedIndex)));
                        }
                        catch (InvalidProtocolBufferException e) {
                            LOG.error("Failed to insert row and could not parse the result!", e);
                        }
                    }
                    this.rowsSentToFailedRowsCollection.inc((long)failedRowIndices.size());

                    // Rebuild the request without the rejected rows.
                    ProtoRows.Builder retryRows = ProtoRows.newBuilder();
                    for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) {
                        if (failedRowIndices.contains(i)) {
                            continue;
                        }
                        retryRows.addSerializedRows(failedContext.protoRows.getSerializedRows(i));
                    }
                    failedContext.protoRows = retryRows.build();

                    // Rows were removed, so renumber the offsets of all pending requests.
                    long offset = failedContext.offset;
                    for (AppendRowsContext ctx : failedContexts) {
                        ctx.offset = offset;
                        offset += (long)ctx.protoRows.getSerializedRowsCount();
                    }
                    streamOffset.write(offset);
                    return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                }

                // Any other error: drop the append client so that a new one is created.
                LOG.error("Got error " + failedContext.getError() + " closing " + failedContext.streamName);
                clearClients.accept(failedContexts);
                this.appendFailures.inc();
                boolean explicitStreamFinalized = failedContext.getError() instanceof Exceptions.StreamFinalizedException;
                Throwable error = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(failedContext.getError());
                Status.Code statusCode = Status.fromThrowable(error).getCode();
                boolean offsetMismatch = statusCode.equals(Status.Code.OUT_OF_RANGE) || statusCode.equals(Status.Code.ALREADY_EXISTS);
                boolean streamDoesNotExist = explicitStreamFinalized || statusCode.equals(Status.Code.INVALID_ARGUMENT) || statusCode.equals(Status.Code.NOT_FOUND) || statusCode.equals(Status.Code.FAILED_PRECONDITION);
                if (offsetMismatch || streamDoesNotExist) {
                    this.appendOffsetFailures.inc();
                    LOG.warn("Append to " + failedContext + " failed with " + failedContext.getError() + " Will retry with a new stream");
                    // Finalize the old stream, then move every pending request to a new one.
                    o.get(StorageApiWritesShardedRecords.this.flushTag).output(KV.of(failedContext.streamName, new StorageApiFlushAndFinalizeDoFn.Operation(failedContext.offset - 1L, true)));
                    initializeContexts.accept(failedContexts, true);
                }
                // Every failure path retries all operations.
                return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
            };

            // After a successful append, schedule a flush up to the request's last offset.
            Consumer<AppendRowsContext> onSuccess = ctx -> {
                o.get(StorageApiWritesShardedRecords.this.flushTag).output(KV.of(ctx.streamName, new StorageApiFlushAndFinalizeDoFn.Operation(ctx.offset + (long)ctx.protoRows.getSerializedRowsCount() - 1L, false)));
                this.flushesScheduled.inc((long)ctx.protoRows.getSerializedRowsCount());
            };

            Instant now = Instant.now();
            ArrayList<AppendRowsContext> contexts = Lists.newArrayList();
            RetryManager<AppendRowsResponse, AppendRowsContext> retryManager = new RetryManager<>(Duration.standardSeconds(1L), Duration.standardSeconds(10L), 1000);
            int numAppends = 0;
            for (ProtoRows protoRows : messages) {
                if ((long)protoRows.getSerializedSize() >= this.maxRequestSize) {
                    // Too large to send: route the rows to the failed-rows output instead.
                    if (protoRows.getSerializedRowsCount() > 1) {
                        LOG.error("A request containing more than one row is over the request size limit of " + this.maxRequestSize + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
                    }
                    for (ByteString rowBytes : protoRows.getSerializedRowsList()) {
                        TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage(DynamicMessage.parseFrom(appendClientInfo.descriptor, rowBytes));
                        o.get(StorageApiWritesShardedRecords.this.failedRowsTag).output(new BigQueryStorageApiInsertError(failedRow, "Row payload too large. Maximum size " + this.maxRequestSize));
                    }
                    // Keep the failed-rows counter consistent with the serialization-error path.
                    this.rowsSentToFailedRowsCollection.inc((long)protoRows.getSerializedRowsCount());
                    continue;
                }
                ++numAppends;
                AppendRowsContext appendContext = new AppendRowsContext(element.getKey(), protoRows);
                contexts.add(appendContext);
                retryManager.addOperation(runOperation, onError, onSuccess, appendContext);
                this.recordsAppended.inc((long)protoRows.getSerializedRowsCount());
                this.appendSizeDistribution.update((long)appendContext.protoRows.getSerializedRowsCount());
            }

            if (numAppends > 0) {
                initializeContexts.accept(contexts, false);
                try {
                    retryManager.run(true);
                }
                finally {
                    // Release every pin whether or not the appends succeeded.
                    for (AppendRowsContext ctx : contexts) {
                        if (ctx.client != null) {
                            StorageApiWritesShardedRecords.runAsyncIgnoreFailure(closeWriterExecutor, ctx.client::unpin);
                        }
                    }
                }
                this.appendSplitDistribution.update((long)numAppends);
                java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
                this.appendLatencyDistribution.update(timeElapsed.toMillis());
            }
            idleTimer.offset(this.streamIdleTime).withNoOutputTimestamp().setRelative();
        }

        /**
         * Emits a finalize operation for the key's current stream and clears the key's state.
         *
         * <p>Does nothing when no stream name is recorded. Otherwise it outputs a finalizing
         * operation at {@code nextOffset - 1} with timestamp {@code finalizeElementTs}, clears
         * both state cells and invalidates the key's entry in {@code APPEND_CLIENTS}.
         *
         * @param streamName state holding the current stream name
         * @param streamOffset state holding the next append offset (treated as 0 when absent)
         * @param key sharded key whose append client is invalidated
         * @param o receiver for the flush-operation output
         * @param finalizeElementTs timestamp given to the emitted operation
         */
        private void finalizeStream(@DoFn.AlwaysFetched @DoFn.StateId(value="streamName") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> streamName, @DoFn.AlwaysFetched @DoFn.StateId(value="streamOffset") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> streamOffset, @UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT> key,
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver o, @UnknownKeyFor @NonNull @Initialized org.joda.time.Instant finalizeElementTs) {
            String currentStream = MoreObjects.firstNonNull(streamName.read(), "");
            if (Strings.isNullOrEmpty(currentStream)) {
                // No stream was ever opened (or it was already finalized) for this key.
                return;
            }
            long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
            StorageApiFlushAndFinalizeDoFn.Operation finalizeOperation = new StorageApiFlushAndFinalizeDoFn.Operation(nextOffset - 1L, true);
            o.get(StorageApiWritesShardedRecords.this.flushTag).outputWithTimestamp(KV.of(currentStream, finalizeOperation), finalizeElementTs);
            streamName.clear();
            streamOffset.clear();
            // Make sure the cached append client for this key is closed as well.
            APPEND_CLIENTS.invalidate(key);
        }

        /**
         * Idle-timer callback: finalizes the key's stream, stamping the finalize operation with
         * the window's maximum timestamp, and then counts the firing in {@code streamsIdle}.
         * The counter is incremented on every firing, including when no stream was recorded.
         */
        @DoFn.OnTimer(value="idleTimer")
        public void onTimer(@DoFn.Key @UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT> key, @DoFn.AlwaysFetched @DoFn.StateId(value="streamName") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> streamName, @DoFn.AlwaysFetched @DoFn.StateId(value="streamOffset") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> streamOffset, // Decompiler note: could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver o, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
            this.finalizeStream(streamName, streamOffset, key, o, window.maxTimestamp());
            this.streamsIdle.inc();
        }

        /**
         * Window-expiration callback: finalizes the key's stream, stamping the finalize
         * operation with the window's maximum timestamp.
         */
        @DoFn.OnWindowExpiration
        public void onWindowExpiration(@DoFn.Key @UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT> key, @DoFn.AlwaysFetched @DoFn.StateId(value="streamName") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> streamName, @DoFn.AlwaysFetched @DoFn.StateId(value="streamOffset") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> streamOffset, // Decompiler note: could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver o, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
            this.finalizeStream(streamName, streamOffset, key, o, window.maxTimestamp());
        }

        /**
         * Returns the largest representable skew ({@code BoundedWindow.TIMESTAMP_MAX_VALUE} in
         * milliseconds).
         *
         * <p>NOTE(review): presumably needed because {@code finalizeStream} outputs with an
         * explicit timestamp (the window's maximum) from timer callbacks - confirm.
         *
         * @return a duration of {@code TIMESTAMP_MAX_VALUE} milliseconds
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized Duration getAllowedTimestampSkew() {
            return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
        }
    }

    /**
     * A {@link Runnable}-like task whose {@code run} method may throw a checked exception.
     * Used as the target type for the lambdas and method references passed to
     * {@code runAsyncIgnoreFailure}.
     */
    @FunctionalInterface
    private static interface ThrowingRunnable {
        /**
         * Executes the task.
         *
         * @throws Exception if the task fails
         */
        public void run() throws @UnknownKeyFor @NonNull @Initialized Exception;
    }

    /**
     * Mutable per-request state for one append operation run by the {@link RetryManager}:
     * the rows to send, the stream and offset they are assigned to, and the pinned client.
     */
    class AppendRowsContext
    extends RetryManager.Operation.Context<AppendRowsResponse> {
        // Sharded destination key the rows belong to.
        final @UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT> key;
        // Stream the request is assigned to; empty until the context is initialized.
        @UnknownKeyFor @NonNull @Initialized String streamName = "";
        // Client pinned for this request; null until assigned and after it has been released.
        @Nullable @UnknownKeyFor @Initialized BigQueryServices.StreamAppendClient client = null;
        // Offset in the stream at which the rows are appended; -1 until assigned.
        @UnknownKeyFor @NonNull @Initialized long offset = -1L;
        // Never assigned in this file; kept for compatibility. The live row count is
        // protoRows.getSerializedRowsCount().
        @UnknownKeyFor @NonNull @Initialized long numRows = 0L;
        // Number of times the context has been (re)initialized with a stream and offset.
        @UnknownKeyFor @NonNull @Initialized long tryIteration = 0L;
        // Rows to append; replaced when rejected rows are stripped before a retry.
        @UnknownKeyFor @NonNull @Initialized ProtoRows protoRows;

        /**
         * @param key sharded destination key the rows belong to
         * @param protoRows serialized rows to append
         */
        AppendRowsContext(@UnknownKeyFor @NonNull @Initialized ShardedKey<DestinationT> key, ProtoRows protoRows) {
            this.key = key;
            this.protoRows = protoRows;
        }

        /**
         * Describes the context for log messages. The reported {@code numRows} is the current
         * number of serialized rows in {@code protoRows}, not the {@code numRows} field, which
         * is never updated and would always print 0.
         */
        @Override
        @SideEffectFree
        public @UnknownKeyFor @NonNull @Initialized String toString() {
            return "Context: key=" + this.key + " streamName=" + this.streamName + " offset=" + this.offset + " numRows=" + this.protoRows.getSerializedRowsCount() + " tryIteration: " + this.tryIteration;
        }
    }
}

