/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.db.internal.source;

import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.mule.db.commons.shaded.AbstractDbConnector;
import org.mule.db.commons.shaded.api.param.ParameterizedStatementDefinition;
import org.mule.db.commons.shaded.internal.domain.connection.DbConnection;
import org.mule.db.commons.shaded.internal.domain.executor.SelectExecutor;
import org.mule.db.commons.shaded.internal.domain.query.Query;
import org.mule.db.commons.shaded.internal.domain.statement.QueryStatementFactory;
import org.mule.db.commons.shaded.internal.resolver.query.ParameterizedQueryResolver;
import org.mule.db.commons.shaded.internal.resolver.query.QueryResolver;
import org.mule.db.commons.shaded.internal.result.resultset.ListResultSetHandler;
import org.mule.db.commons.shaded.internal.result.row.NonStreamingInsensitiveMapRowHandler;
import org.mule.extension.db.api.param.QueryDefinition;
import org.mule.extension.db.api.param.QuerySettings;
import org.mule.extension.db.internal.source.ColumnValueProvider;
import org.mule.extension.db.internal.source.RowListenerMetadataResolver;
import org.mule.extension.db.internal.source.TableKeyResolver;
import org.mule.extension.db.internal.util.MigrationUtils;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.metadata.MetadataKeyId;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.DefaultEncoding;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.values.OfValues;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@MetadataScope(outputResolver=RowListenerMetadataResolver.class, keysResolver=TableKeyResolver.class)
@DisplayName(value="On Table Row")
@Summary(value="Triggers a message per each row in a table")
@Alias(value="listener")
public class RowListener
extends PollingSource<Map<String, Object>, Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RowListener.class);
    public static final String WATERMARK_PARAM_NAME = "watermark";
    @DefaultEncoding
    String encoding;
    @Parameter
    @MetadataKeyId
    private String table;
    @Parameter
    @Optional
    @OfValues(value=ColumnValueProvider.class)
    @Summary(value="The name of the column used for watermark")
    private String watermarkColumn;
    @Parameter
    @Optional
    @Summary(value="The name of the column to consider as row ID")
    @OfValues(value=ColumnValueProvider.class)
    private String idColumn;
    @ParameterGroup(name="Advanced")
    @NullSafe
    private QuerySettings settings;
    @Config
    private AbstractDbConnector config;
    @Connection
    private ConnectionProvider<DbConnection> connectionProvider;
    private final QueryResolver<ParameterizedStatementDefinition> queryResolver = new ParameterizedQueryResolver<ParameterizedStatementDefinition>();
    private ItemHandler idHandler;
    private ItemHandler watermarkHandler;
    private Charset charset;

    protected void doStart() throws MuleException {
        this.idHandler = this.idColumn != null ? (item, row) -> {
            Object id = row.get(this.idColumn);
            if (id != null) {
                item.setId(id.toString());
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("A null ID value was obtained for row %s. Idempotency will not be enforced for this row", row));
            }
        } : new NullItemHandler();
        this.watermarkHandler = this.watermarkColumn != null ? (item, row) -> {
            Object watermark = row.get(this.watermarkColumn);
            if (watermark == null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("A null watermark value was obtained for row %s. Watermark value won't be updated for this row", row));
                }
                return;
            }
            if (!(watermark instanceof Serializable)) {
                LOGGER.error(String.format("Watermark values need to be serializable, but a value of type %s was found instead for row %s", watermark.getClass().getName(), row));
            }
            item.setWatermark((Serializable)watermark);
        } : new NullItemHandler();
    }

    protected void doStop() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void poll(PollContext<Map<String, Object>, Void> pollContext) {
        DbConnection connection;
        if (pollContext.isSourceStopping()) {
            return;
        }
        try {
            connection = (DbConnection)this.connectionProvider.connect();
        }
        catch (Exception e) {
            if (e instanceof ConnectionException) {
                pollContext.onConnectionException((ConnectionException)e);
            }
            LOGGER.error(String.format("Could not obtain connection while trying to poll table '%s'. %s", this.table, e.getMessage()), (Throwable)e);
            return;
        }
        try {
            QueryDefinition queryDefinition = new QueryDefinition();
            StringBuilder sql = new StringBuilder("SELECT * FROM ").append(this.table);
            pollContext.getWatermark().ifPresent(w -> {
                sql.append(" WHERE ").append(this.watermarkColumn).append(" > :").append(WATERMARK_PARAM_NAME);
                queryDefinition.addInputParameter(WATERMARK_PARAM_NAME, w);
            });
            queryDefinition.setSql(sql.toString());
            Query query = this.queryResolver.resolve(MigrationUtils.mapParameterizedStatementDefinition(queryDefinition), this.config, connection, null);
            QueryStatementFactory statementFactory = new QueryStatementFactory();
            statementFactory.setFetchSize(this.settings.getFetchSize() != null ? this.settings.getFetchSize() : 10);
            statementFactory.setQueryTimeout(new Long(this.settings.getQueryTimeoutUnit().toSeconds(this.settings.getQueryTimeout())).intValue());
            if (this.settings.getMaxRows() != null) {
                statementFactory.setMaxRows(this.settings.getMaxRows());
            }
            ListResultSetHandler resultSetHandler = new ListResultSetHandler(new NonStreamingInsensitiveMapRowHandler(connection, Charset.forName(this.encoding)));
            List rows = (List)new SelectExecutor(statementFactory, resultSetHandler).execute(connection, query);
            rows.forEach(row -> pollContext.accept(item -> {
                this.idHandler.accept(item, row);
                this.watermarkHandler.accept(item, row);
                item.setResult(Result.builder().output(row).build());
            }));
        }
        catch (Exception e) {
            LOGGER.error(String.format("Failed to query table '%s' for new rows. %s", this.table, e.getMessage()), (Throwable)e);
        }
        finally {
            this.connectionProvider.disconnect((Object)connection);
        }
    }

    public void onRejectedItem(Result<Map<String, Object>, Void> result, SourceCallbackContext sourceCallbackContext) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Row has been rejected for processing: {}", result.getOutput());
        }
    }

    private final class NullItemHandler
    implements ItemHandler {
        private NullItemHandler() {
        }

        @Override
        public void accept(PollContext.PollItem<Map<String, Object>, Void> mapVoidPollItem, Map<String, Object> stringObjectMap) {
        }
    }

    @FunctionalInterface
    private static interface ItemHandler
    extends BiConsumer<PollContext.PollItem<Map<String, Object>, Void>, Map<String, Object>> {
    }
}

