/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

/**
 * Table provider of type {@code "test"} that keeps every table's rows in memory.
 *
 * <p>All tables live in the static {@link #GLOBAL_TABLES} registry, keyed first by the id of the
 * provider instance that created them and then by table name. Readers and writers built by this
 * provider re-resolve their table through that registry by id and name instead of relying on the
 * object reference they were constructed with (presumably so that a serialized copy of the writer
 * {@code DoFn} still appends to the shared row list; this only works while the registry is
 * reachable, i.e. inside the same JVM/class loader).
 *
 * <p>Nothing in this class removes a provider's entry from the registry, so each instance's
 * tables stay reachable for the lifetime of the class.
 */
public class TestTableProvider extends InMemoryMetaTableProvider {
    /** Provider instance id -> (table name -> table definition plus its rows). */
    static final Map<Long, Map<String, TableWithRows>> GLOBAL_TABLES = new ConcurrentHashMap<>();
    /** Source of unique provider instance ids. */
    private static final AtomicLong INSTANCES = new AtomicLong(0L);
    private final long instanceId = INSTANCES.getAndIncrement();

    /** Creates a provider and registers an empty table map for it in {@link #GLOBAL_TABLES}. */
    public TestTableProvider() {
        GLOBAL_TABLES.put(this.instanceId, new ConcurrentHashMap<>());
    }

    @Override
    public String getTableType() {
        return "test";
    }

    /**
     * Returns the live, mutable map of this provider's tables keyed by table name.
     *
     * @return the map registered for this instance in {@link #GLOBAL_TABLES}
     */
    public Map<String, TableWithRows> tables() {
        return GLOBAL_TABLES.get(this.instanceId);
    }

    /** Registers {@code table} with an empty row list, replacing any table of the same name. */
    @Override
    public void createTable(Table table) {
        this.tables().put(table.getName(), new TableWithRows(this.instanceId, table));
    }

    /** Removes the table named {@code tableName} together with its rows, if present. */
    @Override
    public void dropTable(String tableName) {
        this.tables().remove(tableName);
    }

