/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.alibaba.fastjson.JSONObject;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubIOJsonTable;
import org.apache.beam.sdk.schemas.Schema;

/**
 * Table provider for the {@code "pubsub"} table type.
 *
 * <p>Turns a {@link Table} definition into a {@link PubsubIOJsonTable} after checking that the
 * declared schema has the expected three-field layout and that the optional dead letter queue
 * property, when present, is not an empty string.
 */
@Internal
@Experimental
public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {

    /**
     * Returns the table type identifier handled by this provider.
     *
     * @return the literal {@code "pubsub"}
     */
    @Override
    public String getTableType() {
        return "pubsub";
    }

    /**
     * Builds a {@link PubsubIOJsonTable} from the given table definition.
     *
     * <p>The schema is validated first. The {@code timestampAttributeKey} and
     * {@code deadLetterQueue} values are then read from the table properties, the dead letter
     * queue value is validated, and the table location is used as the topic.
     *
     * @param tableDefintion the table definition supplying schema, properties and location
     * @return the table produced by the {@link PubsubIOJsonTable} builder
     * @throws IllegalArgumentException if the schema does not have the required layout, or if the
     *     dead letter queue property is present but empty
     */
    @Override
    public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
        this.validatePubsubMessageSchema(tableDefintion);

        JSONObject properties = tableDefintion.getProperties();
        String timestampAttribute = properties.getString("timestampAttributeKey");
        String dlqTopic = properties.getString("deadLetterQueue");
        this.validateDlq(dlqTopic);

        return PubsubIOJsonTable.builder()
                .setSchema(tableDefintion.getSchema())
                .setTimestampAttribute(timestampAttribute)
                .setDeadLetterQueue(dlqTopic)
                .setTopic(tableDefintion.getLocation())
                .build();
    }

    /**
     * Rejects table definitions whose schema does not have the required layout.
     *
     * @param tableDefinition the table definition whose schema is checked
     * @throws IllegalArgumentException if the schema layout is not supported
     */
    private void validatePubsubMessageSchema(Table tableDefinition) {
        if (this.hasSupportedLayout(tableDefinition.getSchema())) {
            return;
        }
        throw new IllegalArgumentException("Unsupported schema specified for Pubsub source in CREATE TABLE. CREATE TABLE for Pubsub topic should define exactly the following fields: 'event_timestamp' field of type 'TIMESTAMP', 'attributes' field of type MAP<VARCHAR, VARCHAR>, and 'payload' field of type 'ROW<...>' which matches the payload JSON format.");
    }

    /**
     * Tells whether the schema consists of exactly three fields: {@code event_timestamp} of the
     * timestamp type, {@code attributes} as a VARCHAR-to-VARCHAR map, and {@code payload} whose
     * type name is {@code ROW}.
     *
     * <p>The checks run in the listed order and stop at the first one that fails.
     */
    private boolean hasSupportedLayout(Schema schema) {
        if (schema.getFieldCount() != 3) {
            return false;
        }
        if (!this.fieldPresent(schema, "event_timestamp", CalciteUtils.TIMESTAMP)) {
            return false;
        }
        if (!this.fieldPresent(schema, "attributes", Schema.FieldType.map(CalciteUtils.VARCHAR, CalciteUtils.VARCHAR))) {
            return false;
        }
        if (!schema.hasField("payload")) {
            return false;
        }
        // Only the payload's type name is compared, so any ROW shape is accepted here.
        return Schema.TypeName.ROW.equals(schema.getField("payload").getType().getTypeName());
    }

    /**
     * Tells whether the schema contains a field with the given name and exactly the given type.
     *
     * @param schema the schema to inspect
     * @param field the field name to look up
     * @param expectedType the type the field must be equal to
     * @return {@code true} if the field exists and its type equals {@code expectedType}
     */
    private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
        if (!schema.hasField(field)) {
            return false;
        }
        Schema.FieldType actualType = schema.getField(field).getType();
        return expectedType.equals(actualType);
    }

    /**
     * Validates the dead letter queue property value. A {@code null} value is accepted; an empty
     * string is rejected.
     *
     * @param deadLetterQueue the dead letter queue value read from the table properties
     * @throws IllegalArgumentException if the value is non-null and empty
     */
    private void validateDlq(String deadLetterQueue) {
        if (deadLetterQueue == null) {
            return;
        }
        if (deadLetterQueue.isEmpty()) {
            throw new IllegalArgumentException("Dead letter queue topic name is not specified");
        }
    }
}

