/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;

public class BeamKafkaCSVTable
extends BeamKafkaTable {
    private CSVFormat csvFormat;

    public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, List<String> topics) {
        this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
    }

    public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, List<String> topics, CSVFormat format) {
        super(beamSqlRowType, bootstrapServers, topics);
        this.csvFormat = format;
    }

    @Override
    public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> getPTransformForInput() {
        return new CsvRecorderDecoder(this.beamRecordSqlType, this.csvFormat);
    }

    @Override
    public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
        return new CsvRecorderEncoder(this.beamRecordSqlType, this.csvFormat);
    }

    public static class CsvRecorderEncoder
    extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
        private BeamRecordSqlType rowType;
        private CSVFormat format;

        public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
            this.rowType = rowType;
            this.format = format;
        }

        public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
            return (PCollection)input.apply("encodeRecord", (PTransform)ParDo.of((DoFn)new DoFn<BeamRecord, KV<byte[], byte[]>>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    BeamRecord in = (BeamRecord)c.element();
                    c.output((Object)KV.of((Object)new byte[0], (Object)BeamTableUtils.beamRecord2CsvLine(in, format).getBytes()));
                }
            }));
        }
    }

    public static class CsvRecorderDecoder
    extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
        private BeamRecordSqlType rowType;
        private CSVFormat format;

        public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
            this.rowType = rowType;
            this.format = format;
        }

        public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
            return (PCollection)input.apply("decodeRecord", (PTransform)ParDo.of((DoFn)new DoFn<KV<byte[], byte[]>, BeamRecord>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    String rowInString = new String((byte[])((KV)c.element()).getValue());
                    c.output((Object)BeamTableUtils.csvLine2BeamRecord(format, rowInString, rowType));
                }
            }));
        }
    }
}

