/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.data.rx.sql.impl;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Statement;
import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;
import org.noear.solon.data.rx.sql.RxSqlCommand;
import org.noear.solon.data.rx.sql.RxSqlConfiguration;
import org.noear.solon.data.rx.sql.RxSqlQuerier;
import org.noear.solon.data.rx.sql.bound.RxRowConverter;
import org.noear.solon.data.rx.sql.bound.RxStatementBinder;
import org.noear.solon.data.rx.sql.impl.DefaultRxBinder;
import org.noear.solon.data.rx.sql.impl.DefaultRxConverter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive SQL querier bound to a single SQL text and an R2DBC {@link ConnectionFactory}.
 *
 * <p>Arguments are attached with one of the {@code params(...)} overloads; each call replaces
 * the current {@link RxSqlCommand}. Every public query/update method hands the command to
 * {@link RxSqlConfiguration#doIntercept} together with a callback that runs the matching
 * protected {@code *Do} method.
 *
 * <p>Each execution obtains a connection from {@link #getConnection()} and hands it to
 * {@link #releaseConnection(Connection)} once the result stream completes, fails or is
 * cancelled, so connections are not left open.
 */
public class SimpleRxSqlQuerier
implements RxSqlQuerier {
    private static final DefaultRxBinder DEFAULT_BINDER = new DefaultRxBinder();
    private final ConnectionFactory dataSource;
    private final String commandText;
    private RxSqlCommand command;

    /**
     * Creates a querier for the given SQL text.
     *
     * <p>The command starts out with an empty argument array (the same command that calling
     * {@link #params(Object...)} with no arguments builds), so parameterless SQL can be executed
     * without an explicit {@code params(...)} call.
     *
     * @param dataSource factory used to open a connection for each execution
     * @param sql        SQL text to execute
     */
    public SimpleRxSqlQuerier(ConnectionFactory dataSource, String sql) {
        this.dataSource = dataSource;
        this.commandText = sql;
        this.command = new RxSqlCommand<Object[]>(dataSource, sql, new Object[0], DEFAULT_BINDER);
    }

    /**
     * Sets positional arguments, bound with the default binder.
     *
     * @param args argument values
     * @return this querier
     */
    @Override
    public RxSqlQuerier params(Object ... args) {
        this.command = new RxSqlCommand<Object[]>(this.dataSource, this.commandText, args, DEFAULT_BINDER);
        return this;
    }

    /**
     * Sets an argument object together with the binder that applies it to the statement.
     *
     * @param args   argument source
     * @param binder binder for {@code args}
     * @return this querier
     */
    @Override
    public <S> RxSqlQuerier params(S args, RxStatementBinder<S> binder) {
        this.command = new RxSqlCommand<S>(this.dataSource, this.commandText, args, binder);
        return this;
    }

    /**
     * Sets a collection of positional argument arrays, bound with the default binder.
     *
     * @param argsList one argument array per entry
     * @return this querier
     */
    @Override
    public RxSqlQuerier params(Collection<Object[]> argsList) {
        this.command = new RxSqlCommand<Object[]>(this.dataSource, this.commandText, argsList, DEFAULT_BINDER);
        return this;
    }

    /**
     * Sets a collection of argument objects; the binder is obtained once from the supplier.
     *
     * @param argsList       argument sources
     * @param binderSupplier supplies the binder used for the entries
     * @return this querier
     */
    @Override
    public <S> RxSqlQuerier params(Collection<S> argsList, Supplier<RxStatementBinder<S>> binderSupplier) {
        this.command = new RxSqlCommand<S>(this.dataSource, this.commandText, argsList, binderSupplier.get());
        return this;
    }

    /**
     * Queries the first column of the first row.
     *
     * @param tClass only fixes the generic type; the driver value from {@code row.get(0)} is
     *               returned unconverted
     * @return the value, or empty when there is no row
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Mono<T> queryValue(Class<T> tClass) {
        return this.queryRow((Row row, RowMetadata rowM) -> (T) row.get(0));
    }

    /**
     * Queries the first column of every row.
     *
     * @param tClass only fixes the generic type; the driver value from {@code row.get(0)} is
     *               returned unconverted
     * @return one value per row
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Flux<T> queryValueList(Class<T> tClass) {
        return this.queryRowList((Row row, RowMetadata rowM) -> (T) row.get(0));
    }

    /**
     * Queries the first row, converted by the converter that {@link DefaultRxConverter} creates
     * for {@code tClass}.
     */
    @Override
    public <T> Mono<T> queryRow(Class<T> tClass) {
        return this.queryRow(DefaultRxConverter.getInstance().create(tClass));
    }

    /**
     * Queries the first row and converts it with {@code converter}; runs through the interceptor.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> Mono<T> queryRow(RxRowConverter<T> converter) {
        return Mono.from((Publisher)RxSqlConfiguration.doIntercept(this.command, () -> this.queryRowDo(converter)));
    }

    /**
     * Executes the command and emits at most the first converted row.
     */
    protected <T> Mono<T> queryRowDo(RxRowConverter<T> converter) {
        return this.<T>executeWithConnection(false, result -> result.map(converter::convert))
                .take(1L)
                .singleOrEmpty();
    }

    /**
     * Queries all rows, converted by the converter that {@link DefaultRxConverter} creates for
     * {@code tClass}.
     */
    @Override
    public <T> Flux<T> queryRowList(Class<T> tClass) {
        return this.queryRowList(DefaultRxConverter.getInstance().create(tClass));
    }

    /**
     * Queries all rows and converts each with {@code converter}; runs through the interceptor.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> Flux<T> queryRowList(RxRowConverter<T> converter) {
        return Flux.from((Publisher)RxSqlConfiguration.doIntercept(this.command, () -> this.queryRowListDo(converter)));
    }

    /**
     * Executes the command and emits every converted row.
     */
    protected <T> Flux<T> queryRowListDo(RxRowConverter<T> converter) {
        return this.<T>executeWithConnection(false, result -> result.map(converter::convert));
    }

    /**
     * Executes the command as an update; runs through the interceptor.
     *
     * @return the first updated-row count reported by the driver, or empty if none is reported
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Mono<Long> update() {
        return Mono.from((Publisher)RxSqlConfiguration.doIntercept(this.command, this::updateDo));
    }

    /**
     * Executes the command and emits at most the first updated-row count.
     */
    protected Mono<Long> updateDo() {
        return this.<Long>executeWithConnection(false, result -> result.getRowsUpdated())
                .take(1L)
                .singleOrEmpty();
    }

    /**
     * Executes the command requesting generated values and returns the first column of the
     * first returned row; runs through the interceptor.
     *
     * @param tClass only fixes the generic type; the driver value is returned unconverted
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> Mono<T> updateReturnKey(Class<T> tClass) {
        return Mono.from((Publisher)RxSqlConfiguration.doIntercept(this.command, this::updateReturnKeyDo));
    }

    /**
     * Executes the command with generated values enabled and emits at most the first key.
     */
    @SuppressWarnings("rawtypes")
    protected Mono updateReturnKeyDo() {
        return this.<Object>executeWithConnection(true, result -> result.map(r -> r.get(0)))
                .take(1L)
                .singleOrEmpty();
    }

    /**
     * Executes the command as a batch update; runs through the interceptor.
     *
     * @return the updated-row counts reported by the driver
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Flux<Long> updateBatch() {
        return Flux.from((Publisher)RxSqlConfiguration.doIntercept(this.command, this::updateBatchDo));
    }

    /**
     * Executes the command and emits every updated-row count.
     */
    protected Flux<Long> updateBatchDo() {
        return this.<Long>executeWithConnection(false, result -> result.getRowsUpdated());
    }

    /**
     * Opens a connection, executes the current command on it and flattens each {@link Result}
     * through {@code extractor}.
     *
     * <p>The extraction happens inside the resource scope of {@code usingWhen}, so the
     * connection stays open while rows are being read and is released afterwards regardless of
     * whether the stream completes, errors or is cancelled (for example by {@code take(1)}).
     *
     * @param returnKeys whether generated values are requested from the statement
     * @param extractor  turns one result into the values to emit
     * @return the extracted values of all results
     */
    private <R> Flux<R> executeWithConnection(boolean returnKeys, Function<? super Result, ? extends Publisher<? extends R>> extractor) {
        return Flux.usingWhen(
                Mono.<Connection>from(this.getConnection()),
                conn -> Flux.<Result>from(this.buildStatement(conn, this.command, returnKeys)).flatMap(extractor),
                this::releaseConnection);
    }

    /**
     * Creates a statement for the command's SQL, optionally requests generated values, lets the
     * command fill the statement and executes it.
     *
     * @param conn       connection to create the statement on
     * @param command    command supplying the SQL and the bindings
     * @param returnKeys whether to call {@code returnGeneratedValues} before executing
     * @return the publisher returned by {@link Statement#execute()}
     */
    protected Publisher<? extends Result> buildStatement(Connection conn, RxSqlCommand command, boolean returnKeys) {
        Statement stmt = conn.createStatement(command.getSql());
        if (returnKeys) {
            stmt.returnGeneratedValues(new String[0]);
        }
        command.fill(stmt);
        return stmt.execute();
    }

    /**
     * Opens a connection from the configured factory.
     */
    protected Publisher<? extends Connection> getConnection() {
        return this.dataSource.create();
    }

    /**
     * Releases a connection obtained from {@link #getConnection()} after an execution finished.
     * The default closes the connection; subclasses that hand out connections which must stay
     * open can override this.
     *
     * @param conn connection to release
     * @return publisher that completes once the connection is released
     */
    protected Publisher<Void> releaseConnection(Connection conn) {
        return conn.close();
    }
}

