/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms.partitions;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.SmtManager;
import io.debezium.transforms.partitions.ComputePartitionConfigDefinition;
import io.debezium.transforms.partitions.ComputePartitionException;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ComputePartition<R extends ConnectRecord<R>>
implements Transformation<R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ComputePartition.class);
    public static final String SCHEMA_FIELD_NAME = "schema";
    public static final String SQLSERVER_CONNECTOR = "sqlserver";
    public static final String KEYSPACE_FIELD_NAME = "keyspace";
    public static final String TABLE_FIELD_NAME = "table";
    public static final String COLLECTION_FIELD_NAME = "collection";
    public static final String CONNECTOR_FIELD_NAME = "connector";
    public static final String MONGODB_CONNECTOR = "mongodb";
    public static final String DB_FIELD_NAME = "db";
    public static final String MYSQL_CONNECTOR = "mysql";
    public static final String POSTGRES_CONNECTOR = "postgres";
    public static final String ORACLE_CONNECTOR = "oracle";
    public static final String DB2_CONNECTOR = "db2";
    public static final String CASSANDRA_CONNECTOR = "cassandra";
    public static final String VITESS_CONNECTOR = "vitess";
    private Set<String> tableNames;
    private SmtManager<R> smtManager;
    private Map<String, Integer> numberOfPartitionsByTable;
    private Map<String, String> fieldNameByTable;

    public ConfigDef config() {
        ConfigDef config = new ConfigDef();
        return Field.group(config, "partitions", ComputePartitionConfigDefinition.PARTITION_TABLE_FIELD_NAME_MAPPINGS_FIELD, ComputePartitionConfigDefinition.FIELD_TABLE_PARTITION_NUM_MAPPINGS_FIELD);
    }

    public void configure(Map<String, ?> props) {
        Configuration config = Configuration.from(props);
        this.smtManager = new SmtManager(config);
        this.smtManager.validate(config, Field.setOf(ComputePartitionConfigDefinition.PARTITION_TABLE_FIELD_NAME_MAPPINGS_FIELD, ComputePartitionConfigDefinition.FIELD_TABLE_PARTITION_NUM_MAPPINGS_FIELD));
        this.fieldNameByTable = ComputePartitionConfigDefinition.parseMappings(config.getStrings(ComputePartitionConfigDefinition.PARTITION_TABLE_FIELD_NAME_MAPPINGS_FIELD, ","));
        this.numberOfPartitionsByTable = ComputePartitionConfigDefinition.parseParititionMappings(config.getStrings(ComputePartitionConfigDefinition.FIELD_TABLE_PARTITION_NUM_MAPPINGS_FIELD, ","));
        this.checkConfigurationConsistency();
        this.tableNames = this.fieldNameByTable.keySet();
    }

    private void checkConfigurationConsistency() {
        if (this.numberOfPartitionsByTable.size() != this.fieldNameByTable.size()) {
            throw new ComputePartitionException(String.format("Unable to validate config. %s and %s has different number of table defined", "partition.data-collections.partition.num.mappings", "partition.data-collections.field.mappings"));
        }
        HashSet<String> intersection = new HashSet<String>(this.numberOfPartitionsByTable.keySet());
        intersection.retainAll(this.fieldNameByTable.keySet());
        if (intersection.size() != this.numberOfPartitionsByTable.size()) {
            throw new ComputePartitionException(String.format("Unable to validate config. %s and %s has different tables defined", "partition.data-collections.partition.num.mappings", "partition.data-collections.field.mappings"));
        }
        if (this.numberOfPartitionsByTable.containsValue(0)) {
            throw new ConnectException(String.format("Unable to validate config. %s: partition number cannot be 0", "partition.data-collections.partition.num.mappings"));
        }
    }

    public R apply(R r) {
        LOGGER.trace("Starting ComputePartition SMT with conf: {} {} {}", new Object[]{this.tableNames, this.fieldNameByTable, this.numberOfPartitionsByTable});
        if (r.value() == null || !this.smtManager.isValidEnvelope(r)) {
            LOGGER.trace("Skipping tombstone or message without envelope");
            return r;
        }
        Struct envelope = (Struct)r.value();
        try {
            String table = this.getTableName(envelope);
            if (this.skipRecord(table)) {
                return r;
            }
            Optional<Struct> payload = this.extractPayload(envelope);
            if (payload.isEmpty()) {
                return r;
            }
            Object fieldValue = payload.get().get(this.fieldNameByTable.get(table));
            int partition = this.computePartition(fieldValue, table);
            LOGGER.trace("Message {} will be sent to partition {}", (Object)envelope, (Object)partition);
            return (R)r.newRecord(r.topic(), Integer.valueOf(partition), r.keySchema(), r.key(), r.valueSchema(), (Object)envelope, r.timestamp(), (Iterable)r.headers());
        }
        catch (Exception e) {
            LOGGER.error("Error occurred while processing message {}. Skipping SMT", (Object)envelope);
            throw new ConnectException(String.format("Unprocessable message %s", envelope), (Throwable)e);
        }
    }

    private boolean skipRecord(String table) {
        if (!this.tableNames.contains(table)) {
            LOGGER.trace("Table {} is not configured. Skipping SMT", (Object)table);
            return true;
        }
        return false;
    }

    private String getTableName(Struct envelope) {
        String tablePrefix;
        String dataCollection;
        String connector;
        Struct struct = (Struct)envelope.get("source");
        switch (connector = struct.getString(CONNECTOR_FIELD_NAME)) {
            case "mongodb": {
                dataCollection = struct.getString(COLLECTION_FIELD_NAME);
                tablePrefix = struct.getString(DB_FIELD_NAME);
                break;
            }
            case "mysql": {
                dataCollection = struct.getString(TABLE_FIELD_NAME);
                tablePrefix = struct.getString(DB_FIELD_NAME);
                break;
            }
            case "postgres": 
            case "oracle": 
            case "sqlserver": 
            case "db2": {
                dataCollection = struct.getString(TABLE_FIELD_NAME);
                tablePrefix = struct.getString(SCHEMA_FIELD_NAME);
                break;
            }
            case "cassandra": 
            case "vitess": {
                dataCollection = struct.getString(TABLE_FIELD_NAME);
                tablePrefix = struct.getString(KEYSPACE_FIELD_NAME);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unmanaged connector: " + connector);
            }
        }
        return String.format("%s.%s", tablePrefix, dataCollection);
    }

    private int computePartition(Object fieldValue, String table) {
        return (fieldValue.hashCode() & Integer.MAX_VALUE) % this.numberOfPartitionsByTable.get(table);
    }

    private Optional<Struct> extractPayload(Struct envelope) {
        Envelope.Operation operation = Envelope.Operation.forCode(envelope.getString("op"));
        if (operation == null) {
            throw new IllegalArgumentException("Unknown event operation: " + envelope.getString("op"));
        }
        switch (operation) {
            case CREATE: 
            case READ: 
            case UPDATE: {
                return Optional.of((Struct)envelope.get("after"));
            }
            case DELETE: {
                return Optional.of((Struct)envelope.get("before"));
            }
            case TRUNCATE: 
            case MESSAGE: {
                return Optional.empty();
            }
        }
        throw new IllegalArgumentException("Unable to get payload. Unmanaged event operation: " + operation);
    }

    public void close() {
    }
}

