/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Histogram;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SystemDoFnInternal
@VisibleForTesting
class StreamingWriteFn<@UnknownKeyFor ErrorT, @UnknownKeyFor ElementT>
extends DoFn<KV<ShardedKey<String>, TableRowInfo<ElementT>>, Void> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(StreamingWriteFn.class);
    private final @UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices;
    private final @UnknownKeyFor @NonNull @Initialized InsertRetryPolicy retryPolicy;
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<ErrorT> failedOutputTag;
    private final @UnknownKeyFor @NonNull @Initialized ErrorContainer<ErrorT> errorContainer;
    private final @UnknownKeyFor @NonNull @Initialized boolean skipInvalidRows;
    private final @UnknownKeyFor @NonNull @Initialized boolean ignoreUnknownValues;
    private final @UnknownKeyFor @NonNull @Initialized boolean ignoreInsertIds;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<ElementT, @UnknownKeyFor @NonNull @Initialized TableRow> toTableRow;
    private final @UnknownKeyFor @NonNull @Initialized SerializableFunction<ElementT, @UnknownKeyFor @NonNull @Initialized TableRow> toFailsafeTableRow;
    private transient @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized FailsafeValueInSingleWindow<@UnknownKeyFor @NonNull @Initialized TableRow, @UnknownKeyFor @NonNull @Initialized TableRow>>> tableRows;
    private transient @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>> uniqueIdsForTableRows;
    private transient @UnknownKeyFor @NonNull @Initialized long lastReportedSystemClockMillis;
    private transient @UnknownKeyFor @NonNull @Initialized Histogram histogram;
    private @UnknownKeyFor @NonNull @Initialized Counter byteCounter = SinkMetrics.bytesWritten();

    StreamingWriteFn(@UnknownKeyFor @NonNull @Initialized BigQueryServices bqServices, @UnknownKeyFor @NonNull @Initialized InsertRetryPolicy retryPolicy, @UnknownKeyFor @NonNull @Initialized TupleTag<ErrorT> failedOutputTag, @UnknownKeyFor @NonNull @Initialized ErrorContainer<ErrorT> errorContainer, @UnknownKeyFor @NonNull @Initialized boolean skipInvalidRows, @UnknownKeyFor @NonNull @Initialized boolean ignoreUnknownValues, @UnknownKeyFor @NonNull @Initialized boolean ignoreInsertIds, @UnknownKeyFor @NonNull @Initialized SerializableFunction<ElementT, @UnknownKeyFor @NonNull @Initialized TableRow> toTableRow, @UnknownKeyFor @NonNull @Initialized SerializableFunction<ElementT, @UnknownKeyFor @NonNull @Initialized TableRow> toFailsafeTableRow) {
        this.bqServices = bqServices;
        this.retryPolicy = retryPolicy;
        this.failedOutputTag = failedOutputTag;
        this.errorContainer = errorContainer;
        this.skipInvalidRows = skipInvalidRows;
        this.ignoreUnknownValues = ignoreUnknownValues;
        this.ignoreInsertIds = ignoreInsertIds;
        this.toTableRow = toTableRow;
        this.toFailsafeTableRow = toFailsafeTableRow;
    }

    @DoFn.Setup
    public void setup() {
        this.histogram = Histogram.linear((double)0.0, (double)20.0, (int)3000);
        this.lastReportedSystemClockMillis = System.currentTimeMillis();
    }

    @DoFn.Teardown
    public void teardown() {
        if (this.histogram.getTotalCount() > 0L) {
            this.logPercentiles();
            this.histogram.clear();
        }
    }

    @DoFn.StartBundle
    public void startBundle() {
        this.tableRows = new HashMap<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>>();
        this.uniqueIdsForTableRows = new HashMap<String, List<String>>();
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ShardedKey<@UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized TableRowInfo<ElementT>> element, @DoFn.Timestamp @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized BoundedWindow window, @UnknownKeyFor @NonNull @Initialized PaneInfo pane) {
        String tableSpec = (String)((ShardedKey)element.getKey()).getKey();
        List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = BigQueryHelpers.getOrCreateMapListValue(this.tableRows, tableSpec);
        List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(this.uniqueIdsForTableRows, tableSpec);
        TableRow tableRow = (TableRow)this.toTableRow.apply(((TableRowInfo)element.getValue()).tableRow);
        TableRow failsafeTableRow = (TableRow)this.toFailsafeTableRow.apply(((TableRowInfo)element.getValue()).tableRow);
        rows.add((FailsafeValueInSingleWindow<TableRow, TableRow>)FailsafeValueInSingleWindow.of((Object)tableRow, (Instant)timestamp, (BoundedWindow)window, (PaneInfo)pane, (Object)failsafeTableRow));
        uniqueIds.add(((TableRowInfo)element.getValue()).uniqueId);
    }

    @DoFn.FinishBundle
    public void finishBundle(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext context) throws @UnknownKeyFor @NonNull @Initialized Exception {
        ArrayList failedInserts = Lists.newArrayList();
        BigQueryOptions options = (BigQueryOptions)context.getPipelineOptions().as(BigQueryOptions.class);
        for (Map.Entry<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>> entry : this.tableRows.entrySet()) {
            TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
            this.flushRows(tableReference, entry.getValue(), this.uniqueIdsForTableRows.get(entry.getKey()), options, failedInserts);
        }
        this.tableRows.clear();
        this.uniqueIdsForTableRows.clear();
        for (ValueInSingleWindow row : failedInserts) {
            context.output(this.failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (this.histogram.getTotalCount() > 0L && currentTimeMillis - this.lastReportedSystemClockMillis > (long)options.getLatencyLoggingFrequency().intValue() * 1000L) {
            this.logPercentiles();
            this.histogram.clear();
            this.lastReportedSystemClockMillis = currentTimeMillis;
        }
    }

    private void logPercentiles() {
        LOG.info("Total number of streaming insert requests: {}, P99: {}ms, P90: {}ms, P50: {}ms", new Object[]{this.histogram.getTotalCount(), DoubleMath.roundToInt((double)this.histogram.p99(), (RoundingMode)RoundingMode.HALF_UP), DoubleMath.roundToInt((double)this.histogram.p90(), (RoundingMode)RoundingMode.HALF_UP), DoubleMath.roundToInt((double)this.histogram.p50(), (RoundingMode)RoundingMode.HALF_UP)});
    }

    private void flushRows(@UnknownKeyFor @NonNull @Initialized TableReference tableReference, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized FailsafeValueInSingleWindow<@UnknownKeyFor @NonNull @Initialized TableRow, @UnknownKeyFor @NonNull @Initialized TableRow>> tableRows, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> uniqueIds, @UnknownKeyFor @NonNull @Initialized BigQueryOptions options, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized ValueInSingleWindow<ErrorT>> failedInserts) throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
        if (!tableRows.isEmpty()) {
            try {
                long totalBytes = this.bqServices.getDatasetService(options, this.histogram).insertAll(tableReference, tableRows, uniqueIds, this.retryPolicy, failedInserts, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds);
                this.byteCounter.inc(totalBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

