/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.ChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ObjectUtils;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicChunkSplitter
extends ChunkSplitter {
    private static final Logger log = LoggerFactory.getLogger(DynamicChunkSplitter.class);

    /**
     * Creates a splitter backed by the given source configuration.
     *
     * @param config JDBC source configuration, handed unchanged to the {@link ChunkSplitter} constructor
     */
    public DynamicChunkSplitter(JdbcSourceConfig config) {
        super(config);
    }

    /**
     * Produces the splits for one table by delegating to the dynamic chunking strategy.
     *
     * @param table    table to split
     * @param splitKey row type describing the split key; only its first field is used downstream
     * @return the splits covering the table
     * @throws Exception if the dynamic chunking fails
     */
    @Override
    protected Collection<JdbcSourceSplit> createSplits(JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
        return createDynamicSplits(table, splitKey);
    }

    /**
     * Builds the statement that reads one split by delegating to the dynamic split statement builder.
     *
     * @param split  split to read
     * @param schema schema used when rendering the split predicate
     * @return a prepared statement with the split bounds already bound
     * @throws SQLException if the statement cannot be created or bound
     */
    @Override
    protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSchema schema) throws SQLException {
        return createDynamicSplitStatement(split, schema);
    }

    /**
     * Turns the chunk ranges computed for the table into {@link JdbcSourceSplit}s.
     *
     * <p>Only the first field of {@code splitKey} is used as the split column. Each split id is
     * derived from the table path and the chunk's position in the chunk list.
     *
     * @param table    table to split
     * @param splitKey row type whose first field names and types the split column
     * @return one split per chunk range, in chunk order
     * @throws Exception if the chunk ranges cannot be computed
     */
    private Collection<JdbcSourceSplit> createDynamicSplits(JdbcSourceTable table, SeaTunnelRowType splitKey) throws Exception {
        String keyName = splitKey.getFieldNames()[0];
        SeaTunnelDataType keyType = splitKey.getFieldType(0);
        List<ChunkRange> ranges = splitTableIntoChunks(table, keyName, keyType);
        List<JdbcSourceSplit> result = new ArrayList<>(ranges.size());
        int index = 0;
        while (index < ranges.size()) {
            ChunkRange range = ranges.get(index);
            result.add(
                    new JdbcSourceSplit(
                            table.getTablePath(),
                            createSplitId(table.getTablePath(), index),
                            table.getQuery(),
                            keyName,
                            keyType,
                            range.getChunkStart(),
                            range.getChunkEnd()));
            index++;
        }
        return result;
    }

    /**
     * Renders the SQL for a split, prepares it and binds the split bounds.
     *
     * @param split  split to read
     * @param schema schema used when rendering the split predicate
     * @return the prepared statement with its placeholders bound
     * @throws SQLException if preparing or binding the statement fails
     */
    private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit split, TableSchema schema) throws SQLException {
        PreparedStatement prepared = createPreparedStatement(createDynamicSplitQuerySQL(split, schema));
        prepareDynamicSplitStatement(prepared, split);
        return prepared;
    }

    /**
     * Chooses a chunking strategy for the split column based on its SQL type.
     *
     * <p>If the column's MIN or MAX is {@code null}, or both are equal, the whole table becomes a
     * single unbounded chunk. Numeric and string columns use the even/uneven/sampling strategies;
     * DATE columns use day-based chunking. Any other type is rejected.
     *
     * @param table           table to split
     * @param splitColumnName name of the split column
     * @param splitColumnType SeaTunnel type of the split column
     * @return the chunk ranges covering the table
     * @throws Exception if querying the table fails or the column type is not supported
     */
    private List<ChunkRange> splitTableIntoChunks(JdbcSourceTable table, String splitColumnName, SeaTunnelDataType splitColumnType) throws Exception {
        Pair<Object, Object> bounds = queryMinMax(table, splitColumnName);
        Object lowest = bounds.getLeft();
        Object highest = bounds.getRight();
        boolean nothingToSplit = lowest == null || highest == null || lowest.equals(highest);
        if (nothingToSplit) {
            return Collections.singletonList(ChunkRange.all());
        }
        int chunkSize = config.getSplitSize();
        switch (splitColumnType.getSqlType()) {
            case TINYINT:
            case SMALLINT:
            case INT:
            case BIGINT:
            case DECIMAL:
            case DOUBLE:
            case FLOAT:
            case STRING:
                return evenlyColumnSplitChunks(table, splitColumnName, lowest, highest, chunkSize);
            case DATE:
                return dateColumnSplitChunks(table, splitColumnName, lowest, highest, chunkSize);
            default:
                throw CommonError.unsupportedDataType("JDBC", splitColumnType.getSqlType().toString(), splitColumnName);
        }
    }

    /**
     * Splits a numeric or string column, picking between three strategies.
     *
     * <ol>
     *   <li>If the distribution factor lies within the configured lower/upper bounds, fixed-step
     *       ranges are produced with a step of {@code max(1, (int) (factor * chunkSize))}.
     *   <li>Otherwise, if the estimated shard count exceeds the configured sampling threshold, the
     *       column is sampled through the dialect and ranges are derived from the sample.
     *   <li>Otherwise chunk ends are obtained one at a time from the database.
     * </ol>
     *
     * @param table           table to split
     * @param splitColumnName name of the split column
     * @param min2            smallest value of the split column
     * @param max             largest value of the split column
     * @param chunkSize       configured number of rows per chunk
     * @return the chunk ranges covering the table
     * @throws Exception if any of the underlying queries fails
     */
    private List<ChunkRange> evenlyColumnSplitChunks(JdbcSourceTable table, String splitColumnName, Object min2, Object max, int chunkSize) throws Exception {
        TablePath path = table.getTablePath();
        double factorUpperBound = config.getSplitEvenDistributionFactorUpperBound();
        double factorLowerBound = config.getSplitEvenDistributionFactorLowerBound();
        int samplingThreshold = config.getSplitSampleShardingThreshold();
        log.info("Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}", new Object[]{path, splitColumnName, min2, max, chunkSize, factorUpperBound, factorLowerBound, samplingThreshold});

        long rowCount = queryApproximateRowCnt(table);
        double factor = calculateDistributionFactor(path, min2, max, rowCount);
        boolean evenlyDistributed =
                ObjectUtils.doubleCompare(factor, factorLowerBound) >= 0
                        && ObjectUtils.doubleCompare(factor, factorUpperBound) <= 0;
        if (evenlyDistributed) {
            int step = Math.max((int) (factor * (double) chunkSize), 1);
            return splitEvenlySizedChunks(path, min2, max, rowCount, chunkSize, step);
        }

        int shardCount = (int) (rowCount / (long) chunkSize);
        int inverseSamplingRate = config.getSplitInverseSamplingRate();
        if (samplingThreshold >= shardCount) {
            // Few enough shards: walk the column chunk by chunk instead of sampling.
            return splitUnevenlySizedChunks(table, splitColumnName, min2, max, chunkSize);
        }

        if (inverseSamplingRate > chunkSize) {
            log.warn("The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize", (Object) inverseSamplingRate, (Object) chunkSize);
            inverseSamplingRate = chunkSize;
        }
        log.info("Use sampling sharding for table {}, the sampling rate is {}", (Object) path, (Object) inverseSamplingRate);
        Object[] sampled = jdbcDialect.sampleDataFromColumn(getOrEstablishConnection(), table, splitColumnName, inverseSamplingRate, config.getFetchSize());
        log.info("Sample data from table {} end, the sample size is {}", (Object) path, (Object) sampled.length);
        return efficientShardingThroughSampling(path, sampled, rowCount, shardCount);
    }

    /**
     * Asks the JDBC dialect for an approximate row count of the table.
     *
     * @param table table to count
     * @return the value reported by the dialect
     * @throws SQLException if the dialect's query fails
     */
    private Long queryApproximateRowCnt(JdbcSourceTable table) throws SQLException {
        return jdbcDialect.approximateRowCntStatement(getOrEstablishConnection(), table);
    }

    /**
     * Computes the distribution factor {@code (max - min + 1) / approximateRowCnt} of the split key.
     *
     * <p>The quotient is rounded up ({@link RoundingMode#CEILING}) to four decimal places.
     *
     * @param tablePath         table being split, used only for logging
     * @param min2              smallest split key value
     * @param max               largest split key value; must be of the same class as {@code min2}
     * @param approximateRowCnt approximate number of rows in the table
     * @return the distribution factor, or {@link Double#MAX_VALUE} when the row count is zero
     * @throws IllegalStateException if {@code min2} and {@code max} are of different classes
     */
    private double calculateDistributionFactor(TablePath tablePath, Object min2, Object max, long approximateRowCnt) {
        if (!min2.getClass().equals(max.getClass())) {
            throw new IllegalStateException(String.format("Unsupported operation type, the MIN value type %s is different with MAX value type %s.", min2.getClass().getSimpleName(), max.getClass().getSimpleName()));
        }
        if (approximateRowCnt == 0L) {
            // Avoid dividing by zero; the maximal factor is returned instead.
            return Double.MAX_VALUE;
        }
        BigDecimal difference = ObjectUtils.minus(max, min2);
        BigDecimal subRowCnt = difference.add(BigDecimal.ONE);
        // RoundingMode.CEILING replaces the deprecated int rounding constant 2 (ROUND_CEILING).
        double distributionFactor = subRowCnt.divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING).doubleValue();
        log.info("The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", new Object[]{tablePath, distributionFactor, min2, max, approximateRowCnt});
        return distributionFactor;
    }

    /**
     * Builds chunk ranges by stepping from {@code min2} towards {@code max} in fixed increments.
     *
     * <p>When the approximate row count does not exceed {@code chunkSize}, a single unbounded chunk
     * is returned. Otherwise the first chunk has no lower bound, the last chunk has no upper bound,
     * and each intermediate boundary is the previous one plus {@code dynamicChunkSize}.
     *
     * @param tablePath         table being split, used only for logging
     * @param min2              smallest split key value
     * @param max               largest split key value
     * @param approximateRowCnt approximate number of rows in the table
     * @param chunkSize         configured number of rows per chunk
     * @param dynamicChunkSize  step added to each boundary to obtain the next one
     * @return the chunk ranges covering the table
     */
    private List<ChunkRange> splitEvenlySizedChunks(TablePath tablePath, Object min2, Object max, long approximateRowCnt, int chunkSize, int dynamicChunkSize) {
        log.info("Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", new Object[]{tablePath, approximateRowCnt, chunkSize, dynamicChunkSize});
        if (approximateRowCnt <= (long) chunkSize) {
            return Collections.singletonList(ChunkRange.all());
        }
        List<ChunkRange> ranges = new ArrayList<>();
        Object lowerBound = null;
        Object upperBound = ObjectUtils.plus(min2, dynamicChunkSize);
        while (ObjectUtils.compare(upperBound, max) <= 0) {
            ranges.add(ChunkRange.of(lowerBound, upperBound));
            lowerBound = upperBound;
            try {
                upperBound = ObjectUtils.plus(upperBound, dynamicChunkSize);
            } catch (ArithmeticException stepFailed) {
                // The next boundary could not be computed (presumably numeric overflow);
                // stop stepping and let the open-ended tail chunk cover the rest.
                break;
            }
        }
        ranges.add(ChunkRange.of(lowerBound, null));
        return ranges;
    }

    /**
     * Derives chunk ranges from a sample of split key values.
     *
     * <p>A single unbounded chunk is returned when {@code shardCount} is zero or when the sample
     * is {@code null} or empty, because there is no boundary to pick in either case. Otherwise:
     *
     * <ul>
     *   <li>with at most one sample per shard, every distinct consecutive sample value becomes a
     *       chunk boundary;
     *   <li>with more than one sample per shard, boundaries are picked at evenly spaced positions
     *       of the sample, and a middle shard whose boundary equals the previous one is skipped.
     * </ul>
     *
     * <p>The first chunk always has no lower bound and the last chunk no upper bound.
     *
     * @param tablePath         table being split, used only for logging
     * @param sampleData        sampled split key values (presumably sorted ascending by the
     *                          dialect that produced them; this method does not sort them)
     * @param approximateRowCnt approximate number of rows in the table, used only for logging
     * @param shardCount        desired number of shards
     * @return the chunk ranges derived from the sample
     */
    public static List<ChunkRange> efficientShardingThroughSampling(TablePath tablePath, Object[] sampleData, long approximateRowCnt, int shardCount) {
        log.info("Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}", new Object[]{tablePath, approximateRowCnt, shardCount});
        ArrayList<ChunkRange> splits = new ArrayList<ChunkRange>();
        // An empty sample used to reach sampleData[0] below and fail with
        // ArrayIndexOutOfBoundsException; without samples the only valid answer is one chunk.
        if (shardCount == 0 || sampleData == null || sampleData.length == 0) {
            splits.add(ChunkRange.of(null, null));
            return splits;
        }
        double approxSamplePerShard = (double) sampleData.length / (double) shardCount;
        Object lastEnd = null;
        if (approxSamplePerShard <= 1.0) {
            splits.add(ChunkRange.of(null, sampleData[0]));
            lastEnd = sampleData[0];
            for (int i = 1; i < sampleData.length; ++i) {
                // ChunkRange rejects equal start and end, so repeated sample values are skipped.
                if (sampleData[i].equals(lastEnd)) {
                    continue;
                }
                splits.add(ChunkRange.of(lastEnd, sampleData[i]));
                lastEnd = sampleData[i];
            }
            splits.add(ChunkRange.of(lastEnd, null));
        } else {
            for (int i = 0; i < shardCount; ++i) {
                Object chunkStart = lastEnd;
                // The last shard is open-ended; the others end at an evenly spaced sample.
                Object chunkEnd = i < shardCount - 1 ? sampleData[(int) ((double) (i + 1) * approxSamplePerShard)] : null;
                if (i != 0 && i != shardCount - 1 && Objects.equals(chunkEnd, chunkStart)) {
                    continue;
                }
                splits.add(ChunkRange.of(chunkStart, chunkEnd));
                lastEnd = chunkEnd;
            }
        }
        return splits;
    }

    /**
     * Builds chunk ranges by repeatedly asking for the next chunk end after the previous one.
     *
     * <p>Iteration stops when no further chunk end is available or the chunk end exceeds
     * {@code max}; a final chunk without upper bound is then appended. After each chunk,
     * {@code maySleep} is invoked with the number of chunks produced before it.
     *
     * @param table           table to split
     * @param splitColumnName name of the split column
     * @param min2            smallest split key value
     * @param max             largest split key value
     * @param chunkSize       configured number of rows per chunk
     * @return the chunk ranges covering the table
     * @throws SQLException if querying a chunk end fails
     */
    private List<ChunkRange> splitUnevenlySizedChunks(JdbcSourceTable table, String splitColumnName, Object min2, Object max, int chunkSize) throws SQLException {
        log.info("Use unevenly-sized chunks for table {}, the chunk size is {}", (Object) table.getTablePath(), (Object) chunkSize);
        List<ChunkRange> ranges = new ArrayList<>();
        Object lowerBound = null;
        int produced = 0;
        for (Object upperBound = nextChunkEnd(min2, table, splitColumnName, max, chunkSize);
                upperBound != null && objectCompare(upperBound, max) <= 0;
                upperBound = nextChunkEnd(upperBound, table, splitColumnName, max, chunkSize)) {
            ranges.add(ChunkRange.of(lowerBound, upperBound));
            maySleep(produced, table.getTablePath());
            produced++;
            lowerBound = upperBound;
        }
        ranges.add(ChunkRange.of(lowerBound, null));
        return ranges;
    }

    /**
     * Splits a date-typed column into chunks whose boundaries are calendar days.
     *
     * <p>{@code min2} and {@code max} must both be {@link Date} or both be {@link Timestamp};
     * timestamps are reduced to their date. Every day between the two (inclusive) is enumerated.
     * If the average number of rows per day is below {@code chunkSize}, several days are grouped
     * per chunk; otherwise each day becomes its own chunk. The first chunk has no lower bound and
     * the last chunk no upper bound. Chunk boundaries are {@link LocalDate} values.
     *
     * @param table           table to split
     * @param splitColumnName name of the split column, used only for diagnostics
     * @param min2            smallest split key value
     * @param max             largest split key value
     * @param chunkSize       configured number of rows per chunk
     * @return the chunk ranges covering the table
     * @throws SQLException          if querying the approximate row count fails
     * @throws IllegalStateException if the bounds are not both {@code java.sql.Date} or both
     *                               {@code java.sql.Timestamp}
     */
    private List<ChunkRange> dateColumnSplitChunks(JdbcSourceTable table, String splitColumnName, Object min2, Object max, int chunkSize) throws SQLException {
        log.info("Use date chunks for table {}", (Object)table.getTablePath());
        Date sqlDateMin;
        Date sqlDateMax;
        if (min2 instanceof Date && max instanceof Date) {
            sqlDateMin = (Date)min2;
            sqlDateMax = (Date)max;
        } else if (min2 instanceof Timestamp && max instanceof Timestamp) {
            sqlDateMin = new Date(((Timestamp)min2).getTime());
            sqlDateMax = new Date(((Timestamp)max).getTime());
        } else {
            // Previously both locals stayed null here and the method failed with a bare
            // NullPointerException (or ClassCastException for mixed types).
            throw new IllegalStateException(String.format("Unsupported split key value types for date column %s of table %s: MIN is %s, MAX is %s, expected java.sql.Date or java.sql.Timestamp.", splitColumnName, table.getTablePath(), min2.getClass().getName(), max.getClass().getName()));
        }
        List<LocalDate> dateRange = DynamicChunkSplitter.getDateRange(sqlDateMin.toLocalDate(), sqlDateMax.toLocalDate());
        // 7300 days = 365 * 20. The original check had an empty body; surface the situation instead.
        int largeRangeDays = 7300;
        if (dateRange.size() > largeRangeDays) {
            log.warn("The date range of split column {} in table {} spans {} days, which is more than {} days", new Object[]{splitColumnName, table.getTablePath(), dateRange.size(), largeRangeDays});
        }
        long rowCnt = this.queryApproximateRowCnt(table);
        int step = 1;
        if (rowCnt / (long)dateRange.size() < (long)chunkSize) {
            // Keep the split count as long so a huge row count cannot wrap around on narrowing,
            // and never let the step drop below 1, which would stall the loop below.
            long splitNum = rowCnt / (long)chunkSize + 1L;
            step = (int)Math.max(1L, (long)dateRange.size() / splitNum);
        }
        ArrayList<ChunkRange> splits = new ArrayList<ChunkRange>();
        for (int i = 0; i < dateRange.size(); i += step) {
            if (i == 0) {
                splits.add(ChunkRange.of(null, dateRange.get(i)));
            } else {
                splits.add(ChunkRange.of(dateRange.get(i - step), dateRange.get(i)));
            }
            if (i + step >= dateRange.size()) {
                // Last visited day: close the list with an open-ended tail chunk.
                splits.add(ChunkRange.of(dateRange.get(i), null));
            }
        }
        return splits;
    }

    /**
     * Lists every calendar day from {@code startDate} to {@code endDate}, both inclusive.
     *
     * @param startDate first day of the range
     * @param endDate   last day of the range
     * @return the days in ascending order; empty when {@code startDate} is after {@code endDate}
     */
    private static List<LocalDate> getDateRange(LocalDate startDate, LocalDate endDate) {
        List<LocalDate> days = new ArrayList<>();
        for (LocalDate day = startDate; day.compareTo(endDate) <= 0; day = day.plusDays(1L)) {
            days.add(day);
        }
        return days;
    }

    /**
     * Determines the upper bound of the chunk that follows {@code previousChunkEnd}.
     *
     * <p>The dialect is asked for the next chunk maximum. If it equals the previous chunk end,
     * {@code queryMin} is consulted with that value instead, since a chunk may not start and end
     * on the same value. A result that is not below {@code max} is reported as {@code null}.
     *
     * @param previousChunkEnd upper bound of the previous chunk
     * @param table            table being split
     * @param splitColumnName  name of the split column
     * @param max              largest split key value
     * @param chunkSize        configured number of rows per chunk
     * @return the next chunk end, or {@code null} when it would reach or exceed {@code max}
     * @throws SQLException if one of the queries fails
     */
    private Object nextChunkEnd(Object previousChunkEnd, JdbcSourceTable table, String splitColumnName, Object max, int chunkSize) throws SQLException {
        Object candidate = jdbcDialect.queryNextChunkMax(getOrEstablishConnection(), table, splitColumnName, chunkSize, previousChunkEnd);
        if (Objects.equals(previousChunkEnd, candidate)) {
            candidate = queryMin(table, splitColumnName, candidate);
        }
        boolean reachedMax = objectCompare(candidate, max) >= 0;
        return reachedMax ? null : candidate;
    }

    /**
     * Pauses for 100 ms and logs progress whenever {@code count} is a multiple of ten.
     *
     * <p>If the pause is interrupted, the thread's interrupt flag is restored so that callers
     * further up the stack can still observe the interruption; the progress line is logged
     * either way.
     *
     * @param count     number of chunks split so far
     * @param tablePath table being split, used only for logging
     */
    private static void maySleep(int count, TablePath tablePath) {
        if (count % 10 != 0) {
            return;
        }
        try {
            Thread.sleep(100L);
        } catch (InterruptedException interrupted) {
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        log.info("DynamicChunkSplitter has split {} chunks for table {}", (Object)count, (Object)tablePath);
    }

    /**
     * Compares two split key values via {@link ObjectUtils#compare}.
     *
     * @param obj1 left operand
     * @param obj2 right operand
     * @return the result of {@code ObjectUtils.compare(obj1, obj2)}
     */
    private int objectCompare(Object obj1, Object obj2) {
        int comparison = ObjectUtils.compare(obj1, obj2);
        return comparison;
    }

    /**
     * Renders the SELECT statement that reads one split.
     *
     * <p>The base query is {@code SELECT * FROM (<split query>) tmp} when the split carries a
     * non-blank query, otherwise {@code SELECT * FROM <table identifier>}. A WHERE clause is added
     * depending on which bounds of the split are set:
     *
     * <ul>
     *   <li>no bounds: no WHERE clause;
     *   <li>only an end: {@code key <= ? AND NOT (key = ?)};
     *   <li>only a start: {@code key >= ?};
     *   <li>both: {@code key >= ? AND NOT (key = ?) AND key <= ?}.
     * </ul>
     *
     * @param split  split to read
     * @param schema schema used to look up the split column when rendering the predicate
     * @return the SQL text with {@code ?} placeholders for the bounds
     */
    @VisibleForTesting
    String createDynamicSplitQuerySQL(JdbcSourceSplit split, TableSchema schema) {
        SeaTunnelRowType keyRowType = new SeaTunnelRowType(new String[]{split.getSplitKeyName()}, new SeaTunnelDataType[]{split.getSplitKeyType()});
        boolean noLowerBound = split.getSplitStart() == null;
        boolean noUpperBound = split.getSplitEnd() == null;

        StringBuilder where = new StringBuilder();
        if (!noLowerBound && !noUpperBound) {
            // Bounded on both sides: start inclusive, end exclusive.
            addKeyColumnsToCondition(schema, keyRowType, where, " >= ?");
            where.append(" AND NOT (");
            addKeyColumnsToCondition(schema, keyRowType, where, " = ?");
            where.append(")");
            where.append(" AND ");
            addKeyColumnsToCondition(schema, keyRowType, where, " <= ?");
        } else if (!noLowerBound) {
            // Last split: everything from the start value upwards.
            addKeyColumnsToCondition(schema, keyRowType, where, " >= ?");
        } else if (!noUpperBound) {
            // First split: everything below the end value.
            addKeyColumnsToCondition(schema, keyRowType, where, " <= ?");
            where.append(" AND NOT (");
            addKeyColumnsToCondition(schema, keyRowType, where, " = ?");
            where.append(")");
        }

        String userQuery = split.getSplitQuery();
        String select;
        if (StringUtils.isNotBlank(userQuery)) {
            select = String.format("SELECT * FROM (%s) tmp", userQuery);
        } else {
            select = String.format("SELECT * FROM %s", jdbcDialect.tableIdentifier(split.getTablePath()));
        }
        if (where.length() == 0) {
            return select;
        }
        return select + " WHERE " + where;
    }

    /**
     * Appends {@code <column expression><predicate>} for every field of {@code rowType} to
     * {@code sql}, joining the fields with {@code " AND "}.
     *
     * <p>Each field name is quoted by the dialect and then passed through the dialect's
     * {@code convertType} together with the source type of the schema column of the same name.
     *
     * @param schema    schema providing the columns, looked up by name
     * @param rowType   fields to emit
     * @param sql       builder the condition is appended to
     * @param predicate text appended after each column expression, e.g. {@code " >= ?"}
     */
    private void addKeyColumnsToCondition(TableSchema schema, SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
        Map<String, Column> columnsByName = schema.getColumns().stream().collect(Collectors.toMap(Column::getName, column -> column));
        int lastIndex = rowType.getTotalFields() - 1;
        for (int index = 0; index <= lastIndex; index++) {
            String plainName = rowType.getFieldName(index);
            String quotedName = jdbcDialect.quoteIdentifier(plainName);
            String expression = jdbcDialect.convertType(quotedName, columnsByName.get(plainName).getSourceType());
            sql.append(expression).append(predicate);
            if (index < lastIndex) {
                sql.append(" AND ");
            }
        }
    }

    /**
     * Binds the split bounds to the placeholders of a split statement.
     *
     * <p>Nothing is bound when the split has neither start nor end. A split with only an end binds
     * the end to parameters 1 and 2; a split with only a start binds the start to parameter 1; a
     * split with both binds the start to parameter 1 and the end to parameters 2 and 3.
     *
     * @param statement statement to bind
     * @param split     split providing the bounds
     * @throws SQLException if binding a parameter fails
     */
    private static void prepareDynamicSplitStatement(PreparedStatement statement, JdbcSourceSplit split) throws SQLException {
        Object lower = split.getSplitStart();
        Object upper = split.getSplitEnd();
        if (lower == null && upper == null) {
            return;
        }
        if (lower == null) {
            statement.setObject(1, upper);
            statement.setObject(2, upper);
            return;
        }
        if (upper == null) {
            statement.setObject(1, lower);
            return;
        }
        statement.setObject(1, lower);
        statement.setObject(2, upper);
        statement.setObject(3, upper);
    }

    /**
     * Range of split key values described by a start and an end value.
     *
     * <p>Either bound may be {@code null}. A range with both bounds {@code null} is what
     * {@link #all()} returns. When at least one bound is set, the two bounds must not be equal.
     */
    public static class ChunkRange
    implements Serializable {
        private final Object chunkStart;
        private final Object chunkEnd;

        private ChunkRange(Object chunkStart, Object chunkEnd) {
            boolean bothUnset = chunkStart == null && chunkEnd == null;
            if (!bothUnset) {
                Preconditions.checkArgument(!Objects.equals(chunkStart, chunkEnd), "Chunk start %s shouldn't be equal to chunk end %s", chunkStart, chunkEnd);
            }
            this.chunkStart = chunkStart;
            this.chunkEnd = chunkEnd;
        }

        /** Returns a range with neither start nor end. */
        public static ChunkRange all() {
            return new ChunkRange(null, null);
        }

        /**
         * Returns a range with the given bounds.
         *
         * @param chunkStart start of the range, or {@code null}
         * @param chunkEnd   end of the range, or {@code null}
         * @return the new range
         */
        public static ChunkRange of(Object chunkStart, Object chunkEnd) {
            return new ChunkRange(chunkStart, chunkEnd);
        }

        /** Returns the start of the range, or {@code null} if unset. */
        public Object getChunkStart() {
            return chunkStart;
        }

        /** Returns the end of the range, or {@code null} if unset. */
        public Object getChunkEnd() {
            return chunkEnd;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("DynamicChunkSplitter.ChunkRange(chunkStart=");
            text.append(getChunkStart()).append(", chunkEnd=").append(getChunkEnd()).append(")");
            return text.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ChunkRange)) {
                return false;
            }
            ChunkRange that = (ChunkRange) o;
            return that.canEqual(this)
                    && Objects.equals(getChunkStart(), that.getChunkStart())
                    && Objects.equals(getChunkEnd(), that.getChunkEnd());
        }

        /** Lets subclasses veto equality with instances of other types. */
        protected boolean canEqual(Object other) {
            return other instanceof ChunkRange;
        }

        @Override
        public int hashCode() {
            // Same arithmetic as before: seed 1, multiplier 59, 43 standing in for null.
            Object start = getChunkStart();
            Object end = getChunkEnd();
            int hash = 59 + (start != null ? start.hashCode() : 43);
            return hash * 59 + (end != null ? end.hashCode() : 43);
        }
    }
}

