/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.ShardedKey;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WriteBundlesToFiles<DestinationT>
extends DoFn<KV<DestinationT, TableRow>, Result<DestinationT>> {
    private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class);
    private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
    private transient Map<DestinationT, TableRowWriter> writers;
    private transient Map<DestinationT, BoundedWindow> writerWindows;
    private final String stepUuid;
    private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
    private int maxNumWritersPerBundle;
    private long maxFileSize;

    WriteBundlesToFiles(String stepUuid, TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag, int maxNumWritersPerBundle, long maxFileSize) {
        this.stepUuid = stepUuid;
        this.unwrittedRecordsTag = unwrittedRecordsTag;
        this.maxNumWritersPerBundle = maxNumWritersPerBundle;
        this.maxFileSize = maxFileSize;
    }

    @DoFn.StartBundle
    public void startBundle() {
        this.writers = Maps.newHashMap();
        this.writerWindows = Maps.newHashMap();
    }

    TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePrefix, BoundedWindow window) throws Exception {
        TableRowWriter writer = new TableRowWriter(tempFilePrefix);
        this.writers.put(destination, writer);
        this.writerWindows.put(destination, window);
        return writer;
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext c, BoundedWindow window) throws Exception {
        TableRowWriter writer;
        String tempFilePrefix = BigQueryHelpers.resolveTempLocation(c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", this.stepUuid);
        Object destination = ((KV)c.element()).getKey();
        if (this.writers.containsKey(destination)) {
            writer = this.writers.get(destination);
        } else if (this.writers.size() <= this.maxNumWritersPerBundle) {
            writer = this.createAndInsertWriter(destination, tempFilePrefix, window);
        } else {
            c.output(this.unwrittedRecordsTag, (Object)KV.of(ShardedKey.of(destination, ThreadLocalRandom.current().nextInt(10)), (Object)((KV)c.element()).getValue()));
            return;
        }
        if (writer.getByteSize() > this.maxFileSize) {
            writer.close();
            TableRowWriter.Result result = writer.getResult();
            c.output(new Result<Object>(result.resourceId.toString(), result.byteSize, destination));
            writer = this.createAndInsertWriter(destination, tempFilePrefix, window);
        }
        try {
            writer.write((TableRow)((KV)c.element()).getValue());
        }
        catch (Exception e) {
            try {
                writer.close();
            }
            catch (Exception closeException) {
                e.addSuppressed(closeException);
            }
            throw e;
        }
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn.FinishBundleContext c) throws Exception {
        ArrayList exceptionList = Lists.newArrayList();
        for (TableRowWriter tableRowWriter : this.writers.values()) {
            try {
                tableRowWriter.close();
            }
            catch (Exception e) {
                exceptionList.add(e);
            }
        }
        if (!exceptionList.isEmpty()) {
            IOException e = new IOException("Failed to close some writers");
            for (Exception thrown : exceptionList) {
                e.addSuppressed(thrown);
            }
            throw e;
        }
        for (Map.Entry entry : this.writers.entrySet()) {
            try {
                Object destination = entry.getKey();
                TableRowWriter writer = (TableRowWriter)entry.getValue();
                TableRowWriter.Result result = writer.getResult();
                c.output(new Result(result.resourceId.toString(), result.byteSize, destination), this.writerWindows.get(destination).maxTimestamp(), this.writerWindows.get(destination));
            }
            catch (Exception e) {
                exceptionList.add(e);
            }
        }
        this.writers.clear();
    }

    public static class ResultCoder<DestinationT>
    extends StructuredCoder<Result<DestinationT>> {
        private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
        private static final VarLongCoder longCoder = VarLongCoder.of();
        private final Coder<DestinationT> destinationCoder;

        public static <DestinationT> ResultCoder<DestinationT> of(Coder<DestinationT> destinationCoder) {
            return new ResultCoder<DestinationT>(destinationCoder);
        }

        ResultCoder(Coder<DestinationT> destinationCoder) {
            this.destinationCoder = destinationCoder;
        }

        public void encode(Result<DestinationT> value, OutputStream outStream) throws IOException {
            if (value == null) {
                throw new CoderException("cannot encode a null value");
            }
            stringCoder.encode(value.filename, outStream);
            longCoder.encode(value.fileByteSize, outStream);
            this.destinationCoder.encode(value.destination, outStream);
        }

        public Result<DestinationT> decode(InputStream inStream) throws IOException {
            String filename = stringCoder.decode(inStream);
            long fileByteSize = longCoder.decode(inStream);
            Object destination = this.destinationCoder.decode(inStream);
            return new Result<Object>(filename, fileByteSize, destination);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.singletonList(this.destinationCoder);
        }

        public void verifyDeterministic() {
        }
    }

    public static final class Result<DestinationT>
    implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String filename;
        public final Long fileByteSize;
        public final DestinationT destination;

        public Result(String filename, Long fileByteSize, DestinationT destination) {
            this.filename = filename;
            this.fileByteSize = fileByteSize;
            this.destination = destination;
        }
    }
}

