/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.salesforce;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.math.DoubleMath;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.salesforce.SalesforceConnector;
import org.apache.gobblin.salesforce.SalesforceExtractor;
import org.apache.gobblin.salesforce.SfConfig;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
import org.apache.gobblin.source.extractor.exception.RestApiClientException;
import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
import org.apache.gobblin.source.extractor.exception.RestApiProcessingException;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforceSource
extends QueryBasedSource<JsonArray, JsonElement> {
    private static final Logger log = LoggerFactory.getLogger(SalesforceSource.class);
    public static final String USE_ALL_OBJECTS = "use.all.objects";
    public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
    private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
    private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit";
    private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
    private static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
    private static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000;
    private static final String PROBE_TARGET_RATIO = "salesforce.probeTargetRatio";
    private static final double DEFAULT_PROBE_TARGET_RATIO = 0.6;
    private static final int MIN_SPLIT_TIME_MILLIS = 1000;
    private static final String DAY_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})";
    private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
    private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
    private static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
    private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = 1000000L;
    private static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss";
    private static final String ZERO_TIME_SUFFIX = "-00:00:00";
    private static final Gson GSON = new Gson();
    private boolean isEarlyStopped = false;
    protected SalesforceConnector salesforceConnector = null;
    private SfConfig workUnitConf;

    public SalesforceSource() {
        this.lineageInfo = Optional.absent();
    }

    @VisibleForTesting
    SalesforceSource(LineageInfo lineageInfo) {
        this.lineageInfo = Optional.fromNullable((Object)lineageInfo);
    }

    public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) throws IOException {
        try {
            return new SalesforceExtractor(state).build();
        }
        catch (ExtractPrepareException e) {
            log.error("Failed to prepare extractor", (Throwable)e);
            throw new IOException(e);
        }
    }

    public boolean isEarlyStopped() {
        return this.isEarlyStopped;
    }

    protected void addLineageSourceInfo(SourceState sourceState, QueryBasedSource.SourceEntity entity, WorkUnit workUnit) {
        DatasetDescriptor source = new DatasetDescriptor("salesforce", entity.getSourceEntityName());
        if (this.lineageInfo.isPresent()) {
            ((LineageInfo)this.lineageInfo.get()).setSource((Descriptor)source, (State)workUnit);
        }
    }

    protected List<WorkUnit> generateWorkUnits(QueryBasedSource.SourceEntity sourceEntity, SourceState state, long previousWatermark) {
        List<WorkUnit> workUnits = null;
        this.workUnitConf = new SfConfig(state.getProperties());
        String partitionType = state.getProp("salesforce.partitionType", "");
        workUnits = partitionType.equals("PK_CHUNKING") ? this.generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark) : this.generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
        log.info("====Generated {} workUnit(s)====", (Object)workUnits.size());
        if (this.workUnitConf.partitionOnly) {
            log.info("It is partitionOnly mode, return blank workUnit list");
            return new ArrayList<WorkUnit>();
        }
        return workUnits;
    }

    private List<WorkUnit> generateWorkUnitsPkChunking(QueryBasedSource.SourceEntity sourceEntity, SourceState state, long previousWatermark) {
        SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct = this.executeQueryWithPkChunking(state, previousWatermark);
        return this.createWorkUnits(sourceEntity, state, resultFileIdsStruct);
    }

    private SalesforceExtractor.ResultFileIdsStruct executeQueryWithPkChunking(SourceState sourceState, long previousWatermark) throws RuntimeException {
        State state = new State((State)sourceState);
        WorkUnit workUnit = WorkUnit.createEmpty();
        WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
        workUnitState.setId("Execute pk-chunking");
        SalesforceExtractor salesforceExtractor = null;
        try {
            salesforceExtractor = (SalesforceExtractor)this.getExtractor(workUnitState);
            Partitioner partitioner = new Partitioner(sourceState);
            if (this.isEarlyStopEnabled(state) && partitioner.isFullDump()) {
                throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
            }
            Partition partition = partitioner.getGlobalPartition(previousWatermark);
            String condition = "";
            Date startDate = Utils.toDate((long)partition.getLowWatermark(), (String)"yyyyMMddHHmmss");
            String field = sourceState.getProp("extract.delta.fields");
            if (startDate != null && field != null) {
                String lowWatermarkDate = Utils.dateToString((Date)startDate, (String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
                condition = field + " >= " + lowWatermarkDate;
            }
            Predicate predicate = new Predicate(null, 0L, condition, "", null);
            List<Predicate> predicateList = Arrays.asList(predicate);
            String entity = sourceState.getProp("source.entity");
            if (state.contains("salesforce.bulk.testJobId")) {
                String jobId = state.getProp("salesforce.bulk.testJobId", "");
                log.info("---Skip query, fetching result files directly for [jobId={}]", (Object)jobId);
                String batchIdListStr = state.getProp("salesforce.bulk.testBatchIds");
                SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct = salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr);
                return resultFileIdsStruct;
            }
            log.info("---Pk Chunking query submit.");
            SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct = salesforceExtractor.getQueryResultIdsPkChunking(entity, predicateList);
            return resultFileIdsStruct;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            if (salesforceExtractor != null) {
                try {
                    salesforceExtractor.closeConnection();
                }
                catch (Exception e) {
                    log.error("Failed to close the extractor connections", (Throwable)e);
                }
            }
        }
    }

    private List<WorkUnit> createWorkUnits(QueryBasedSource.SourceEntity sourceEntity, SourceState state, SalesforceExtractor.ResultFileIdsStruct resultFileIdsStruct) {
        String nameSpaceName = state.getProp("extract.namespace");
        Extract.TableType tableType = Extract.TableType.valueOf((String)state.getProp("extract.table.type").toUpperCase());
        String outputTableName = sourceEntity.getDestTableName();
        Extract extract = this.createExtract(tableType, nameSpaceName, outputTableName);
        ArrayList workUnits = Lists.newArrayList();
        int partitionNumber = state.getPropAsInt("source.max.number.of.partitions", 1);
        List<SalesforceExtractor.BatchIdAndResultId> batchResultIds = resultFileIdsStruct.getBatchIdAndResultIdList();
        int total = batchResultIds.size();
        int sizeOfPartition = (total + partitionNumber - 1) / partitionNumber;
        List partitionedResultIds = Lists.partition(batchResultIds, (int)sizeOfPartition);
        log.info("----partition strategy: max-parti={}, size={}, actual-parti={}, total={}", new Object[]{partitionNumber, sizeOfPartition, partitionedResultIds.size(), total});
        for (List resultIds : partitionedResultIds) {
            WorkUnit workunit = new WorkUnit(extract);
            String bulkJobId = resultFileIdsStruct.getJobId();
            workunit.setProp("__salesforce.job.id", (Object)bulkJobId);
            String resultIdStr = resultIds.stream().map(x -> x.getBatchId() + ":" + x.getResultId()).collect(Collectors.joining(","));
            workunit.setProp("__salesforce.batch.result.id.pairs", (Object)resultIdStr);
            workunit.setProp("source.entity", (Object)sourceEntity.getSourceEntityName());
            workunit.setProp("extract.table.name", (Object)sourceEntity.getDestTableName());
            workunit.setProp("source.querybased.workUnitState.version", (Object)CURRENT_WORK_UNIT_STATE_VERSION);
            this.addLineageSourceInfo(state, sourceEntity, workunit);
            workUnits.add(workunit);
        }
        return workUnits;
    }

    private List<WorkUnit> generateWorkUnitsStrategy(QueryBasedSource.SourceEntity sourceEntity, SourceState state, long previousWatermark) {
        Histogram histogramAdjust;
        Boolean disableSoft = state.getPropAsBoolean("source.querybased.salesforce.is.soft.deletes.pull.disabled", false);
        log.info("disable soft delete pull: " + disableSoft);
        WatermarkType watermarkType = WatermarkType.valueOf((String)state.getProp("source.querybased.watermark.type", "timestamp").toUpperCase());
        String watermarkColumn = state.getProp("extract.delta.fields");
        int maxPartitions = state.getPropAsInt("source.max.number.of.partitions", 20);
        int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, 250000);
        if (watermarkType == WatermarkType.SIMPLE || Strings.isNullOrEmpty((String)watermarkColumn) || !state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING) || maxPartitions <= 1) {
            List workUnits = super.generateWorkUnits(sourceEntity, state, previousWatermark);
            workUnits.stream().forEach(x -> x.setProp("source.querybased.salesforce.is.soft.deletes.pull.disabled", (Object)disableSoft));
            return workUnits;
        }
        Partitioner partitioner = new Partitioner(state);
        if (this.isEarlyStopEnabled((State)state) && partitioner.isFullDump()) {
            throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
        }
        Partition partition = partitioner.getGlobalPartition(previousWatermark);
        Histogram histogram = this.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition);
        if (this.isEarlyStopEnabled((State)state)) {
            histogramAdjust = new Histogram();
            for (HistogramGroup group : histogram.getGroups()) {
                histogramAdjust.add(group);
                if (histogramAdjust.getTotalRecordCount() <= state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, 1000000L)) continue;
                break;
            }
        } else {
            histogramAdjust = histogram;
        }
        long expectedHighWatermark = partition.getHighWatermark();
        if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) {
            HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size());
            long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat((String)lastPlusOne.getKey(), (String)SECONDS_FORMAT, (String)"yyyyMMddHHmmss"));
            log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", new Object[]{state.getProp("job.name"), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark});
            this.isEarlyStopped = true;
            expectedHighWatermark = earlyStopHighWatermark;
        } else {
            log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", new Object[]{state.getProp("job.name"), partition.getLowWatermark(), expectedHighWatermark});
        }
        String specifiedPartitions = this.generateSpecifiedPartitions(histogramAdjust, minTargetPartitionSize, maxPartitions, partition.getLowWatermark(), expectedHighWatermark);
        state.setProp("partitioner.hasUserSpecifiedPartitions", (Object)true);
        state.setProp("partitioner.userSpecifiedPartitions", (Object)specifiedPartitions);
        state.setProp("partitioner.isEarlyStopped", (Object)this.isEarlyStopped);
        List workUnits = super.generateWorkUnits(sourceEntity, state, previousWatermark);
        workUnits.stream().forEach(x -> x.setProp("source.querybased.salesforce.is.soft.deletes.pull.disabled", (Object)disableSoft));
        return workUnits;
    }

    private boolean isEarlyStopEnabled(State state) {
        return state.getPropAsBoolean("source.earlyStop.enabled", false);
    }

    String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSize, int maxPartitions, long lowWatermark, long expectedHighWatermark) {
        int interval = this.computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions);
        int totalGroups = histogram.getGroups().size();
        log.info("Histogram total record count: " + histogram.totalRecordCount);
        log.info("Histogram total groups: " + totalGroups);
        log.info("maxPartitions: " + maxPartitions);
        log.info("interval: " + interval);
        List<HistogramGroup> groups = histogram.getGroups();
        ArrayList<String> partitionPoints = new ArrayList<String>();
        DescriptiveStatistics statistics = new DescriptiveStatistics();
        int count = 0;
        for (HistogramGroup group : groups) {
            if (count == 0) {
                partitionPoints.add(Utils.toDateTimeFormat((String)group.getKey(), (String)SECONDS_FORMAT, (String)"yyyyMMddHHmmss"));
            }
            if (count != 0 && count + group.count >= 2 * interval) {
                statistics.addValue((double)count);
                partitionPoints.add(Utils.toDateTimeFormat((String)group.getKey(), (String)SECONDS_FORMAT, (String)"yyyyMMddHHmmss"));
                count = group.count;
            } else {
                count += group.count;
            }
            if (count < interval) continue;
            statistics.addValue((double)count);
            count = 0;
        }
        if (partitionPoints.isEmpty()) {
            throw new RuntimeException("Unexpected empty partition list");
        }
        if (count > 0) {
            statistics.addValue((double)count);
        }
        partitionPoints.add(Long.toString(expectedHighWatermark));
        log.info("Dynamic partitioning statistics: ");
        log.info("data: " + Arrays.toString(statistics.getValues()));
        log.info(statistics.toString());
        String specifiedPartitions = Joiner.on((String)",").join(partitionPoints);
        log.info("Calculated specified partitions: " + specifiedPartitions);
        return specifiedPartitions;
    }

    private int computeTargetPartitionSize(Histogram histogram, int minTargetPartitionSize, int maxPartitions) {
        return Math.max(minTargetPartitionSize, DoubleMath.roundToInt((double)((double)histogram.totalRecordCount / (double)maxPartitions), (RoundingMode)RoundingMode.CEILING));
    }

    private JsonArray getRecordsForQuery(SalesforceConnector connector, String query) {
        RestApiProcessingException exception = null;
        for (int i = 0; i < this.workUnitConf.restApiRetryLimit + 1; ++i) {
            try {
                String soqlQuery = SalesforceExtractor.getSoqlUrl(query);
                List commands = RestApiConnector.constructGetCommand((String)connector.getFullUri(soqlQuery));
                CommandOutput response = connector.getResponse(commands);
                Iterator itr = response.getResults().values().iterator();
                if (!itr.hasNext()) {
                    throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
                }
                String output = (String)itr.next();
                return ((JsonObject)GSON.fromJson(output, JsonObject.class)).getAsJsonArray("records");
            }
            catch (DataRecordException | RestApiClientException e) {
                throw new RuntimeException("Fail to get data from salesforce", e);
            }
            catch (RestApiProcessingException e) {
                exception = e;
                log.info("Caught RestApiProcessingException, retrying({}) rest query: {}", (Object)(i + 1), (Object)query);
                Thread.sleep(this.workUnitConf.restApiRetryInterval);
                continue;
            }
        }
        throw new RuntimeException("Fail to get data from salesforce", exception);
    }

    private int getCountForRange(TableCountProbingContext probingContext, StrSubstitutor sub, Map<String, String> subValues, long startTime, long endTime) {
        String startTimeStr = Utils.dateToString((Date)new Date(startTime), (String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
        String endTimeStr = Utils.dateToString((Date)new Date(endTime), (String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
        subValues.put("start", startTimeStr);
        subValues.put("end", endTimeStr);
        String query = sub.replace(PROBE_PARTITION_QUERY_TEMPLATE);
        log.debug("Count query: " + query);
        probingContext.probeCount++;
        JsonArray records = this.getRecordsForQuery(probingContext.connector, query);
        Iterator elements = records.iterator();
        JsonObject element = ((JsonElement)elements.next()).getAsJsonObject();
        return element.get("cnt").getAsInt();
    }

    private void getHistogramRecursively(TableCountProbingContext probingContext, Histogram histogram, StrSubstitutor sub, Map<String, String> values, int count, long startEpoch, long endEpoch) {
        long midpointEpoch = startEpoch + (endEpoch - startEpoch) / 2L;
        if (count <= probingContext.bucketSizeLimit || probingContext.probeCount > probingContext.probeLimit || midpointEpoch - startEpoch < 1000L) {
            histogram.add(new HistogramGroup(Utils.epochToDate((long)startEpoch, (String)SECONDS_FORMAT), count));
            return;
        }
        int countLeft = this.getCountForRange(probingContext, sub, values, startEpoch, midpointEpoch);
        this.getHistogramRecursively(probingContext, histogram, sub, values, countLeft, startEpoch, midpointEpoch);
        log.debug("Count {} for left partition {} to {}", new Object[]{countLeft, startEpoch, midpointEpoch});
        int countRight = count - countLeft;
        this.getHistogramRecursively(probingContext, histogram, sub, values, countRight, midpointEpoch, endEpoch);
        log.debug("Count {} for right partition {} to {}", new Object[]{countRight, midpointEpoch, endEpoch});
    }

    private Histogram getHistogramByProbing(TableCountProbingContext probingContext, int count, long startEpoch, long endEpoch) {
        Histogram histogram = new Histogram();
        HashMap<String, String> values = new HashMap<String, String>();
        values.put("table", probingContext.entity);
        values.put("column", probingContext.watermarkColumn);
        values.put("greater", ">=");
        values.put("less", "<");
        StrSubstitutor sub = new StrSubstitutor(values);
        this.getHistogramRecursively(probingContext, histogram, sub, values, count, startEpoch, endEpoch);
        return histogram;
    }

    private Histogram getRefinedHistogram(SalesforceConnector connector, String entity, String watermarkColumn, SourceState state, Partition partition, Histogram histogram) {
        int maxPartitions = state.getPropAsInt("source.max.number.of.partitions", 20);
        int probeLimit = state.getPropAsInt(DYNAMIC_PROBING_LIMIT, 1000);
        int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, 250000);
        Histogram outputHistogram = new Histogram();
        double probeTargetRatio = state.getPropAsDouble(PROBE_TARGET_RATIO, 0.6);
        int bucketSizeLimit = (int)(probeTargetRatio * (double)this.computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions));
        log.info("Refining histogram with bucket size limit {}.", (Object)bucketSizeLimit);
        TableCountProbingContext probingContext = new TableCountProbingContext(connector, entity, watermarkColumn, bucketSizeLimit, probeLimit);
        if (histogram.getGroups().isEmpty()) {
            return outputHistogram;
        }
        ArrayList<HistogramGroup> list = new ArrayList<HistogramGroup>(histogram.getGroups());
        Date hwmDate = Utils.toDate((long)partition.getHighWatermark(), (String)"yyyyMMddHHmmss");
        list.add(new HistogramGroup(Utils.epochToDate((long)hwmDate.getTime(), (String)SECONDS_FORMAT), 0));
        for (int i = 0; i < list.size() - 1; ++i) {
            HistogramGroup currentGroup = (HistogramGroup)list.get(i);
            HistogramGroup nextGroup = (HistogramGroup)list.get(i + 1);
            if (currentGroup.count > bucketSizeLimit) {
                long startEpoch = Utils.toDate((String)currentGroup.getKey(), (String)SECONDS_FORMAT).getTime();
                long endEpoch = Utils.toDate((String)nextGroup.getKey(), (String)SECONDS_FORMAT).getTime();
                outputHistogram.add(this.getHistogramByProbing(probingContext, currentGroup.count, startEpoch, endEpoch));
                continue;
            }
            outputHistogram.add(currentGroup);
        }
        log.info("Executed {} probes for refining the histogram.", (Object)probingContext.probeCount);
        if (probingContext.probeCount >= probingContext.probeLimit) {
            log.warn("Reached the probe limit");
        }
        return outputHistogram;
    }

    private Histogram getHistogramByDayBucketing(SalesforceConnector connector, String entity, String watermarkColumn, Partition partition) {
        Histogram histogram = new Histogram();
        GregorianCalendar calendar = new GregorianCalendar();
        Date startDate = Utils.toDate((long)partition.getLowWatermark(), (String)"yyyyMMddHHmmss");
        calendar.setTime(startDate);
        int startYear = calendar.get(1);
        String lowWatermarkDate = Utils.dateToString((Date)startDate, (String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
        Date endDate = Utils.toDate((long)partition.getHighWatermark(), (String)"yyyyMMddHHmmss");
        calendar.setTime(endDate);
        int endYear = calendar.get(1);
        String highWatermarkDate = Utils.dateToString((Date)endDate, (String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
        HashMap<String, String> values = new HashMap<String, String>();
        values.put("table", entity);
        values.put("column", watermarkColumn);
        StrSubstitutor sub = new StrSubstitutor(values);
        for (int year = startYear; year <= endYear; ++year) {
            if (year == startYear) {
                values.put("start", lowWatermarkDate);
                values.put("greater", partition.isLowWatermarkInclusive() ? ">=" : ">");
            } else {
                values.put("start", this.getDateString(year));
                values.put("greater", ">=");
            }
            if (year == endYear) {
                values.put("end", highWatermarkDate);
                values.put("less", partition.isHighWatermarkInclusive() ? "<=" : "<");
            } else {
                values.put("end", this.getDateString(year + 1));
                values.put("less", "<");
            }
            String query = sub.replace(DAY_PARTITION_QUERY_TEMPLATE);
            log.info("Histogram query: " + query);
            histogram.add(this.parseDayBucketingHistogram(this.getRecordsForQuery(connector, query)));
        }
        return histogram;
    }

    protected SalesforceConnector getConnector(State state) {
        if (this.salesforceConnector == null) {
            this.salesforceConnector = new SalesforceConnector(state);
        }
        return this.salesforceConnector;
    }

    private Histogram getHistogram(String entity, String watermarkColumn, SourceState state, Partition partition) {
        SalesforceConnector connector = this.getConnector((State)state);
        try {
            if (!connector.connect()) {
                throw new RuntimeException("Failed to connect.");
            }
        }
        catch (RestApiConnectionException e) {
            throw new RuntimeException("Failed to connect.", e);
        }
        Histogram histogram = this.getHistogramByDayBucketing(connector, entity, watermarkColumn, partition);
        HistogramGroup firstGroup = histogram.get(0);
        Date lwmDate = Utils.toDate((long)partition.getLowWatermark(), (String)"yyyyMMddHHmmss");
        histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate((long)lwmDate.getTime(), (String)SECONDS_FORMAT), firstGroup.getCount()));
        if (state.getPropAsBoolean(ENABLE_DYNAMIC_PROBING)) {
            histogram = this.getRefinedHistogram(connector, entity, watermarkColumn, state, partition, histogram);
        }
        return histogram;
    }

    private String getDateString(int year) {
        GregorianCalendar calendar = new GregorianCalendar();
        calendar.clear();
        calendar.set(1, year);
        return Utils.dateToString((Date)calendar.getTime(), (String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
    }

    private Histogram parseDayBucketingHistogram(JsonArray records) {
        log.info("Parse day-based histogram");
        Histogram histogram = new Histogram();
        Iterator elements = records.iterator();
        while (elements.hasNext()) {
            JsonObject element = ((JsonElement)elements.next()).getAsJsonObject();
            String time = element.get("time").getAsString() + ZERO_TIME_SUFFIX;
            int count = element.get("cnt").getAsInt();
            histogram.add(new HistogramGroup(time, count));
        }
        return histogram;
    }

    protected Set<QueryBasedSource.SourceEntity> getSourceEntities(State state) {
        if (!state.getPropAsBoolean(USE_ALL_OBJECTS, false)) {
            return super.getSourceEntities(state);
        }
        SalesforceConnector connector = this.getConnector(state);
        try {
            if (!connector.connect()) {
                throw new RuntimeException("Failed to connect.");
            }
        }
        catch (RestApiConnectionException e) {
            throw new RuntimeException("Failed to connect.", e);
        }
        List commands = RestApiConnector.constructGetCommand((String)connector.getFullUri("/sobjects"));
        try {
            CommandOutput response = connector.getResponse(commands);
            Iterator itr = response.getResults().values().iterator();
            if (itr.hasNext()) {
                String next = (String)itr.next();
                return SalesforceSource.getSourceEntities(next);
            }
            throw new RuntimeException("Unable to retrieve source entities");
        }
        catch (RestApiProcessingException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    private static Set<QueryBasedSource.SourceEntity> getSourceEntities(String response) {
        HashSet result = Sets.newHashSet();
        JsonObject jsonObject = ((JsonObject)new Gson().fromJson(response, JsonObject.class)).getAsJsonObject();
        JsonArray array = jsonObject.getAsJsonArray("sobjects");
        for (JsonElement element : array) {
            String sourceEntityName = element.getAsJsonObject().get("name").getAsString();
            result.add(QueryBasedSource.SourceEntity.fromSourceEntityName((String)sourceEntityName));
        }
        return result;
    }

    private static class TableCountProbingContext {
        private final SalesforceConnector connector;
        private final String entity;
        private final String watermarkColumn;
        private final int bucketSizeLimit;
        private final int probeLimit;
        private int probeCount = 0;

        public TableCountProbingContext(SalesforceConnector connector, String entity, String watermarkColumn, int bucketSizeLimit, int probeLimit) {
            this.connector = connector;
            this.entity = entity;
            this.watermarkColumn = watermarkColumn;
            this.bucketSizeLimit = bucketSizeLimit;
            this.probeLimit = probeLimit;
        }
    }

    static class Histogram {
        private long totalRecordCount = 0L;
        private List<HistogramGroup> groups = new ArrayList<HistogramGroup>();

        Histogram() {
        }

        void add(HistogramGroup group) {
            this.groups.add(group);
            this.totalRecordCount += (long)group.count;
        }

        void add(Histogram histogram) {
            this.groups.addAll(histogram.getGroups());
            this.totalRecordCount += histogram.totalRecordCount;
        }

        HistogramGroup get(int idx) {
            return this.groups.get(idx);
        }

        public String toString() {
            return this.groups.toString();
        }

        public long getTotalRecordCount() {
            return this.totalRecordCount;
        }

        public List<HistogramGroup> getGroups() {
            return this.groups;
        }
    }

    static class HistogramGroup {
        private final String key;
        private final int count;

        public String toString() {
            return this.key + ":" + this.count;
        }

        public HistogramGroup(String key, int count) {
            this.key = key;
            this.count = count;
        }

        public String getKey() {
            return this.key;
        }

        public int getCount() {
            return this.count;
        }
    }
}

