/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.ShardedKey;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowInfo;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

@SystemDoFnInternal
@VisibleForTesting
class StreamingWriteFn
extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
    private final BigQueryServices bqServices;
    private final InsertRetryPolicy retryPolicy;
    private final TupleTag<TableRow> failedOutputTag;
    private transient Map<String, List<ValueInSingleWindow<TableRow>>> tableRows;
    private transient Map<String, List<String>> uniqueIdsForTableRows;
    private Counter byteCounter = SinkMetrics.bytesWritten();

    StreamingWriteFn(BigQueryServices bqServices, InsertRetryPolicy retryPolicy, TupleTag<TableRow> failedOutputTag) {
        this.bqServices = bqServices;
        this.retryPolicy = retryPolicy;
        this.failedOutputTag = failedOutputTag;
    }

    @DoFn.StartBundle
    public void startBundle() {
        this.tableRows = new HashMap<String, List<ValueInSingleWindow<TableRow>>>();
        this.uniqueIdsForTableRows = new HashMap<String, List<String>>();
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext context, BoundedWindow window) {
        String tableSpec = (String)((ShardedKey)((KV)context.element()).getKey()).getKey();
        List<ValueInSingleWindow<TableRow>> rows = BigQueryHelpers.getOrCreateMapListValue(this.tableRows, tableSpec);
        List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(this.uniqueIdsForTableRows, tableSpec);
        rows.add((ValueInSingleWindow<TableRow>)ValueInSingleWindow.of((Object)((TableRowInfo)((KV)context.element()).getValue()).tableRow, (Instant)context.timestamp(), (BoundedWindow)window, (PaneInfo)context.pane()));
        uniqueIds.add(((TableRowInfo)((KV)context.element()).getValue()).uniqueId);
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn.FinishBundleContext context) throws Exception {
        ArrayList failedInserts = Lists.newArrayList();
        BigQueryOptions options = (BigQueryOptions)context.getPipelineOptions().as(BigQueryOptions.class);
        for (Map.Entry<String, List<ValueInSingleWindow<TableRow>>> entry : this.tableRows.entrySet()) {
            TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
            this.flushRows(tableReference, entry.getValue(), this.uniqueIdsForTableRows.get(entry.getKey()), options, failedInserts);
        }
        this.tableRows.clear();
        this.uniqueIdsForTableRows.clear();
        for (ValueInSingleWindow row : failedInserts) {
            context.output(this.failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
        }
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
    }

    private void flushRows(TableReference tableReference, List<ValueInSingleWindow<TableRow>> tableRows, List<String> uniqueIds, BigQueryOptions options, List<ValueInSingleWindow<TableRow>> failedInserts) throws InterruptedException {
        if (!tableRows.isEmpty()) {
            try {
                long totalBytes = this.bqServices.getDatasetService(options).insertAll(tableReference, tableRows, uniqueIds, this.retryPolicy, failedInserts);
                this.byteCounter.inc(totalBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

