package com.mulesoft.connector.snowflake.internal.source;

import com.mulesoft.connector.snowflake.api.params.SnowflakeQuerySettings;
import com.mulesoft.connector.snowflake.api.query.Joiner;
import com.mulesoft.connector.snowflake.internal.connection.SnowflakeConnection;
import com.mulesoft.connector.snowflake.internal.metadata.RowListenerMetadataResolver;
import com.mulesoft.connector.snowflake.internal.metadata.TableKeyMetadataResolver;
import com.mulesoft.connector.snowflake.internal.param.RowListenerParameterGroup;
import com.mulesoft.connector.snowflake.internal.service.RowListenerService;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import org.mule.db.commons.AbstractDbConnector;
import org.mule.db.commons.internal.domain.connection.DbConnection;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.DefaultEncoding;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Summary("Returns all rows of a table based on a watermark")
@DisplayName("On Table Row")
@MetadataScope(outputResolver = RowListenerMetadataResolver.class, keysResolver = TableKeyMetadataResolver.class)
@Alias("row-listener")
/* loaded from: input_file:com/mulesoft/connector/snowflake/internal/source/RowListenerSource.class */
public class RowListenerSource extends PollingSource<Map<String, Object>, Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RowListenerSource.class);

    @DefaultEncoding
    private String encoding;

    @ParameterGroup(name = "Row Listener Parameters")
    private RowListenerParameterGroup rowListenerParameterGroup;

    @NullSafe
    @ParameterGroup(name = "Advanced")
    private SnowflakeQuerySettings settings;

    @Config
    private AbstractDbConnector config;

    @Connection
    private ConnectionProvider<SnowflakeConnection> connectionProvider;

    protected void doStart() {
    }

    public void poll(PollContext<Map<String, Object>, Void> pollContext) {
        DbConnection dbConnection = null;
        try {
            try {
                dbConnection = (SnowflakeConnection) this.connectionProvider.connect();
                new RowListenerService(this.config, dbConnection, this.settings, this.encoding).findNewRows(pollContext, this.rowListenerParameterGroup.getTable(), this.rowListenerParameterGroup.getWatermarkColumn(), this.rowListenerParameterGroup.getSince()).forEach(map -> {
                    processRow(pollContext, map);
                });
                if (dbConnection != null) {
                    this.connectionProvider.disconnect(dbConnection);
                }
            } catch (ConnectionException e) {
                pollContext.onConnectionException(e);
                if (dbConnection != null) {
                    this.connectionProvider.disconnect(dbConnection);
                }
            }
        } catch (Throwable th) {
            if (dbConnection != null) {
                this.connectionProvider.disconnect(dbConnection);
            }
            throw th;
        }
    }

    protected void doStop() {
    }

    public void onRejectedItem(Result<Map<String, Object>, Void> result, SourceCallbackContext sourceCallbackContext) {
        LOGGER.debug("Row has been rejected for processing: {}", result.getOutput());
    }

    private void processRow(PollContext<Map<String, Object>, Void> pollContext, Map<String, Object> map) {
        pollContext.accept(pollItem -> {
            pollItem.setResult(Result.builder().output(map).build()).setWatermark(getWatermark(map));
            if (this.rowListenerParameterGroup.getIdColumn() != null) {
                pollItem.setId(getId(map));
            }
        });
    }

    private String getId(Map<String, Object> map) {
        return Optional.ofNullable(map.get(this.rowListenerParameterGroup.getIdColumn())).orElse(Joiner.Constants.EMPTY).toString();
    }

    private Serializable getWatermark(Map<String, Object> map) {
        Object obj = map.get(this.rowListenerParameterGroup.getWatermarkColumn());
        if (!(obj instanceof Serializable)) {
            LOGGER.error("Watermark values need to be serializable, but a value of type {} was found instead for row {}", obj.getClass().getName(), map);
        }
        return (Serializable) obj;
    }
}
