/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.salesforce;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.math.DoubleMath;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.salesforce.SalesforceConnector;
import org.apache.gobblin.salesforce.SalesforceExtractor;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
import org.apache.gobblin.source.extractor.exception.RestApiClientException;
import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
import org.apache.gobblin.source.extractor.exception.RestApiProcessingException;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforceSource
extends QueryBasedSource<JsonArray, JsonElement> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(SalesforceSource.class);
    /** Property key read by {@code getSourceEntities(State)}; when true the entity list is fetched from the "/sobjects" endpoint. */
    public static final String USE_ALL_OBJECTS = "use.all.objects";
    /** Default for {@link #USE_ALL_OBJECTS}. */
    public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
    /** Property key read by {@code generateWorkUnits}; unless it is true the superclass work-unit generation is used unchanged. */
    private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
    /** Property key read by {@code getHistogram}; when true the day-level histogram is refined by {@code getRefinedHistogram}. */
    private static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
    /** Property key read by {@code getRefinedHistogram} for the probe limit stored in the probing context. */
    private static final String DYNAMIC_PROBING_LIMIT = "salesforce.dynamicProbingLimit";
    /** Default for {@link #DYNAMIC_PROBING_LIMIT}. */
    private static final int DEFAULT_DYNAMIC_PROBING_LIMIT = 1000;
    /** Property key for the lower bound applied by {@code computeTargetPartitionSize}. */
    private static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
    /** Default for {@link #MIN_TARGET_PARTITION_SIZE}. */
    private static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000;
    /** Property key for the factor multiplied with the target partition size to obtain the probing bucket size limit. */
    private static final String PROBE_TARGET_RATIO = "salesforce.probeTargetRatio";
    /** Default for {@link #PROBE_TARGET_RATIO}. */
    private static final double DEFAULT_PROBE_TARGET_RATIO = 0.6;
    /** Same value as the literal 1000L threshold in {@code getHistogramRecursively}, below which a range is not split further. */
    private static final int MIN_SPLIT_TIME_MILLIS = 1000;
    /** Query template for per-day record counts; the ${...} placeholders are filled in by {@code getHistogramByDayBucketing}. */
    private static final String DAY_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt, DAY_ONLY(${column}) time FROM ${table} WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end} GROUP BY DAY_ONLY(${column}) ORDER BY DAY_ONLY(${column})";
    /** Query template for the record count of a single time range; used by {@code getCountForRange}. */
    private static final String PROBE_PARTITION_QUERY_TEMPLATE = "SELECT count(${column}) cnt FROM ${table} WHERE ${column} ${greater} ${start} AND ${column} ${less} ${end}";
    /** Date pattern used for histogram group keys. */
    private static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss";
    /** Appended to the day value returned by the day query so the key matches {@link #SECONDS_FORMAT}. */
    private static final String ZERO_TIME_SUFFIX = "-00:00:00";
    /** Shared Gson instance used by {@code getRecordsForQuery} to parse REST responses. */
    private static final Gson GSON = new Gson();

    /**
     * Builds the Salesforce extractor for the given work unit.
     *
     * @param state state of the work unit to extract
     * @return the built extractor
     * @throws IOException wrapping the {@link ExtractPrepareException} raised while building the extractor
     */
    public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) throws IOException {
        Extractor<JsonArray, JsonElement> extractor;
        try {
            extractor = new SalesforceExtractor(state).build();
        }
        catch (ExtractPrepareException prepareFailure) {
            // Log before translating so the original failure is visible even if the caller drops the cause.
            log.error("Failed to prepare extractor", (Throwable)prepareFailure);
            throw new IOException(prepareFailure);
        }
        return extractor;
    }

    /**
     * Generates the work units for one source entity.
     *
     * <p>When dynamic partitioning applies (non-SIMPLE watermark type, a watermark column is configured,
     * the feature flag is on and more than one partition is allowed), a record-count histogram is built
     * and converted into user-specified partition points that are stored in {@code state} before
     * delegating to the superclass. Otherwise the superclass is called with {@code state} untouched.
     *
     * @param sourceEntity entity to generate work units for
     * @param state source state; receives the computed partition properties when dynamic partitioning applies
     * @param previousWatermark watermark handed to the partitioner and to the superclass
     * @return the work units produced by the superclass
     */
    protected List<WorkUnit> generateWorkUnits(QueryBasedSource.SourceEntity sourceEntity, SourceState state, long previousWatermark) {
        String configuredWatermarkType = state.getProp("source.querybased.watermark.type", "timestamp");
        WatermarkType watermarkType = WatermarkType.valueOf(configuredWatermarkType.toUpperCase());
        String watermarkColumn = state.getProp("extract.delta.fields");
        int maxPartitions = state.getPropAsInt("source.max.number.of.partitions", 20);
        int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, 250000);

        // Same conditions as the fall-back test, negated; evaluation order is kept.
        boolean dynamicPartitioningApplies = watermarkType != WatermarkType.SIMPLE
                && !Strings.isNullOrEmpty(watermarkColumn)
                && state.getPropAsBoolean(ENABLE_DYNAMIC_PARTITIONING)
                && maxPartitions > 1;

        if (dynamicPartitioningApplies) {
            Partition globalPartition = new Partitioner(state).getGlobalPartition(previousWatermark);
            Histogram histogram = this.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, globalPartition);
            String partitionPoints = this.generateSpecifiedPartitions(histogram, minTargetPartitionSize, maxPartitions,
                    globalPartition.getLowWatermark(), globalPartition.getHighWatermark());
            state.setProp("partitioner.hasUserSpecifiedPartitions", Boolean.TRUE);
            state.setProp("partitioner.userSpecifiedPartitions", partitionPoints);
        }
        return super.generateWorkUnits(sourceEntity, state, previousWatermark);
    }

    /**
     * Turns a histogram into a comma-separated list of partition boundary points.
     *
     * <p>Groups are accumulated greedily into the current partition. A new boundary (the group's key,
     * reformatted to {@code yyyyMMddHHmmss}) is emitted when a partition starts empty, or when adding
     * the group would bring a non-empty partition to at least twice the target size. A partition is
     * closed as soon as it reaches the target size. {@code expectedHighWatermark} is always appended
     * as the final point.
     *
     * @param histogram histogram whose groups are walked in order
     * @param minTargetPartitionSize lower bound for the target partition size
     * @param maxPartitions divisor used to derive the target partition size
     * @param lowWatermark not used by this method
     * @param expectedHighWatermark value appended as the last partition point
     * @return the partition points joined with ","
     * @throws RuntimeException if the histogram has no groups
     */
    String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSize, int maxPartitions, long lowWatermark, long expectedHighWatermark) {
        int targetSize = this.computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions);
        List<HistogramGroup> buckets = histogram.getGroups();
        log.info("Histogram total record count: " + histogram.totalRecordCount);
        log.info("Histogram total groups: " + buckets.size());
        log.info("maxPartitions: " + maxPartitions);
        log.info("interval: " + targetSize);

        List<String> boundaries = new ArrayList<String>();
        DescriptiveStatistics partitionSizes = new DescriptiveStatistics();
        int pending = 0;
        for (HistogramGroup bucket : buckets) {
            int bucketCount = bucket.getCount();
            // A non-empty partition that this bucket would push to 2x the target is closed first.
            boolean wouldOverflow = pending != 0 && pending + bucketCount >= 2 * targetSize;
            if (pending == 0 || wouldOverflow) {
                if (wouldOverflow) {
                    partitionSizes.addValue(pending);
                }
                // The bucket opens a new partition, so its key becomes a boundary.
                boundaries.add(Utils.toDateTimeFormat(bucket.getKey(), SECONDS_FORMAT, "yyyyMMddHHmmss"));
                pending = bucketCount;
            } else {
                pending += bucketCount;
            }
            if (pending >= targetSize) {
                // Partition is full; the next bucket starts a fresh one.
                partitionSizes.addValue(pending);
                pending = 0;
            }
        }

        if (boundaries.isEmpty()) {
            throw new RuntimeException("Unexpected empty partition list");
        }
        if (pending > 0) {
            // Account for the trailing, not yet closed partition.
            partitionSizes.addValue(pending);
        }
        boundaries.add(Long.toString(expectedHighWatermark));

        log.info("Dynamic partitioning statistics: ");
        log.info("data: " + Arrays.toString(partitionSizes.getValues()));
        log.info(partitionSizes.toString());
        String joinedBoundaries = Joiner.on(",").join(boundaries);
        log.info("Calculated specified partitions: " + joinedBoundaries);
        return joinedBoundaries;
    }

    /**
     * Computes the target number of records per partition: the histogram's total record count
     * divided by {@code maxPartitions}, rounded up, but never less than {@code minTargetPartitionSize}.
     *
     * @param histogram histogram supplying the total record count
     * @param minTargetPartitionSize lower bound of the result
     * @param maxPartitions number of partitions the records are spread over
     * @return the target partition size
     */
    private int computeTargetPartitionSize(Histogram histogram, int minTargetPartitionSize, int maxPartitions) {
        double evenShare = (double)histogram.totalRecordCount / (double)maxPartitions;
        int roundedShare = DoubleMath.roundToInt(evenShare, RoundingMode.CEILING);
        return roundedShare > minTargetPartitionSize ? roundedShare : minTargetPartitionSize;
    }

    /**
     * Runs a query through the REST connector and returns the "records" array of the first result.
     *
     * @param connector connector used to issue the request
     * @param query query text, converted to a URL by {@code SalesforceExtractor.getSoqlUrl}
     * @return the "records" member of the parsed JSON response
     * @throws RuntimeException if the request fails or the response carries no output
     */
    private JsonArray getRecordsForQuery(SalesforceConnector connector, String query) {
        try {
            String queryUrl = connector.getFullUri(SalesforceExtractor.getSoqlUrl(query));
            List getCommands = RestApiConnector.constructGetCommand(queryUrl);
            CommandOutput response = connector.getResponse(getCommands);
            Iterator outputs = response.getResults().values().iterator();
            String payload;
            if (outputs.hasNext()) {
                // Only the first output is consumed.
                payload = (String)outputs.next();
            } else {
                throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
            }
            JsonObject parsed = GSON.fromJson(payload, JsonObject.class);
            return parsed.getAsJsonArray("records");
        }
        catch (DataRecordException | RestApiClientException | RestApiProcessingException failure) {
            throw new RuntimeException("Fail to get data from salesforce", failure);
        }
    }

    /**
     * Issues one count probe for the time range {@code startTime}..{@code endTime} and returns the count.
     *
     * <p>Side effects: overwrites the "start" and "end" entries of {@code subValues} and increments
     * {@code probingContext.probeCount}.
     *
     * @param probingContext supplies the connector and tracks the number of probes
     * @param sub substitutor used to expand the probe query template
     * @param subValues map that receives the "start" and "end" values
     * @param startTime range start in epoch milliseconds
     * @param endTime range end in epoch milliseconds
     * @return the "cnt" value of the first returned record
     */
    private int getCountForRange(TableCountProbingContext probingContext, StrSubstitutor sub, Map<String, String> subValues, long startTime, long endTime) {
        String timestampPattern = "yyyy-MM-dd'T'HH:mm:ss'.000Z'";
        String rangeStart = Utils.dateToString(new Date(startTime), timestampPattern);
        String rangeEnd = Utils.dateToString(new Date(endTime), timestampPattern);
        subValues.put("start", rangeStart);
        subValues.put("end", rangeEnd);

        String probeQuery = sub.replace(PROBE_PARTITION_QUERY_TEMPLATE);
        log.debug("Count query: " + probeQuery);

        // Count the probe before issuing it, so failed requests are counted too.
        probingContext.probeCount++;
        JsonArray rows = this.getRecordsForQuery(probingContext.connector, probeQuery);
        JsonObject firstRow = rows.iterator().next().getAsJsonObject();
        return firstRow.get("cnt").getAsInt();
    }

    /**
     * Recursively bisects a time range, appending groups to {@code histogram} in chronological order.
     *
     * <p>A range is emitted as a single group (keyed by its start) when its count is within the bucket
     * size limit, when the probe count has exceeded the probe limit, or when the half-range would be
     * shorter than 1000 ms. Otherwise the left half is counted with a probe and the right half's
     * count is derived by subtraction.
     *
     * @param probingContext limits and probe bookkeeping
     * @param histogram histogram that receives the resulting groups
     * @param sub substitutor used for probe queries
     * @param values substitution values backing {@code sub}
     * @param count number of records in the range
     * @param startEpoch range start in epoch milliseconds
     * @param endEpoch range end in epoch milliseconds
     */
    private void getHistogramRecursively(TableCountProbingContext probingContext, Histogram histogram, StrSubstitutor sub, Map<String, String> values, int count, long startEpoch, long endEpoch) {
        long halfSpan = (endEpoch - startEpoch) / 2L;
        long midpointEpoch = startEpoch + halfSpan;

        boolean worthSplitting = count > probingContext.bucketSizeLimit
                && probingContext.probeCount <= probingContext.probeLimit
                && midpointEpoch - startEpoch >= 1000L;
        if (!worthSplitting) {
            String groupKey = Utils.epochToDate(startEpoch, SECONDS_FORMAT);
            histogram.add(new HistogramGroup(groupKey, count));
            return;
        }

        // Left half: one probe for the count, then refine it.
        int leftCount = this.getCountForRange(probingContext, sub, values, startEpoch, midpointEpoch);
        this.getHistogramRecursively(probingContext, histogram, sub, values, leftCount, startEpoch, midpointEpoch);
        log.debug("Count {} for left partition {} to {}", new Object[]{leftCount, startEpoch, midpointEpoch});

        // Right half: no probe needed, the count is the remainder.
        int rightCount = count - leftCount;
        this.getHistogramRecursively(probingContext, histogram, sub, values, rightCount, midpointEpoch, endEpoch);
        log.debug("Count {} for right partition {} to {}", new Object[]{rightCount, midpointEpoch, endEpoch});
    }

    /**
     * Builds a histogram for a single time range by recursive count probing.
     *
     * @param probingContext supplies the table, watermark column, limits and probe bookkeeping
     * @param count number of records in the range
     * @param startEpoch range start in epoch milliseconds (queried with {@code >=})
     * @param endEpoch range end in epoch milliseconds (queried with {@code <})
     * @return a new histogram covering the range
     */
    private Histogram getHistogramByProbing(TableCountProbingContext probingContext, int count, long startEpoch, long endEpoch) {
        Map<String, String> placeholders = new HashMap<String, String>();
        placeholders.put("table", probingContext.entity);
        placeholders.put("column", probingContext.watermarkColumn);
        // Probed ranges are half-open: start inclusive, end exclusive.
        placeholders.put("greater", ">=");
        placeholders.put("less", "<");

        Histogram probed = new Histogram();
        this.getHistogramRecursively(probingContext, probed, new StrSubstitutor(placeholders), placeholders, count, startEpoch, endEpoch);
        return probed;
    }

    /**
     * Refines a histogram by splitting every group whose count exceeds the bucket size limit.
     *
     * <p>The bucket size limit is the probe target ratio multiplied by the target partition size.
     * Each oversized group is replaced by the groups produced by probing the time range from the
     * group's own key to the key of the following group. A zero-count sentinel keyed at the
     * partition's <em>high</em> watermark is appended to a copy of the group list so that the last
     * real group also has an end boundary.
     *
     * @param connector connector used for probe queries
     * @param entity table being probed
     * @param watermarkColumn column the probe queries filter on
     * @param state source of the partition-count, probe-limit, minimum-size and ratio settings
     * @param partition global partition; its high watermark closes the last group's range
     * @param histogram histogram to refine; not modified
     * @return a new, refined histogram (empty if {@code histogram} has no groups)
     */
    private Histogram getRefinedHistogram(SalesforceConnector connector, String entity, String watermarkColumn, SourceState state, Partition partition, Histogram histogram) {
        int maxPartitions = state.getPropAsInt("source.max.number.of.partitions", 20);
        int probeLimit = state.getPropAsInt(DYNAMIC_PROBING_LIMIT, 1000);
        int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, 250000);
        Histogram outputHistogram = new Histogram();
        double probeTargetRatio = state.getPropAsDouble(PROBE_TARGET_RATIO, 0.6);
        int bucketSizeLimit = (int)(probeTargetRatio * (double)this.computeTargetPartitionSize(histogram, minTargetPartitionSize, maxPartitions));
        log.info("Refining histogram with bucket size limit {}.", (Object)bucketSizeLimit);
        TableCountProbingContext probingContext = new TableCountProbingContext(connector, entity, watermarkColumn, bucketSizeLimit, probeLimit);
        if (histogram.getGroups().isEmpty()) {
            return outputHistogram;
        }

        // Work on a copy with a sentinel appended, so every real group has a successor
        // whose key is the end of its time range.
        List<HistogramGroup> list = new ArrayList<HistogramGroup>(histogram.getGroups());
        // The sentinel must sit at the HIGH watermark: using the low watermark would give the
        // last group an end boundary at or before its start and it would never be split.
        Date hwmDate = Utils.toDate(partition.getHighWatermark(), "yyyyMMddHHmmss");
        list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SECONDS_FORMAT), 0));

        for (int i = 0; i < list.size() - 1; ++i) {
            HistogramGroup currentGroup = list.get(i);
            HistogramGroup nextGroup = list.get(i + 1);
            if (currentGroup.count > bucketSizeLimit) {
                long startEpoch = Utils.toDate(currentGroup.getKey(), SECONDS_FORMAT).getTime();
                long endEpoch = Utils.toDate(nextGroup.getKey(), SECONDS_FORMAT).getTime();
                outputHistogram.add(this.getHistogramByProbing(probingContext, currentGroup.count, startEpoch, endEpoch));
                continue;
            }
            // Small enough already: carry the group over unchanged.
            outputHistogram.add(currentGroup);
        }

        log.info("Executed {} probes for refining the histogram.", (Object)probingContext.probeCount);
        if (probingContext.probeCount >= probingContext.probeLimit) {
            log.warn("Reached the probe limit");
        }
        return outputHistogram;
    }

    /**
     * Builds a per-day record-count histogram for the partition's watermark range.
     *
     * <p>One day-bucketing query is issued per calendar year touched by the range. The first year
     * starts at the low watermark and the last year ends at the high watermark, each honouring the
     * partition's inclusive flags; all other boundaries are year starts, inclusive below and
     * exclusive above.
     *
     * @param connector connector used to run the queries
     * @param entity table to query
     * @param watermarkColumn column that is counted, filtered and grouped by day
     * @param partition supplies the low/high watermarks and their inclusive flags
     * @return the concatenated histogram of all yearly query results
     */
    private Histogram getHistogramByDayBucketing(SalesforceConnector connector, String entity, String watermarkColumn, Partition partition) {
        String watermarkPattern = "yyyyMMddHHmmss";
        String timestampPattern = "yyyy-MM-dd'T'HH:mm:ss'.000Z'";
        Histogram dayHistogram = new Histogram();
        GregorianCalendar yearReader = new GregorianCalendar();

        Date rangeStart = Utils.toDate(partition.getLowWatermark(), watermarkPattern);
        yearReader.setTime(rangeStart);
        int firstYear = yearReader.get(GregorianCalendar.YEAR);
        String rangeStartText = Utils.dateToString(rangeStart, timestampPattern);

        Date rangeEnd = Utils.toDate(partition.getHighWatermark(), watermarkPattern);
        yearReader.setTime(rangeEnd);
        int lastYear = yearReader.get(GregorianCalendar.YEAR);
        String rangeEndText = Utils.dateToString(rangeEnd, timestampPattern);

        Map<String, String> placeholders = new HashMap<String, String>();
        placeholders.put("table", entity);
        placeholders.put("column", watermarkColumn);
        StrSubstitutor substitutor = new StrSubstitutor(placeholders);

        for (int year = firstYear; year <= lastYear; ++year) {
            boolean atFirstYear = year == firstYear;
            boolean atLastYear = year == lastYear;

            // Lower bound: the watermark for the first year, otherwise the start of the year.
            placeholders.put("start", atFirstYear ? rangeStartText : this.getDateString(year));
            placeholders.put("greater", atFirstYear && !partition.isLowWatermarkInclusive() ? ">" : ">=");

            // Upper bound: the watermark for the last year, otherwise the start of the next year.
            placeholders.put("end", atLastYear ? rangeEndText : this.getDateString(year + 1));
            placeholders.put("less", atLastYear && partition.isHighWatermarkInclusive() ? "<=" : "<");

            String yearQuery = substitutor.replace(DAY_PARTITION_QUERY_TEMPLATE);
            log.info("Histogram query: " + yearQuery);
            dayHistogram.add(this.parseDayBucketingHistogram(this.getRecordsForQuery(connector, yearQuery)));
        }
        return dayHistogram;
    }

    /**
     * Connects to Salesforce and builds the record-count histogram for the given partition.
     *
     * <p>The day-bucketed histogram is computed first. The key of its first group is then replaced
     * by the partition's low watermark (count unchanged), so that the first group starts at the low
     * watermark. If dynamic probing is enabled the histogram is refined afterwards.
     *
     * <p>An empty day histogram (no rows in the watermark range) is passed through unchanged
     * instead of failing with an {@code IndexOutOfBoundsException} on the first-group lookup.
     *
     * @param entity table to query
     * @param watermarkColumn column the histogram is computed over
     * @param state connection settings and the dynamic-probing flag
     * @param partition global partition describing the watermark range
     * @return the (possibly refined, possibly empty) histogram
     * @throws RuntimeException if the connection cannot be established
     */
    private Histogram getHistogram(String entity, String watermarkColumn, SourceState state, Partition partition) {
        SalesforceConnector connector = new SalesforceConnector((State)state);
        try {
            if (!connector.connect()) {
                throw new RuntimeException("Failed to connect.");
            }
        }
        catch (RestApiConnectionException e) {
            throw new RuntimeException("Failed to connect.", e);
        }

        Histogram histogram = this.getHistogramByDayBucketing(connector, entity, watermarkColumn, partition);

        List<HistogramGroup> groups = histogram.getGroups();
        if (!groups.isEmpty()) {
            // Re-key the first group to the low watermark; its count is kept as is.
            Date lwmDate = Utils.toDate(partition.getLowWatermark(), "yyyyMMddHHmmss");
            String lwmKey = Utils.epochToDate(lwmDate.getTime(), SECONDS_FORMAT);
            groups.set(0, new HistogramGroup(lwmKey, groups.get(0).getCount()));
        }

        if (state.getPropAsBoolean(ENABLE_DYNAMIC_PROBING)) {
            histogram = this.getRefinedHistogram(connector, entity, watermarkColumn, state, partition, histogram);
        }
        return histogram;
    }

    /**
     * Formats the start of the given year as a Salesforce timestamp string.
     *
     * @param year calendar year
     * @return the instant obtained from a cleared calendar with only the year set, formatted with
     *         {@code yyyy-MM-dd'T'HH:mm:ss'.000Z'}
     */
    private String getDateString(int year) {
        GregorianCalendar startOfYear = new GregorianCalendar();
        // Clearing first leaves every field other than the year at its default.
        startOfYear.clear();
        startOfYear.set(GregorianCalendar.YEAR, year);
        Date instant = startOfYear.getTime();
        return Utils.dateToString(instant, "yyyy-MM-dd'T'HH:mm:ss'.000Z'");
    }

    /**
     * Converts the records of a day-bucketing query into a histogram.
     *
     * @param records query result; each element is read as an object with "time" and "cnt" members
     * @return a histogram with one group per record, keyed by the "time" value plus "-00:00:00"
     */
    private Histogram parseDayBucketingHistogram(JsonArray records) {
        log.info("Parse day-based histogram");
        Histogram parsed = new Histogram();
        for (JsonElement record : records) {
            JsonObject row = record.getAsJsonObject();
            // The query returns a date only; pad it to the seconds-level key format.
            String dayKey = row.get("time").getAsString() + ZERO_TIME_SUFFIX;
            int dayCount = row.get("cnt").getAsInt();
            parsed.add(new HistogramGroup(dayKey, dayCount));
        }
        return parsed;
    }

    /**
     * Returns the source entities to process.
     *
     * <p>Unless the "use.all.objects" property is true, the superclass result is returned. Otherwise
     * the "/sobjects" endpoint is queried and every listed object becomes a source entity.
     *
     * @param state configuration and connection settings
     * @return the source entities
     * @throws RuntimeException if the connection fails or the response carries no output
     */
    protected Set<QueryBasedSource.SourceEntity> getSourceEntities(State state) {
        boolean useAllObjects = state.getPropAsBoolean(USE_ALL_OBJECTS, false);
        if (!useAllObjects) {
            return super.getSourceEntities(state);
        }

        SalesforceConnector connector = new SalesforceConnector(state);
        boolean connected;
        try {
            connected = connector.connect();
        }
        catch (RestApiConnectionException connectFailure) {
            throw new RuntimeException("Failed to connect.", connectFailure);
        }
        if (!connected) {
            throw new RuntimeException("Failed to connect.");
        }

        List listCommands = RestApiConnector.constructGetCommand(connector.getFullUri("/sobjects"));
        try {
            CommandOutput response = connector.getResponse(listCommands);
            Iterator outputs = response.getResults().values().iterator();
            if (!outputs.hasNext()) {
                throw new RuntimeException("Unable to retrieve source entities");
            }
            // Only the first output is parsed.
            return SalesforceSource.getSourceEntities((String)outputs.next());
        }
        catch (RestApiProcessingException processingFailure) {
            throw Throwables.propagate((Throwable)processingFailure);
        }
    }

    /**
     * Parses a "/sobjects" JSON response into source entities.
     *
     * @param response JSON text with an "sobjects" array whose elements carry a "name" member
     * @return one source entity per listed object name
     */
    private static Set<QueryBasedSource.SourceEntity> getSourceEntities(String response) {
        Set<QueryBasedSource.SourceEntity> entities = Sets.newHashSet();
        JsonObject root = new Gson().fromJson(response, JsonObject.class);
        JsonArray objects = root.getAsJsonArray("sobjects");
        for (JsonElement object : objects) {
            String objectName = object.getAsJsonObject().get("name").getAsString();
            entities.add(QueryBasedSource.SourceEntity.fromSourceEntityName(objectName));
        }
        return entities;
    }

    /**
     * State shared by all count probes issued while refining one histogram: the connection and
     * query target, the two limits that stop further splitting, and a running probe counter.
     */
    private static class TableCountProbingContext {
        /** Connector the probe queries are sent through. */
        private final SalesforceConnector connector;
        /** Table the probe queries count rows of. */
        private final String entity;
        /** Column the probe queries filter on. */
        private final String watermarkColumn;
        /** Ranges with a count at or below this value are not split further. */
        private final int bucketSizeLimit;
        /** Once the probe count exceeds this value, ranges are not split further. */
        private final int probeLimit;
        /** Number of probes issued so far; starts at zero and is incremented by the enclosing class. */
        private int probeCount;

        @ConstructorProperties({"connector", "entity", "watermarkColumn", "bucketSizeLimit", "probeLimit"})
        public TableCountProbingContext(SalesforceConnector connector, String entity, String watermarkColumn, int bucketSizeLimit, int probeLimit) {
            this.probeCount = 0;
            this.probeLimit = probeLimit;
            this.bucketSizeLimit = bucketSizeLimit;
            this.watermarkColumn = watermarkColumn;
            this.entity = entity;
            this.connector = connector;
        }
    }

    /**
     * Ordered list of histogram groups together with the running sum of their counts.
     *
     * <p>{@link #getGroups()} exposes the backing list itself; replacing elements through it does
     * not update the total.
     */
    static class Histogram {
        private long totalRecordCount;
        private List<HistogramGroup> groups;

        /** Creates an empty histogram with a total of zero. */
        Histogram() {
            this.totalRecordCount = 0L;
            this.groups = new ArrayList<HistogramGroup>();
        }

        /** Appends one group and adds its count to the total. */
        void add(HistogramGroup group) {
            this.totalRecordCount += (long)group.getCount();
            this.groups.add(group);
        }

        /** Appends all groups of another histogram and adds its total to this one. */
        void add(Histogram histogram) {
            this.totalRecordCount += histogram.totalRecordCount;
            this.groups.addAll(histogram.getGroups());
        }

        /** @return the string form of the group list */
        public String toString() {
            return String.valueOf(this.groups);
        }

        /** @return the sum of the counts added so far */
        public long getTotalRecordCount() {
            return this.totalRecordCount;
        }

        /** @return the backing, mutable group list */
        public List<HistogramGroup> getGroups() {
            return this.groups;
        }
    }

    /** Immutable histogram entry: a key (the start of a time range) and the record count for it. */
    static class HistogramGroup {
        private final String key;
        private final int count;

        @ConstructorProperties({"key", "count"})
        public HistogramGroup(String key, int count) {
            this.count = count;
            this.key = key;
        }

        /** @return the group key */
        public String getKey() {
            return this.key;
        }

        /** @return the record count of the group */
        public int getCount() {
            return this.count;
        }

        /** @return the key and the count separated by a colon */
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(this.key).append(":").append(this.count);
            return text.toString();
        }
    }
}

