/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.AutoValue_PubsubMessageToRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;

/**
 * Converts {@link PubsubMessage}s into {@link Row}s that conform to {@link #messageSchema()}.
 *
 * <p>The result of {@link #expand} is a {@link PCollectionTuple} that always carries the
 * converted rows under {@link #MAIN_TAG}. When {@link #useDlq()} is {@code true} the tuple also
 * carries, under {@link #DLQ_TAG}, every message whose JSON payload could not be converted.
 *
 * <p>Two schema layouts are handled, selected by {@link #useFlatSchema()}:
 *
 * <ul>
 *   <li>nested: the top-level schema may only contain the fields {@code event_timestamp},
 *       {@code attributes} and {@code payload}; the JSON payload is parsed with the row schema
 *       of the {@code payload} field;
 *   <li>flat: the top-level schema contains {@code event_timestamp} plus the payload fields
 *       themselves; the JSON payload is parsed with the schema made of every field except
 *       {@code event_timestamp}.
 * </ul>
 */
@Internal
@Experimental
@AutoValue
public abstract class PubsubMessageToRow
        extends PTransform<PCollection<PubsubMessage>, PCollectionTuple>
        implements Serializable {
    /** Name of the schema field that receives the element timestamp. */
    static final String TIMESTAMP_FIELD = "event_timestamp";
    /** Name of the nested-schema field that receives the message attribute map. */
    static final String ATTRIBUTES_FIELD = "attributes";
    /** Name of the nested-schema field that receives the parsed JSON payload. */
    static final String PAYLOAD_FIELD = "payload";
    /** Output tag for messages whose payload could not be converted (only used with a DLQ). */
    static final TupleTag<PubsubMessage> DLQ_TAG = new TupleTag<PubsubMessage>(){};
    /** Output tag for successfully converted rows. */
    static final TupleTag<Row> MAIN_TAG = new TupleTag<Row>(){};

    /** Schema of the rows emitted under {@link #MAIN_TAG}. */
    public abstract Schema messageSchema();

    /**
     * Whether unparseable messages are routed to {@link #DLQ_TAG} instead of failing processing.
     */
    public abstract boolean useDlq();

    /** Whether {@link #messageSchema()} uses the flat layout rather than the nested one. */
    public abstract boolean useFlatSchema();

    /** Returns a new builder for this transform. */
    public static Builder builder() {
        return new AutoValue_PubsubMessageToRow.Builder();
    }

    /**
     * Applies the conversion {@link DoFn} matching {@link #useFlatSchema()} to {@code input}.
     *
     * @param input the Pubsub messages to convert
     * @return a tuple with the rows under {@link #MAIN_TAG} and, if {@link #useDlq()} is set,
     *     the rejected messages under {@link #DLQ_TAG}
     */
    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {
        DoFn<PubsubMessage, Row> converter = useFlatSchema()
                ? new FlatSchemaPubsubMessageToRoW(messageSchema(), useDlq())
                : new NestedSchemaPubsubMessageToRow(messageSchema(), useDlq());
        // The DLQ output is only declared when it can actually receive elements.
        TupleTagList additionalTags = useDlq() ? TupleTagList.of(DLQ_TAG) : TupleTagList.empty();
        return input.apply(ParDo.of(converter).withOutputTags(MAIN_TAG, additionalTags));
    }

    /** Builder for {@link PubsubMessageToRow}. */
    @AutoValue.Builder
    static abstract class Builder {
        Builder() {
        }

        /** Sets the schema of the produced rows. */
        public abstract Builder messageSchema(Schema messageSchema);

        /** Sets whether unparseable messages go to the dead-letter output. */
        public abstract Builder useDlq(boolean useDlq);

        /** Sets whether the message schema uses the flat layout. */
        public abstract Builder useFlatSchema(boolean useFlatSchema);

        /** Builds the configured transform. */
        public abstract PubsubMessageToRow build();
    }

    /**
     * Converts messages for the nested layout, where the top-level row holds the timestamp, the
     * attribute map and the payload row as separate fields.
     */
    @Internal
    private static class NestedSchemaPubsubMessageToRow
            extends DoFn<PubsubMessage, Row> {
        private final Schema messageSchema;
        private final boolean useDlq;
        // Transient, so it is created lazily on first use after deserialization.
        @Nullable
        private volatile transient ObjectMapper objectMapper;

        protected NestedSchemaPubsubMessageToRow(Schema messageSchema, boolean useDlq) {
            this.messageSchema = messageSchema;
            this.useDlq = useDlq;
        }

        /**
         * Picks the value that belongs in the given top-level field.
         *
         * @throws IllegalArgumentException if the field is none of the three supported ones
         */
        private Object getValueForFieldNestedSchema(
                Schema.Field field, Instant timestamp, Map<String, String> attributeMap, Row payload) {
            switch (field.getName()) {
                case TIMESTAMP_FIELD:
                    return timestamp;
                case ATTRIBUTES_FIELD:
                    return attributeMap;
                case PAYLOAD_FIELD:
                    return payload;
                default:
                    // Report the real field names so the message matches what is accepted above.
                    throw new IllegalArgumentException(
                            "Unexpected field '" + field.getName() + "' in top level schema for Pubsub message. "
                                    + "Top level schema should only contain '" + TIMESTAMP_FIELD + "', '"
                                    + ATTRIBUTES_FIELD + "', and '" + PAYLOAD_FIELD + "' fields");
            }
        }

        /** Parses the UTF-8 JSON payload using the row schema of the {@code payload} field. */
        private Row parsePayload(PubsubMessage pubsubMessage) {
            String payloadJson = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
            ObjectMapper mapper = this.objectMapper;
            if (mapper == null) {
                // The payload schema is only needed to build the mapper, so derive it once here.
                Schema payloadSchema = messageSchema.getField(PAYLOAD_FIELD).getType().getRowSchema();
                mapper = RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(payloadSchema));
                this.objectMapper = mapper;
            }
            return RowJsonUtils.jsonToRow(mapper, payloadJson);
        }

        /**
         * Emits the converted row, or handles a payload that cannot be converted: with a DLQ the
         * original message is emitted under {@link #DLQ_TAG}; without one a
         * {@link RuntimeException} wrapping the cause is thrown.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            try {
                Row payload = parsePayload(context.element());
                List<Object> values = messageSchema.getFields().stream()
                        .map(field -> getValueForFieldNestedSchema(
                                field, context.timestamp(), context.element().getAttributeMap(), payload))
                        .collect(Collectors.toList());
                context.output(Row.withSchema(messageSchema).addValues(values).build());
            }
            catch (RowJson.RowJsonDeserializer.UnsupportedRowJsonException jsonException) {
                if (useDlq) {
                    // Dead-lettered messages must not also fail processing.
                    context.output(DLQ_TAG, context.element());
                } else {
                    throw new RuntimeException("Error parsing message", jsonException);
                }
            }
        }
    }

    /**
     * Converts messages for the flat layout, where the payload fields sit next to the timestamp
     * field in the top-level row.
     */
    @Internal
    private static class FlatSchemaPubsubMessageToRoW
            extends DoFn<PubsubMessage, Row> {
        private final Schema messageSchema;
        private final boolean useDlq;
        // Transient, so it is created lazily on first use after deserialization.
        @Nullable
        private volatile transient ObjectMapper objectMapper;

        protected FlatSchemaPubsubMessageToRoW(Schema messageSchema, boolean useDlq) {
            this.messageSchema = messageSchema;
            this.useDlq = useDlq;
        }

        /** Returns the timestamp for the timestamp field, otherwise the payload's value. */
        private Object getValueForFieldFlatSchema(Schema.Field field, Instant timestamp, Row payload) {
            String fieldName = field.getName();
            if (TIMESTAMP_FIELD.equals(fieldName)) {
                return timestamp;
            }
            return payload.getValue(fieldName);
        }

        /**
         * Parses the UTF-8 JSON payload using the message schema without the timestamp field.
         */
        private Row parsePayload(PubsubMessage pubsubMessage) {
            String payloadJson = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
            ObjectMapper mapper = this.objectMapper;
            if (mapper == null) {
                // Building the payload schema walks every field; do it only when creating the mapper.
                Schema payloadSchema = new Schema(
                        messageSchema.getFields().stream()
                                .filter(f -> !f.getName().equals(TIMESTAMP_FIELD))
                                .collect(Collectors.toList()));
                mapper = RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(payloadSchema));
                this.objectMapper = mapper;
            }
            return RowJsonUtils.jsonToRow(mapper, payloadJson);
        }

        /**
         * Emits the converted row, or handles a payload that cannot be converted: with a DLQ the
         * original message is emitted under {@link #DLQ_TAG}; without one a
         * {@link RuntimeException} wrapping the cause is thrown.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            try {
                Row payload = parsePayload(context.element());
                List<Object> values = messageSchema.getFields().stream()
                        .map(field -> getValueForFieldFlatSchema(field, context.timestamp(), payload))
                        .collect(Collectors.toList());
                context.output(Row.withSchema(messageSchema).addValues(values).build());
            }
            catch (RowJson.RowJsonDeserializer.UnsupportedRowJsonException jsonException) {
                if (useDlq) {
                    // Dead-lettered messages must not also fail processing.
                    context.output(DLQ_TAG, context.element());
                } else {
                    throw new RuntimeException("Error parsing message", jsonException);
                }
            }
        }
    }
}

