/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;

import com.alibaba.fastjson.JSONObject;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@Experimental
class DataStoreV1Table
extends SchemaBaseBeamTable
implements Serializable {
    public static final String KEY_FIELD_PROPERTY = "keyField";
    @VisibleForTesting
    static final String DEFAULT_KEY_FIELD = "__key__";
    private static final Logger LOG = LoggerFactory.getLogger(DataStoreV1Table.class);
    private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
    @VisibleForTesting
    final String keyField;
    @VisibleForTesting
    final String projectId;
    @VisibleForTesting
    final String kind;

    DataStoreV1Table(Table table) {
        super(table.getSchema());
        JSONObject properties = table.getProperties();
        if (properties.containsKey((Object)KEY_FIELD_PROPERTY)) {
            String field = properties.getString(KEY_FIELD_PROPERTY);
            if (field == null || field.isEmpty()) {
                throw new InvalidTableException(String.format("'%s' property cannot be null.", KEY_FIELD_PROPERTY));
            }
            this.keyField = field;
        } else {
            this.keyField = DEFAULT_KEY_FIELD;
        }
        String location = table.getLocation();
        if (location == null) {
            throw new InvalidTableException("DataStoreV1 location must be set: " + table);
        }
        Matcher matcher = locationPattern.matcher(location);
        if (!matcher.matches()) {
            throw new InvalidTableException("DataStoreV1 location must be in the following format: 'projectId/kind' but was:" + location);
        }
        this.projectId = matcher.group("projectId");
        this.kind = matcher.group("kind");
    }

    @Override
    public PCollection<Row> buildIOReader(PBegin begin) {
        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName(this.kind);
        Query query = q.build();
        DatastoreV1.Read readInstance = DatastoreIO.v1().read().withProjectId(this.projectId).withQuery(query);
        return (PCollection)((PCollection)begin.apply("Read Datastore Entities", (PTransform)readInstance)).apply("Convert Datastore Entities to Rows", (PTransform)EntityToRow.create(this.getSchema(), this.keyField));
    }

    @Override
    public POutput buildIOWriter(PCollection<Row> input) {
        return ((PCollection)input.apply("Convert Rows to Datastore Entities", (PTransform)RowToEntity.create(this.keyField, this.kind))).apply("Write Datastore Entities", (PTransform)DatastoreIO.v1().write().withProjectId(this.projectId));
    }

    @Override
    public PCollection.IsBounded isBounded() {
        return PCollection.IsBounded.BOUNDED;
    }

    @Override
    public BeamTableStatistics getTableStatistics(PipelineOptions options) {
        long count = DatastoreIO.v1().read().withProjectId(this.projectId).getNumEntities(options, this.kind, null);
        if (count < 0L) {
            return BeamTableStatistics.BOUNDED_UNKNOWN;
        }
        return BeamTableStatistics.createBoundedTableStatistics(Double.valueOf(count));
    }

    public static class RowToEntity
    extends PTransform<PCollection<Row>, PCollection<Entity>> {
        private final Supplier<String> keySupplier;
        private final String kind;
        private final String keyField;

        private RowToEntity(Supplier<String> keySupplier, String kind, String keyField) {
            this.keySupplier = keySupplier;
            this.kind = kind;
            this.keyField = keyField;
        }

        public PCollection<Entity> expand(PCollection<Row> input) {
            boolean isFieldPresent = input.getSchema().getFieldNames().contains(this.keyField);
            if (isFieldPresent) {
                if (!input.getSchema().getField(this.keyField).getType().getTypeName().equals((Object)Schema.TypeName.BYTES)) {
                    throw new IllegalStateException("Field `" + this.keyField + "` should of type `VARBINARY`. Please change the type or specify a field to write the KEY value from via TableProperties.");
                }
                LOG.info("Field to use as Entity KEY is set to: `" + this.keyField + "`.");
            }
            return (PCollection)input.apply((PTransform)ParDo.of((DoFn)new RowToEntityConverter(isFieldPresent)));
        }

        public static RowToEntity create(String keyField, String kind) {
            return new RowToEntity((Supplier<String> & Serializable)() -> UUID.randomUUID().toString(), kind, keyField);
        }

        @VisibleForTesting
        static RowToEntity createTest(String keyString, String keyField, String kind) {
            return new RowToEntity((Supplier<String> & Serializable)() -> keyString, kind, keyField);
        }

        @VisibleForTesting
        class RowToEntityConverter
        extends DoFn<Row, Entity> {
            private final boolean useNonRandomKey;

            RowToEntityConverter(boolean useNonRandomKey) {
                this.useNonRandomKey = useNonRandomKey;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                Row row = (Row)context.element();
                Schema schemaWithoutKeyField = Schema.builder().addFields(row.getSchema().getFields().stream().filter(field -> !field.getName().equals(RowToEntity.this.keyField)).collect(Collectors.toList())).build();
                Entity.Builder entityBuilder = this.constructEntityFromRow(schemaWithoutKeyField, row);
                entityBuilder.setKey(this.constructKeyFromRow(row));
                context.output((Object)entityBuilder.build());
            }

            private Entity.Builder constructEntityFromRow(Schema schema, Row row) {
                Entity.Builder entityBuilder = Entity.newBuilder();
                for (Schema.Field field : schema.getFields()) {
                    Value val = this.mapObjectToValue(row.getValue(field.getName()));
                    entityBuilder.putProperties(field.getName(), val);
                }
                return entityBuilder;
            }

            private Key constructKeyFromRow(Row row) {
                if (!this.useNonRandomKey) {
                    return DatastoreHelper.makeKey((Object[])new Object[]{RowToEntity.this.kind, RowToEntity.this.keySupplier.get()}).build();
                }
                byte[] keyBytes = row.getBytes(RowToEntity.this.keyField);
                try {
                    return Key.parseFrom((byte[])keyBytes);
                }
                catch (InvalidProtocolBufferException e) {
                    throw new IllegalStateException("Failed to parse DataStore key from bytes.");
                }
            }

            private Value mapObjectToValue(Object value) {
                if (value == null) {
                    return Value.newBuilder().build();
                }
                if (Boolean.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((boolean)((Boolean)value)).build();
                }
                if (Byte.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((long)((Byte)value).byteValue()).build();
                }
                if (Long.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((long)((Long)value)).build();
                }
                if (Short.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((long)((Short)value).shortValue()).build();
                }
                if (Integer.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((long)((Integer)value).intValue()).build();
                }
                if (Double.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((double)((Double)value)).build();
                }
                if (Float.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((double)((Float)value).floatValue()).build();
                }
                if (String.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((String)((String)value)).build();
                }
                if (Instant.class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((Date)((Instant)value).toDate()).build();
                }
                if (byte[].class.equals(value.getClass())) {
                    return DatastoreHelper.makeValue((ByteString)ByteString.copyFrom((byte[])((byte[])value))).build();
                }
                if (value instanceof Row) {
                    Row row = (Row)value;
                    return DatastoreHelper.makeValue((Entity.Builder)this.constructEntityFromRow(row.getSchema(), row)).build();
                }
                if (value instanceof Collection) {
                    Collection collection = (Collection)value;
                    List arrayValues = collection.stream().map(this::mapObjectToValue).collect(Collectors.toList());
                    return DatastoreHelper.makeValue(arrayValues).build();
                }
                throw new IllegalStateException("No conversion exists from type: " + value.getClass() + " to DataStove Value.");
            }
        }
    }

    public static class EntityToRow
    extends PTransform<PCollection<Entity>, PCollection<Row>> {
        private final Schema schema;
        private final String keyField;

        private EntityToRow(Schema schema, String keyField) {
            this.schema = schema;
            this.keyField = keyField;
            if (schema.getFieldNames().contains(keyField)) {
                if (!schema.getField(keyField).getType().getTypeName().equals((Object)Schema.TypeName.BYTES)) {
                    throw new IllegalStateException("Field `" + keyField + "` should of type `VARBINARY`. Please change the type or specify a field to store the KEY value.");
                }
                LOG.info("Entity KEY will be stored under `" + keyField + "` field.");
            }
        }

        public static EntityToRow create(Schema schema, String keyField) {
            return new EntityToRow(schema, keyField);
        }

        public PCollection<Row> expand(PCollection<Entity> input) {
            return ((PCollection)input.apply((PTransform)ParDo.of((DoFn)new EntityToRowConverter()))).setRowSchema(this.schema);
        }

        @VisibleForTesting
        class EntityToRowConverter
        extends DoFn<Entity, Row> {
            EntityToRowConverter() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) {
                Entity entity = (Entity)context.element();
                ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
                mapBuilder.put((Object)EntityToRow.this.keyField, (Object)DatastoreHelper.makeValue((Key)entity.getKey()).build());
                mapBuilder.putAll(entity.getPropertiesMap());
                context.output((Object)this.extractRowFromProperties(EntityToRow.this.schema, (Map<String, Value>)mapBuilder.build()));
            }

            private Object convertValueToObject(Schema.FieldType currentFieldType, Value val) {
                Value.ValueTypeCase typeCase = val.getValueTypeCase();
                switch (typeCase) {
                    case NULL_VALUE: 
                    case VALUETYPE_NOT_SET: {
                        return null;
                    }
                    case BOOLEAN_VALUE: {
                        return val.getBooleanValue();
                    }
                    case INTEGER_VALUE: {
                        return val.getIntegerValue();
                    }
                    case DOUBLE_VALUE: {
                        return val.getDoubleValue();
                    }
                    case TIMESTAMP_VALUE: {
                        Timestamp time = val.getTimestampValue();
                        long millis = time.getSeconds() * 1000L + (long)(time.getNanos() / 1000);
                        return Instant.ofEpochMilli((long)millis).toDateTime();
                    }
                    case STRING_VALUE: {
                        return val.getStringValue();
                    }
                    case KEY_VALUE: {
                        return val.getKeyValue().toByteArray();
                    }
                    case BLOB_VALUE: {
                        return val.getBlobValue().toByteArray();
                    }
                    case ENTITY_VALUE: {
                        Schema rowSchema = currentFieldType.getRowSchema();
                        assert (rowSchema != null);
                        Entity entity = val.getEntityValue();
                        return this.extractRowFromProperties(rowSchema, entity.getPropertiesMap());
                    }
                    case ARRAY_VALUE: {
                        Schema.FieldType elementType = currentFieldType.getCollectionElementType();
                        List valueList = val.getArrayValue().getValuesList();
                        return valueList.stream().map(v -> this.convertValueToObject(elementType, (Value)v)).collect(Collectors.toList());
                    }
                }
                throw new IllegalStateException("No conversion exists from type: " + val.getValueTypeCase().name() + " to Beam type.");
            }

            private Row extractRowFromProperties(Schema schema, Map<String, Value> values) {
                Row.Builder builder = Row.withSchema((Schema)schema);
                for (Schema.Field field : schema.getFields()) {
                    Value val = values.get(field.getName());
                    builder.addValue(this.convertValueToObject(field.getType(), val));
                }
                return builder.build();
            }
        }
    }
}