    /**
     * Returns a snapshot map from table name to table definition.
     *
     * @return a new map; later changes to the provider are not reflected in it
     */
    @Override
    public Map<String, Table> getTables() {
        return this.tables().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().table));
    }

    /**
     * Builds the in-memory {@link BeamSqlTable} for a table previously passed to
     * {@link #createTable(Table)}.
     *
     * @param table definition whose name identifies the registered table
     * @return a bounded table backed by the registered rows
     * @throws IllegalArgumentException if no table with that name is registered; failing here
     *     avoids handing out a table object that would throw a bare
     *     {@code NullPointerException} on first use
     */
    @Override
    public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
        return new InMemoryTable(this.requireTable(table.getName()));
    }

    /**
     * Appends {@code rows} to the named table.
     *
     * @param tableName name of a registered table
     * @param rows rows to append, in order
     * @throws IllegalArgumentException if no table with that name is registered
     */
    public void addRows(String tableName, Row... rows) {
        // Single lookup: a separate containsKey + get pair could race with dropTable.
        this.requireTable(tableName).rows.addAll(Arrays.asList(rows));
    }

    /**
     * Returns the live row list of the named table (not a copy).
     *
     * @param tableName name of a registered table
     * @return the table's backing row list
     * @throws IllegalArgumentException if no table with that name is registered
     */
    public List<Row> tableRows(String tableName) {
        return this.requireTable(tableName).rows;
    }

    /** Looks up a table of this provider by name, rejecting unknown names with a clear message. */
    private TableWithRows requireTable(String tableName) {
        TableWithRows found = this.tables().get(tableName);
        Preconditions.checkArgument(found != null, "Table not found: %s", tableName);
        return found;
    }

    /**
     * Resolves the currently registered table that {@code handle} refers to, using the provider
     * instance id and table name stored in the handle.
     *
     * @throws IllegalStateException if the provider or the table is not present in
     *     {@link #GLOBAL_TABLES}
     */
    private static TableWithRows resolveRegistered(TableWithRows handle) {
        Map<String, TableWithRows> providerTables =
                GLOBAL_TABLES.get(handle.tableProviderInstanceId);
        Preconditions.checkState(
                providerTables != null,
                "No tables registered for provider instance %s",
                handle.tableProviderInstanceId);
        String tableName = handle.table.getName();
        TableWithRows registered = providerTables.get(tableName);
        Preconditions.checkState(registered != null, "Table not found: %s", tableName);
        return registered;
    }

    /** Appends every input row to the registered table and passes the row through unchanged. */
    private static final class CollectorFn extends DoFn<Row, Row> {
        private final TableWithRows tableWithRows;

        CollectorFn(TableWithRows tableWithRows) {
            this.tableWithRows = tableWithRows;
        }

        // NOTE(review): the context parameter is declared with its type arguments. The raw
        // DoFn.ProcessContext needed unchecked casts, and Beam's reflective @ProcessElement
        // signature analysis is expected to require DoFn<Row, Row>.ProcessContext - confirm
        // against the Beam version in use.
        @DoFn.ProcessElement
        public void processElement(DoFn<Row, Row>.ProcessContext context) {
            Row row = context.element();
            // Write through the static registry rather than this.tableWithRows.rows.
            TestTableProvider.resolveRegistered(this.tableWithRows).rows.add(row);
            context.output(row);
        }
    }

    /** Bounded {@link BeamSqlTable} that reads from and writes to a {@link TableWithRows}. */
    private static class InMemoryTable implements BeamSqlTable {
        private final TableWithRows tableWithRows;

        public InMemoryTable(TableWithRows tableWithRows) {
            this.tableWithRows = tableWithRows;
        }

        @Override
        public PCollection.IsBounded isBounded() {
            return PCollection.IsBounded.BOUNDED;
        }

        /** Returns a schema coder for this table's rows using identity conversions. */
        public Coder<Row> rowCoder() {
            return SchemaCoder.of(
                    this.tableWithRows.table.getSchema(),
                    SerializableFunctions.<Row>identity(),
                    SerializableFunctions.<Row>identity());
        }

        /** Reports the current number of rows held by the table this object was built with. */
        @Override
        public BeamTableStatistics getRowCount(PipelineOptions options) {
            return BeamTableStatistics.createBoundedTableStatistics(
                    Double.valueOf(this.tableWithRows.getRows().size()));
        }

        /**
         * Creates a source over the rows currently registered under this table's provider id
         * and name.
         *
         * @throws IllegalStateException if the table is no longer registered
         */
        @Override
        public PCollection<Row> buildIOReader(PBegin begin) {
            TableWithRows registered = TestTableProvider.resolveRegistered(this.tableWithRows);
            return begin.apply(Create.of(registered.rows).withCoder(this.rowCoder()));
        }

        /** Attaches a collector that appends every element of {@code input} to the table. */
        @Override
        public POutput buildIOWriter(PCollection<Row> input) {
            input.apply(ParDo.of(new CollectorFn(this.tableWithRows)));
            return PDone.in(input.getPipeline());
        }

        @Override
        public Schema getSchema() {
            return this.tableWithRows.table.getSchema();
        }
    }

    /** A table definition, its rows, and the id of the provider instance that owns it. */
    public static class TableWithRows implements Serializable {
        private final Table table;
        private final List<Row> rows;
        private final long tableProviderInstanceId;

        /**
         * @param tableProviderInstanceId id of the owning provider, used as the key into
         *     {@link TestTableProvider#GLOBAL_TABLES}
         * @param table the table definition
         */
        public TableWithRows(long tableProviderInstanceId, Table table) {
            this.tableProviderInstanceId = tableProviderInstanceId;
            this.table = table;
            this.rows = new CopyOnWriteArrayList<>();
        }

        /** Returns the live row list (not a copy). */
        public List<Row> getRows() {
            return this.rows;
        }
    }
}

