/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.TableChanges;
import io.debezium.util.SchemaNameAdjuster;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

/**
 * Incremental (CDC) source for PostgreSQL, registered under the plugin name
 * {@code "Postgres-CDC"}.
 *
 * <p>The class wires the Postgres-specific pieces (config factory, dialect, LSN offset
 * factory and Debezium deserialization schema) into the generic {@link IncrementalSource}.
 *
 * @param <T> type of the records produced by the Debezium deserialization schema
 */
public class PostgresIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
        implements SupportParallelism {
    /** Plugin name returned by {@link #getPluginName()}. */
    static final String IDENTIFIER = "Postgres-CDC";

    /**
     * Creates the source; both arguments are forwarded unchanged to the parent constructor.
     *
     * @param options connector options
     * @param catalogTables tables this source reads from
     */
    public PostgresIncrementalSource(ReadonlyConfig options, List<CatalogTable> catalogTables) {
        super(options, catalogTables);
    }

    /**
     * Returns the plugin name of this connector.
     *
     * @return {@link #IDENTIFIER}
     */
    public String getPluginName() {
        return IDENTIFIER;
    }

    /**
     * Returns the option that selects the startup mode.
     *
     * @return {@link PostgresSourceOptions#STARTUP_MODE}
     */
    @Override
    public Option<StartupMode> getStartupModeOption() {
        return PostgresSourceOptions.STARTUP_MODE;
    }

    /**
     * Returns the option that selects the stop mode.
     *
     * @return {@link PostgresSourceOptions#STOP_MODE}
     */
    @Override
    public Option<StopMode> getStopModeOption() {
        return PostgresSourceOptions.STOP_MODE;
    }

    /**
     * Builds the Postgres source config factory.
     *
     * <p>The factory is populated from the {@code readonlyConfig} field of this instance,
     * while the JDBC base URL is read from the {@code config} argument and split into
     * origin URL, host and port. Startup and stop options come from the {@code startupConfig}
     * and {@code stopConfig} fields.
     *
     * @param config configuration from which {@link JdbcCatalogOptions#BASE_URL} is read
     * @return the populated {@link PostgresSourceConfigFactory}
     */
    @Override
    public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(ReadonlyConfig config) {
        PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
        configFactory.fromReadonlyConfig(this.readonlyConfig);

        String baseUrl = (String) config.get(JdbcCatalogOptions.BASE_URL);
        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(baseUrl);
        configFactory.originUrl(urlInfo.getOrigin());
        configFactory.hostname(urlInfo.getHost());
        configFactory.port(urlInfo.getPort());

        configFactory.startupOptions(this.startupConfig);
        configFactory.stopOptions(this.stopConfig);
        return configFactory;
    }

    /**
     * Builds the Debezium deserialization schema for the catalog tables of this source,
     * using the time zone configured under {@link JdbcSourceOptions#SERVER_TIME_ZONE}.
     *
     * @param config configuration from which the server time zone id is read
     * @return the deserialization schema
     */
    @Override
    public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
            ReadonlyConfig config) {
        String zoneId = (String) config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
        // ZoneId.of rejects null and unknown/invalid zone ids with an exception.
        return SeaTunnelRowDebeziumDeserializeSchema.builder()
                .setTables(this.catalogTables)
                .setServerTimeZone(ZoneId.of(zoneId))
                .build();
    }

    /**
     * Creates the Postgres dialect from the {@code configFactory} and {@code catalogTables}
     * fields.
     *
     * @param config not used by this implementation
     * @return a new {@link PostgresDialect}
     */
    @Override
    public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig config) {
        return new PostgresDialect(
                (PostgresSourceConfigFactory) this.configFactory, this.catalogTables);
    }

    /**
     * Creates the LSN-based offset factory from the {@code configFactory} and
     * {@code dataSourceDialect} fields, which are cast to their Postgres-specific types.
     *
     * @param config not used by this implementation
     * @return a new {@link LsnOffsetFactory}
     */
    @Override
    public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
        return new LsnOffsetFactory(
                (PostgresSourceConfigFactory) this.configFactory,
                (PostgresDialect) this.dataSourceDialect);
    }

    /**
     * Queries the schema of every discovered table and serializes it as a Debezium
     * table-change {@link Struct}.
     *
     * <p>A config is created with {@code configFactory.create(0)}, a fresh
     * {@link PostgresDialect} discovers the tables, and one JDBC connection is opened and
     * shared by all schema queries. The connection is closed when the method exits, whether
     * normally or exceptionally. This method is not referenced from within this class.
     *
     * @return map from table id to the first serialized table-change struct of that table
     * @throws SeaTunnelException wrapping any {@link Exception} raised while opening the
     *     connection, querying the schemas, or closing the connection
     */
    private Map<TableId, Struct> tableChanges() {
        JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) this.configFactory.create(0);
        PostgresDialect dialect =
                new PostgresDialect(
                        (PostgresSourceConfigFactory) this.configFactory, this.catalogTables);
        List<TableId> discoverTables = dialect.discoverDataCollections(jdbcSourceConfig);
        SchemaNameAdjuster adjuster = SchemaNameAdjuster.create();
        ConnectTableChangeSerializer connectTableChangeSerializer =
                new ConnectTableChangeSerializer(adjuster);

        // try-with-resources closes the connection on every exit path and attaches a
        // close() failure as a suppressed exception when the body has already failed.
        try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(jdbcSourceConfig)) {
            return discoverTables.stream()
                    .collect(
                            Collectors.toMap(
                                    Function.identity(),
                                    tableId -> {
                                        TableChanges tableChanges = new TableChanges();
                                        tableChanges.create(
                                                dialect.queryTableSchema(jdbcConnection, tableId)
                                                        .getTable());
                                        // Only a single "create" change was recorded above,
                                        // so the first serialized element is taken.
                                        return (Struct)
                                                connectTableChangeSerializer
                                                        .serialize(tableChanges)
                                                        .get(0);
                                    }));
        } catch (Exception e) {
            throw new SeaTunnelException(e);
        }
    }
}

