/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.AutoValue_PubsubMessageToRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.JsonToRowUtils;
import org.apache.beam.sdk.util.RowJsonDeserializer;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

@Internal
@Experimental
@AutoValue
public abstract class PubsubMessageToRow
extends DoFn<PubsubMessage, Row> {
    static final String TIMESTAMP_FIELD = "event_timestamp";
    static final String ATTRIBUTES_FIELD = "attributes";
    static final String PAYLOAD_FIELD = "payload";
    static final TupleTag<PubsubMessage> DLQ_TAG = new TupleTag<PubsubMessage>(){};
    static final TupleTag<Row> MAIN_TAG = new TupleTag<Row>(){};
    @Nullable
    private volatile transient ObjectMapper objectMapper;

    public abstract Schema messageSchema();

    public abstract boolean useDlq();

    private Schema payloadSchema() {
        return this.messageSchema().getField(PAYLOAD_FIELD).getType().getRowSchema();
    }

    public static Builder builder() {
        return new AutoValue_PubsubMessageToRow.Builder();
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext context) {
        try {
            List<Object> values = this.getFieldValues(context);
            context.output((Object)Row.withSchema((Schema)this.messageSchema()).addValues(values).build());
        }
        catch (RowJsonDeserializer.UnsupportedRowJsonException jsonException) {
            if (this.useDlq()) {
                context.output(DLQ_TAG, (Object)((PubsubMessage)context.element()));
            }
            throw new RuntimeException("Error parsing message", jsonException);
        }
    }

    private List<Object> getFieldValues(DoFn.ProcessContext context) {
        return this.messageSchema().getFields().stream().map(field -> this.getValueForField((Schema.Field)field, context.timestamp(), (PubsubMessage)context.element())).collect(Collectors.toList());
    }

    private Object getValueForField(Schema.Field field, Instant timestamp, PubsubMessage pubsubMessage) {
        switch (field.getName()) {
            case "event_timestamp": {
                return timestamp;
            }
            case "attributes": {
                return pubsubMessage.getAttributeMap();
            }
            case "payload": {
                return this.parsePayloadJsonRow(pubsubMessage);
            }
        }
        throw new IllegalArgumentException("Unexpected field '" + field.getName() + "' in top level schema for Pubsub message. Top level schema should only contain 'timestamp', 'attributes', and 'payload' fields");
    }

    private Row parsePayloadJsonRow(PubsubMessage pubsubMessage) {
        String payloadJson = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
        if (this.objectMapper == null) {
            this.objectMapper = JsonToRowUtils.newObjectMapperWith((RowJsonDeserializer)RowJsonDeserializer.forSchema((Schema)this.payloadSchema()));
        }
        return JsonToRowUtils.jsonToRow((ObjectMapper)this.objectMapper, (String)payloadJson);
    }

    @AutoValue.Builder
    static abstract class Builder {
        Builder() {
        }

        public abstract Builder messageSchema(Schema var1);

        public abstract Builder useDlq(boolean var1);

        public abstract PubsubMessageToRow build();
    }
}

