/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.base.Preconditions;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public abstract class BeamKafkaTable
extends BaseBeamTable
implements Serializable {
    private String bootstrapServers;
    private List<String> topics;
    private Map<String, Object> configUpdates;

    protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
        super(beamSqlRowType);
    }

    public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers, List<String> topics) {
        super(beamSqlRowType);
        this.bootstrapServers = bootstrapServers;
        this.topics = topics;
    }

    public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
        this.configUpdates = configUpdates;
        return this;
    }

    @Override
    public BeamIOType getSourceType() {
        return BeamIOType.UNBOUNDED;
    }

    public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> getPTransformForInput();

    public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput();

    @Override
    public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
        return (PCollection)((PCollection)PBegin.in((Pipeline)pipeline).apply("read", KafkaIO.read().withBootstrapServers(this.bootstrapServers).withTopics(this.topics).updateConsumerProperties(this.configUpdates).withKeyDeserializerAndCoder(ByteArrayDeserializer.class, (Coder)ByteArrayCoder.of()).withValueDeserializerAndCoder(ByteArrayDeserializer.class, (Coder)ByteArrayCoder.of()).withoutMetadata())).apply("in_format", this.getPTransformForInput());
    }

    @Override
    public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
        Preconditions.checkArgument(this.topics != null && this.topics.size() == 1, "Only one topic can be acceptable as output.");
        return new PTransform<PCollection<BeamRecord>, PDone>(){

            public PDone expand(PCollection<BeamRecord> input) {
                return (PDone)((PCollection)input.apply("out_reformat", BeamKafkaTable.this.getPTransformForOutput())).apply("persistent", (PTransform)KafkaIO.write().withBootstrapServers(BeamKafkaTable.this.bootstrapServers).withTopic((String)BeamKafkaTable.this.topics.get(0)).withKeySerializer(ByteArraySerializer.class).withValueSerializer(ByteArraySerializer.class));
            }
        };
    }
}

