/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.salesforce;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import com.sforce.async.ConcurrencyMode;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import com.sforce.async.QueryResultList;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectorConfig;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.salesforce.FileIdVO;
import org.apache.gobblin.salesforce.QueryResultIterator;
import org.apache.gobblin.salesforce.ResultChainingIterator;
import org.apache.gobblin.salesforce.SalesforceConnector;
import org.apache.gobblin.salesforce.SfConfig;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
import org.apache.gobblin.source.extractor.exception.RecordCountException;
import org.apache.gobblin.source.extractor.exception.RestApiClientException;
import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
import org.apache.gobblin.source.extractor.exception.SchemaException;
import org.apache.gobblin.source.extractor.extract.Command;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
import org.apache.gobblin.source.extractor.extract.CommandType;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiExtractor;
import org.apache.gobblin.source.extractor.schema.Schema;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.apache.gobblin.source.jdbc.SqlQueryUtils;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforceExtractor
extends RestApiExtractor {
    private static final Logger log = LoggerFactory.getLogger(SalesforceExtractor.class);
    private static final String SOQL_RESOURCE = "/queryAll";
    public static final String SALESFORCE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'.000Z'";
    private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd";
    private static final String SALESFORCE_HOUR_FORMAT = "HH";
    private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
    private static final Gson GSON = new Gson();
    private static final int MAX_RETRY_INTERVAL_SECS = 600;
    private boolean pullStatus = true;
    private String nextUrl;
    private BulkConnection bulkConnection = null;
    private JobInfo bulkJob = new JobInfo();
    private List<BatchIdAndResultId> bulkResultIdList;
    private boolean bulkJobFinished = true;
    private boolean newBulkResultSet = true;
    private final int pkChunkingSize;
    private final SalesforceConnector sfConnector;
    private final int retryLimit;
    private final long retryInterval;
    private final long retryExceedQuotaInterval;
    private final boolean bulkApiUseQueryAll;
    private SfConfig conf;
    private Boolean isPkChunkingFetchDone = false;
    private Boolean isBulkFetchDone = false;

    public SalesforceExtractor(WorkUnitState state) {
        super(state);
        this.conf = new SfConfig(state.getProperties());
        this.sfConnector = (SalesforceConnector)this.connector;
        this.pkChunkingSize = this.conf.pkChunkingSize;
        this.retryInterval = this.conf.retryInterval;
        this.retryExceedQuotaInterval = this.conf.retryExceedQuotaInterval;
        this.bulkApiUseQueryAll = this.conf.bulkApiUseQueryAll;
        this.retryLimit = this.conf.fetchRetryLimit;
    }

    protected RestApiConnector getConnector(WorkUnitState state) {
        return new SalesforceConnector((State)state);
    }

    private void setPullStatus(boolean pullStatus) {
        this.pullStatus = pullStatus;
    }

    private void setNextUrl(String nextUrl) {
        this.nextUrl = nextUrl;
    }

    private boolean isBulkJobFinished() {
        return this.bulkJobFinished;
    }

    private void setBulkJobFinished(boolean bulkJobFinished) {
        this.bulkJobFinished = bulkJobFinished;
    }

    private boolean isNewBulkResultSet() {
        return this.newBulkResultSet;
    }

    private void setNewBulkResultSet(boolean newBulkResultSet) {
        this.newBulkResultSet = newBulkResultSet;
    }

    public HttpEntity getAuthentication() throws RestApiConnectionException {
        log.debug("Authenticating salesforce");
        return this.connector.getAuthentication();
    }

    public List<Command> getSchemaMetadata(String schema, String entity) throws SchemaException {
        log.debug("Build url to retrieve schema");
        return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri("/sobjects/" + entity.trim() + "/describe"));
    }

    public JsonArray getSchema(CommandOutput<?, ?> response) throws SchemaException {
        log.info("Get schema from salesforce");
        Iterator itr = response.getResults().values().iterator();
        if (!itr.hasNext()) {
            throw new SchemaException("Failed to get schema from salesforce; REST response has no output");
        }
        String output = (String)itr.next();
        JsonArray fieldJsonArray = new JsonArray();
        JsonElement element = (JsonElement)GSON.fromJson(output, JsonObject.class);
        JsonObject jsonObject = element.getAsJsonObject();
        try {
            JsonArray array = jsonObject.getAsJsonArray("fields");
            for (JsonElement columnElement : array) {
                JsonObject field = columnElement.getAsJsonObject();
                Schema schema = new Schema();
                schema.setColumnName(field.get("name").getAsString());
                String dataType = field.get("type").getAsString();
                String elementDataType = "string";
                List mapSymbols = null;
                JsonObject newDataType = this.convertDataType(field.get("name").getAsString(), dataType, elementDataType, mapSymbols);
                log.debug("ColumnName:" + field.get("name").getAsString() + ";   old datatype:" + dataType + ";   new datatype:" + newDataType);
                schema.setDataType(newDataType);
                schema.setLength(field.get("length").getAsLong());
                schema.setPrecision(field.get("precision").getAsInt());
                schema.setScale(field.get("scale").getAsInt());
                schema.setNullable(field.get("nillable").getAsBoolean());
                schema.setFormat(null);
                schema.setComment(field.get("label").isJsonNull() ? null : field.get("label").getAsString());
                schema.setDefaultValue(field.get("defaultValue").isJsonNull() ? null : field.get("defaultValue").getAsString());
                schema.setUnique(field.get("unique").getAsBoolean());
                String jsonStr = GSON.toJson((Object)schema);
                JsonObject obj = ((JsonObject)GSON.fromJson(jsonStr, JsonObject.class)).getAsJsonObject();
                fieldJsonArray.add((JsonElement)obj);
            }
        }
        catch (Exception e) {
            throw new SchemaException("Failed to get schema from salesforce; error - " + e.getMessage(), e);
        }
        return fieldJsonArray;
    }

    public List<Command> getHighWatermarkMetadata(String schema, String entity, String watermarkColumn, List<Predicate> predicateList) throws HighWatermarkException {
        String queryLowerCase;
        int startIndex;
        log.debug("Build url to retrieve high watermark");
        String query = "SELECT " + watermarkColumn + " FROM " + entity;
        String defaultPredicate = " " + watermarkColumn + " != null";
        String defaultSortOrder = " ORDER BY " + watermarkColumn + " desc LIMIT 1";
        String existingPredicate = "";
        if (this.updatedQuery != null && (startIndex = (queryLowerCase = this.updatedQuery.toLowerCase()).indexOf(" where ")) > 0) {
            existingPredicate = this.updatedQuery.substring(startIndex);
        }
        query = query + existingPredicate;
        String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
        query = query.replace(limitString, "");
        ListIterator<Predicate> i = predicateList.listIterator();
        while (i.hasNext()) {
            Predicate predicate = (Predicate)i.next();
            query = SqlQueryUtils.addPredicate((String)query, (String)predicate.getCondition());
        }
        query = SqlQueryUtils.addPredicate((String)query, (String)defaultPredicate);
        query = query + defaultSortOrder;
        log.info("getHighWatermarkMetadata - QUERY: " + query);
        try {
            return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
        }
        catch (Exception e) {
            throw new HighWatermarkException("Failed to get salesforce url for high watermark; error - " + e.getMessage(), e);
        }
    }

    public long getHighWatermark(CommandOutput<?, ?> response, String watermarkColumn, String format) throws HighWatermarkException {
        long highTs;
        block7: {
            log.info("Get high watermark from salesforce");
            Iterator itr = response.getResults().values().iterator();
            if (!itr.hasNext()) {
                throw new HighWatermarkException("Failed to get high watermark from salesforce; REST response has no output");
            }
            String output = (String)itr.next();
            JsonElement element = (JsonElement)GSON.fromJson(output, JsonObject.class);
            try {
                JsonObject jsonObject = element.getAsJsonObject();
                JsonArray jsonArray = jsonObject.getAsJsonArray("records");
                if (jsonArray.size() == 0) {
                    return -1L;
                }
                String value = jsonObject.getAsJsonArray("records").get(0).getAsJsonObject().get(watermarkColumn).getAsString();
                if (format != null) {
                    SimpleDateFormat inFormat = new SimpleDateFormat(format);
                    Date date = null;
                    try {
                        date = inFormat.parse(value);
                    }
                    catch (ParseException e) {
                        log.error("ParseException: " + e.getMessage(), (Throwable)e);
                    }
                    SimpleDateFormat outFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                    highTs = Long.parseLong(outFormat.format(date));
                    break block7;
                }
                highTs = Long.parseLong(value);
            }
            catch (Exception e) {
                throw new HighWatermarkException("Failed to get high watermark from salesforce; error - " + e.getMessage(), e);
            }
        }
        return highTs;
    }

    public List<Command> getCountMetadata(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws RecordCountException {
        String queryLowerCase;
        int startIndex;
        log.debug("Build url to retrieve source record count");
        String existingPredicate = "";
        if (this.updatedQuery != null && (startIndex = (queryLowerCase = this.updatedQuery.toLowerCase()).indexOf(" where ")) > 0) {
            existingPredicate = this.updatedQuery.substring(startIndex);
        }
        String query = "SELECT COUNT() FROM " + entity + existingPredicate;
        String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
        query = query.replace(limitString, "");
        try {
            if (SalesforceExtractor.isNullPredicate(predicateList)) {
                log.info("QUERY with null predicate: " + query);
                return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
            }
            ListIterator<Predicate> i = predicateList.listIterator();
            while (i.hasNext()) {
                Predicate predicate = (Predicate)i.next();
                query = SqlQueryUtils.addPredicate((String)query, (String)predicate.getCondition());
            }
            query = query + SalesforceExtractor.getLimitFromInputQuery(this.updatedQuery);
            log.info("getCountMetadata - QUERY: " + query);
            return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
        }
        catch (Exception e) {
            throw new RecordCountException("Failed to get salesforce url for record count; error - " + e.getMessage(), e);
        }
    }

    public long getCount(CommandOutput<?, ?> response) throws RecordCountException {
        long count;
        log.info("Get source record count from salesforce");
        Iterator itr = response.getResults().values().iterator();
        if (!itr.hasNext()) {
            throw new RecordCountException("Failed to get count from salesforce; REST response has no output");
        }
        String output = (String)itr.next();
        JsonElement element = (JsonElement)GSON.fromJson(output, JsonObject.class);
        try {
            JsonObject jsonObject = element.getAsJsonObject();
            count = jsonObject.get("totalSize").getAsLong();
        }
        catch (Exception e) {
            throw new RecordCountException("Failed to get record count from salesforce; error - " + e.getMessage(), e);
        }
        return count;
    }

    public List<Command> getDataMetadata(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws DataRecordException {
        log.debug("Build url to retrieve data records");
        String query = this.updatedQuery;
        String url = null;
        try {
            if (this.getNextUrl() != null && this.pullStatus) {
                url = this.getNextUrl();
            } else {
                if (SalesforceExtractor.isNullPredicate(predicateList)) {
                    log.info("getDataMetaData null predicate - QUERY:" + query);
                    return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
                }
                String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
                query = query.replace(limitString, "");
                ListIterator<Predicate> i = predicateList.listIterator();
                while (i.hasNext()) {
                    Predicate predicate = (Predicate)i.next();
                    query = SqlQueryUtils.addPredicate((String)query, (String)predicate.getCondition());
                }
                if (Boolean.valueOf(this.workUnitState.getProp("source.querybased.is.specific.api.active")).booleanValue()) {
                    query = SqlQueryUtils.addPredicate((String)query, (String)"IsDeleted = true");
                }
                query = query + limitString;
                log.info("getDataMetadata - QUERY: " + query);
                url = this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query));
            }
            return SalesforceExtractor.constructGetCommand(url);
        }
        catch (Exception e) {
            throw new DataRecordException("Failed to get salesforce url for data records; error - " + e.getMessage(), e);
        }
    }

    private static String getLimitFromInputQuery(String query) {
        String inputQuery = query.toLowerCase();
        int limitIndex = inputQuery.indexOf(" limit");
        if (limitIndex > 0) {
            return query.substring(limitIndex);
        }
        return "";
    }

    public Iterator<JsonElement> getData(CommandOutput<?, ?> response) throws DataRecordException {
        log.debug("Get data records from response");
        Iterator itr = response.getResults().values().iterator();
        if (!itr.hasNext()) {
            throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
        }
        String output = (String)itr.next();
        ArrayList rs = Lists.newArrayList();
        JsonElement element = (JsonElement)GSON.fromJson(output, JsonObject.class);
        try {
            JsonObject jsonObject = element.getAsJsonObject();
            JsonArray partRecords = jsonObject.getAsJsonArray("records");
            if (jsonObject.get("done").getAsBoolean()) {
                this.setPullStatus(false);
            } else {
                this.setNextUrl(this.sfConnector.getFullUri(jsonObject.get("nextRecordsUrl").getAsString().replaceAll(this.sfConnector.getServicesDataEnvPath(), "")));
            }
            JsonArray array = Utils.removeElementFromJsonArray((JsonArray)partRecords, (String)"attributes");
            for (JsonElement recordElement : array) {
                rs.add(recordElement);
            }
            return rs.iterator();
        }
        catch (Exception e) {
            throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
        }
    }

    public boolean getPullStatus() {
        return this.pullStatus;
    }

    public String getNextUrl() {
        return this.nextUrl;
    }

    public static String getSoqlUrl(String soqlQuery) throws RestApiClientException {
        String path = "/queryAll/";
        BasicNameValuePair pair = new BasicNameValuePair("q", soqlQuery);
        ArrayList<NameValuePair> qparams = new ArrayList<NameValuePair>();
        qparams.add((NameValuePair)pair);
        return SalesforceExtractor.buildUrl(path, qparams);
    }

    private static String buildUrl(String path, List<NameValuePair> qparams) throws RestApiClientException {
        URI uri;
        URIBuilder builder = new URIBuilder();
        builder.setPath(path);
        ListIterator<NameValuePair> i = qparams.listIterator();
        while (i.hasNext()) {
            NameValuePair keyValue = i.next();
            builder.setParameter(keyValue.getName(), keyValue.getValue());
        }
        try {
            uri = builder.build();
        }
        catch (Exception e) {
            throw new RestApiClientException("Failed to build url; error - " + e.getMessage(), e);
        }
        return new HttpGet(uri).getURI().toString();
    }

    private static boolean isNullPredicate(List<Predicate> predicateList) {
        return predicateList == null || predicateList.size() == 0;
    }

    public String getWatermarkSourceFormat(WatermarkType watermarkType) {
        switch (watermarkType) {
            case TIMESTAMP: {
                return "yyyy-MM-dd'T'HH:mm:ss";
            }
            case DATE: {
                return SALESFORCE_DATE_FORMAT;
            }
        }
        return null;
    }

    public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
        log.info("Getting hour predicate from salesforce");
        String formattedvalue = Utils.toDateTimeFormat((String)Long.toString(value), (String)valueFormat, (String)SALESFORCE_HOUR_FORMAT);
        return column + " " + operator + " " + formattedvalue;
    }

    public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
        log.info("Getting date predicate from salesforce");
        String formattedvalue = Utils.toDateTimeFormat((String)Long.toString(value), (String)valueFormat, (String)SALESFORCE_DATE_FORMAT);
        return column + " " + operator + " " + formattedvalue;
    }

    public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
        log.info("Getting timestamp predicate from salesforce");
        String formattedvalue = Utils.toDateTimeFormat((String)Long.toString(value), (String)valueFormat, (String)SALESFORCE_TIMESTAMP_FORMAT);
        return column + " " + operator + " " + formattedvalue;
    }

    public Map<String, String> getDataTypeMap() {
        ImmutableMap dataTypeMap = ImmutableMap.builder().put((Object)"url", (Object)"string").put((Object)"textarea", (Object)"string").put((Object)"reference", (Object)"string").put((Object)"phone", (Object)"string").put((Object)"masterrecord", (Object)"string").put((Object)"location", (Object)"string").put((Object)"id", (Object)"string").put((Object)"encryptedstring", (Object)"string").put((Object)"email", (Object)"string").put((Object)"DataCategoryGroupReference", (Object)"string").put((Object)"calculated", (Object)"string").put((Object)"anyType", (Object)"string").put((Object)"address", (Object)"string").put((Object)"blob", (Object)"string").put((Object)"date", (Object)"date").put((Object)"datetime", (Object)"timestamp").put((Object)"time", (Object)"time").put((Object)"object", (Object)"string").put((Object)"string", (Object)"string").put((Object)"int", (Object)"int").put((Object)"long", (Object)"long").put((Object)"double", (Object)"double").put((Object)"percent", (Object)"double").put((Object)"currency", (Object)"double").put((Object)"decimal", (Object)"double").put((Object)"boolean", (Object)"boolean").put((Object)"picklist", (Object)"string").put((Object)"multipicklist", (Object)"string").put((Object)"combobox", (Object)"string").put((Object)"list", (Object)"string").put((Object)"set", (Object)"string").put((Object)"map", (Object)"string").put((Object)"enum", (Object)"string").build();
        return dataTypeMap;
    }

    private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
        if (this.isPkChunkingFetchDone.booleanValue()) {
            return null;
        }
        log.info("----Get records for pk-chunking----" + workUnit.getProp("__salesforce.job.id"));
        this.isPkChunkingFetchDone = true;
        this.bulkApiLogin();
        String jobId = workUnit.getProp("__salesforce.job.id");
        String batchIdResultIdPairString = workUnit.getProp("__salesforce.batch.result.id.pairs");
        List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
        return new ResultChainingIterator(this.bulkConnection, fileIdList, this.retryLimit, this.retryInterval, this.retryExceedQuotaInterval);
    }

    private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
        return Arrays.stream(batchIdResultIdString.split(",")).map(x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1])).collect(Collectors.toList());
    }

    private Iterator<JsonElement> fetchRecordSet(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) {
        if (this.isBulkFetchDone.booleanValue()) {
            return null;
        }
        this.isBulkFetchDone = true;
        log.info("----Get records for bulk batch job----");
        try {
            this.setBulkJobFinished(false);
            this.bulkResultIdList = this.getQueryResultIds(entity, predicateList);
            log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
            List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream().map(x -> new FileIdVO(this.bulkJob.getId(), ((BatchIdAndResultId)x).batchId, ((BatchIdAndResultId)x).resultId)).collect(Collectors.toList());
            ResultChainingIterator chainingIter = new ResultChainingIterator(this.bulkConnection, fileIdVoList, this.retryLimit, this.retryInterval, this.retryExceedQuotaInterval);
            chainingIter.add(this.getSoftDeletedRecords(schema, entity, workUnit, predicateList));
            return chainingIter;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to get records using bulk api; error - " + e.getMessage(), e);
        }
    }

    public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) {
        log.debug("Getting salesforce data using bulk api");
        if (workUnit.contains("__salesforce.job.id")) {
            return this.fetchRecordSetPkChunking(workUnit);
        }
        return this.fetchRecordSet(schema, entity, workUnit, predicateList);
    }

    private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws DataRecordException {
        boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean("source.querybased.salesforce.is.soft.deletes.pull.disabled");
        if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
            return new QueryResultIterator(this, schema, entity, workUnit, predicateList);
        }
        log.info("Ignoring soft delete records");
        return null;
    }

    private void bulkApiLogin() {
        try {
            if (!this.doBulkApiLogin()) {
                throw new RuntimeException("invalid login");
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private boolean doBulkApiLogin() throws Exception {
        log.info("Authenticating salesforce bulk api");
        boolean success = false;
        String hostName = this.workUnitState.getProp("source.conn.host");
        String apiVersion = this.workUnitState.getProp("source.conn.version");
        if (Strings.isNullOrEmpty((String)apiVersion)) {
            apiVersion = this.bulkApiUseQueryAll ? "42.0" : "29.0";
        }
        String soapAuthEndPoint = hostName + SALESFORCE_SOAP_SERVICE + "/" + apiVersion;
        try {
            boolean isConnectSuccess;
            String accessToken;
            ConnectorConfig partnerConfig = new ConnectorConfig();
            if (this.workUnitState.contains("source.conn.use.proxy.url") && !this.workUnitState.getProp("source.conn.use.proxy.url").isEmpty()) {
                partnerConfig.setProxy(this.workUnitState.getProp("source.conn.use.proxy.url"), this.workUnitState.getPropAsInt("source.conn.use.proxy.port"));
            }
            if ((accessToken = this.sfConnector.getAccessToken()) == null && (isConnectSuccess = this.sfConnector.connect())) {
                accessToken = this.sfConnector.getAccessToken();
            }
            if (accessToken != null) {
                String serviceEndpoint = this.sfConnector.getInstanceUrl() + SALESFORCE_SOAP_SERVICE + "/" + apiVersion;
                partnerConfig.setSessionId(accessToken);
                partnerConfig.setServiceEndpoint(serviceEndpoint);
            } else {
                String securityToken = this.workUnitState.getProp("source.conn.security.token");
                String password = PasswordManager.getInstance((State)this.workUnitState).readPassword(this.workUnitState.getProp("source.conn.password"));
                partnerConfig.setUsername(this.workUnitState.getProp("source.conn.username"));
                partnerConfig.setPassword(password + securityToken);
            }
            partnerConfig.setAuthEndpoint(soapAuthEndPoint);
            new PartnerConnection(partnerConfig);
            String soapEndpoint = partnerConfig.getServiceEndpoint();
            String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/")) + "async/" + apiVersion;
            ConnectorConfig config = this.createConfig();
            config.setSessionId(partnerConfig.getSessionId());
            config.setRestEndpoint(restEndpoint);
            this.bulkConnection = this.getBulkConnection(config);
            success = true;
        }
        catch (RuntimeException e) {
            throw new RuntimeException("Failed to connect to salesforce bulk api; error - " + e, e);
        }
        return success;
    }

    public ResultFileIdsStruct getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
        this.bulkApiLogin();
        try {
            int retryInterval = Math.min(600000, 30 + this.pkChunkingSize * 2);
            if (StringUtils.isNotEmpty((String)batchIdListStr)) {
                log.info("The batchId is specified.");
                return this.retrievePkChunkingResultIdsByBatchId(this.bulkConnection, jobId, batchIdListStr);
            }
            ResultFileIdsStruct resultStruct = this.retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval);
            if (resultStruct.getBatchIdAndResultIdList().isEmpty()) {
                String msg = String.format("There are no result for the [jobId: %s, batchIds: %s]", jobId, batchIdListStr);
                throw new RuntimeException(msg);
            }
            return resultStruct;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public BulkConnection getBulkConnection(ConnectorConfig config) throws AsyncApiException {
        return new BulkConnection(config);
    }

    public ResultFileIdsStruct getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) {
        this.bulkApiLogin();
        try {
            BulkConnection connection = this.bulkConnection;
            JobInfo jobRequest = new JobInfo();
            jobRequest.setObject(entity);
            jobRequest.setOperation(OperationEnum.queryAll);
            jobRequest.setConcurrencyMode(ConcurrencyMode.Parallel);
            log.info("Enabling pk chunking with size {}", (Object)this.pkChunkingSize);
            connection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
            jobRequest.setContentType(ContentType.CSV);
            JobInfo createdJob = connection.createJob(jobRequest);
            log.info("Created bulk job: {}", (Object)createdJob.getId());
            this.bulkJob = createdJob;
            String jobId = createdJob.getId();
            JobInfo jobResponse = connection.getJobStatus(jobId);
            String query = this.updatedQuery;
            if (!SalesforceExtractor.isNullPredicate(predicateList)) {
                String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
                query = query.replace(limitString, "");
                ListIterator<Predicate> i = predicateList.listIterator();
                while (i.hasNext()) {
                    Predicate predicate = (Predicate)i.next();
                    query = SqlQueryUtils.addPredicate((String)query, (String)predicate.getCondition());
                }
                query = query + limitString;
            }
            log.info("Submitting PK Chunking query:" + query);
            ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
            BatchInfo executeQueryBatch = connection.createBatchFromStream(jobResponse, (InputStream)bout);
            String pkChunkingBatchId = executeQueryBatch.getId();
            int waitMilliSecond = 60000;
            BatchInfo batchResponse = connection.getBatchInfo(jobId, pkChunkingBatchId);
            BatchStateEnum batchState = batchResponse.getState();
            while (batchState == BatchStateEnum.InProgress || batchState == BatchStateEnum.Queued) {
                Thread.sleep(waitMilliSecond);
                batchResponse = connection.getBatchInfo(createdJob.getId(), executeQueryBatch.getId());
                log.info("Waiting for first batch (jobId={}, pkChunkingBatchId={})", (Object)jobId, (Object)pkChunkingBatchId);
                batchState = batchResponse.getState();
            }
            if (batchResponse.getState() == BatchStateEnum.Failed) {
                log.error("Bulk batch failed: " + batchResponse.toString());
                throw new Exception("Failed to get bulk batch info for jobId " + jobId + " error - " + batchResponse.getStateMessage());
            }
            ResultFileIdsStruct resultFileIdsStruct = this.retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
            return resultFileIdsStruct;
        }
        catch (Exception e) {
            throw new RuntimeException("getQueryResultIdsPkChunking: error - " + e.getMessage(), e);
        }
    }

    private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
        this.bulkApiLogin();
        try {
            this.bulkJob.setObject(entity);
            this.bulkJob.setOperation(this.bulkApiUseQueryAll ? OperationEnum.queryAll : OperationEnum.query);
            this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
            this.bulkJob.setContentType(ContentType.CSV);
            this.bulkJob = this.bulkConnection.createJob(this.bulkJob);
            log.info("Created bulk job [jobId={}]", (Object)this.bulkJob.getId());
            this.bulkJob = this.bulkConnection.getJobStatus(this.bulkJob.getId());
            String query = this.updatedQuery;
            if (!SalesforceExtractor.isNullPredicate(predicateList)) {
                String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
                query = query.replace(limitString, "");
                ListIterator<Predicate> i = predicateList.listIterator();
                while (i.hasNext()) {
                    Predicate predicate = (Predicate)i.next();
                    query = SqlQueryUtils.addPredicate((String)query, (String)predicate.getCondition());
                }
                query = query + limitString;
            }
            log.info("getQueryResultIds - QUERY:" + query);
            ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
            BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, (InputStream)bout);
            bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
            int count = 0;
            long minWaitTimeInMilliSeconds = this.workUnitState.getPropAsLong("extract.salesforce.bulkApi.minWaitTimeInMillis", 60000L);
            long maxWaitTimeInMilliSeconds = this.workUnitState.getPropAsLong("extract.salesforce.bulkApi.maxWaitTimeInMillis", 600000L);
            while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) {
                log.info("Waiting for bulk resultSetIds");
                long waitMilliSeconds = Math.min((long)(Math.pow(2.0, count) * (double)minWaitTimeInMilliSeconds), maxWaitTimeInMilliSeconds);
                Thread.sleep(waitMilliSeconds);
                bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
                ++count;
            }
            BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
            BatchStateEnum state = bulkBatchInfo.getState();
            if (state == BatchStateEnum.Failed || state == BatchStateEnum.InProgress) {
                log.error("Bulk batch failed: " + bulkBatchInfo.toString());
                throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId() + " error - " + bulkBatchInfo.getStateMessage());
            }
            ArrayList batchIdAndResultIdList = Lists.newArrayList();
            for (BatchInfo bi : batchInfoList.getBatchInfo()) {
                QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
                for (String result : list.getResult()) {
                    batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
                }
            }
            log.info("QueryResultList: " + batchIdAndResultIdList);
            return batchIdAndResultIdList;
        }
        catch (AsyncApiException | InterruptedException | RuntimeException e) {
            throw new RuntimeException("Failed to get query result ids from salesforce using bulk api; error - " + e.getMessage(), e);
        }
    }

    public void closeConnection() throws Exception {
        if (this.bulkConnection != null && !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) {
            log.info("Closing salesforce bulk job connection");
            this.bulkConnection.closeJob(this.getBulkJobId());
        }
        this.sfConnector.close();
    }

    private static List<Command> constructGetCommand(String restQuery) {
        return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), (CommandType)RestApiCommand.RestApiCommandType.GET));
    }

    private ResultFileIdsStruct retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
        Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator();
        try {
            List<BatchIdAndResultId> batchIdAndResultIdList = this.fetchBatchResultIds(connection, jobId, batchIds);
            return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private ResultFileIdsStruct retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) {
        log.info("Waiting for completion of the the bulk job [jobId={}])'s sub queries.", (Object)jobId);
        try {
            BatchInfoList batchInfoList;
            BatchInfo[] batchInfos;
            while (this.needContinueToPoll(batchInfos = (batchInfoList = connection.getBatchInfoList(jobId)).getBatchInfo(), waitMilliSecond).booleanValue()) {
            }
            if (Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.NotProcessed).count() != 1L) {
                throw new Exception("PK-Chunking query should have 1 and only 1 batch with state=NotProcessed.");
            }
            Stream<BatchInfo> stream = Arrays.stream(batchInfos);
            Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
            List<BatchIdAndResultId> batchIdAndResultIdList = this.fetchBatchResultIds(connection, jobId, batchIds);
            return new ResultFileIdsStruct(jobId, batchIdAndResultIdList);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private List<BatchIdAndResultId> fetchBatchResultIds(BulkConnection connection, String jobId, Iterator<String> batchIds) throws Exception {
        ArrayList batchIdAndResultIdList = Lists.newArrayList();
        while (batchIds.hasNext()) {
            String batchId = batchIds.next();
            QueryResultList result = connection.getQueryResultList(jobId, batchId);
            Stream<String> stream = Arrays.stream(result.getResult());
            Iterator it = stream.map(rId -> new BatchIdAndResultId(batchId, (String)rId)).iterator();
            Iterators.addAll((Collection)batchIdAndResultIdList, it);
        }
        return batchIdAndResultIdList;
    }

    private Boolean needContinueToPoll(BatchInfo[] batchInfos, long toWait) {
        long queued = Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.Queued).count();
        long inProgress = Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.InProgress).count();
        for (BatchInfo bi : batchInfos) {
            BatchStateEnum state = bi.getState();
            if (state == BatchStateEnum.InProgress || state == BatchStateEnum.Queued) {
                try {
                    log.info("Total: {}, queued: {}, InProgress: {}, waiting for [batchId: {}, state: {}]", new Object[]{batchInfos.length, queued, inProgress, bi.getId(), state});
                    Thread.sleep(toWait);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return true;
            }
            if (state != BatchStateEnum.Failed) continue;
            throw new RuntimeException(String.format("[batchId=%s] failed", bi.getId()));
        }
        return false;
    }

    public ConnectorConfig createConfig() {
        ConnectorConfig config = new ConnectorConfig();
        config.setCompression(true);
        try {
            config.setTraceFile("traceLogs.txt");
        }
        catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        config.setTraceMessage(false);
        config.setPrettyPrintXml(true);
        if (this.workUnitState.contains("source.conn.use.proxy.url") && !this.workUnitState.getProp("source.conn.use.proxy.url").isEmpty()) {
            config.setProxy(this.workUnitState.getProp("source.conn.use.proxy.url"), this.workUnitState.getPropAsInt("source.conn.use.proxy.port"));
        }
        return config;
    }

    public String getBulkJobId() {
        return this.workUnit.getProp("__salesforce.job.id", this.bulkJob.getId());
    }

    public static class ResultFileIdsStruct {
        private final String jobId;
        private final List<BatchIdAndResultId> batchIdAndResultIdList;

        public ResultFileIdsStruct(String jobId, List<BatchIdAndResultId> batchIdAndResultIdList) {
            this.jobId = jobId;
            this.batchIdAndResultIdList = batchIdAndResultIdList;
        }

        public String getJobId() {
            return this.jobId;
        }

        public List<BatchIdAndResultId> getBatchIdAndResultIdList() {
            return this.batchIdAndResultIdList;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ResultFileIdsStruct)) {
                return false;
            }
            ResultFileIdsStruct other = (ResultFileIdsStruct)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$jobId = this.getJobId();
            String other$jobId = other.getJobId();
            if (this$jobId == null ? other$jobId != null : !this$jobId.equals(other$jobId)) {
                return false;
            }
            List<BatchIdAndResultId> this$batchIdAndResultIdList = this.getBatchIdAndResultIdList();
            List<BatchIdAndResultId> other$batchIdAndResultIdList = other.getBatchIdAndResultIdList();
            return !(this$batchIdAndResultIdList == null ? other$batchIdAndResultIdList != null : !((Object)this$batchIdAndResultIdList).equals(other$batchIdAndResultIdList));
        }

        protected boolean canEqual(Object other) {
            return other instanceof ResultFileIdsStruct;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $jobId = this.getJobId();
            result = result * 59 + ($jobId == null ? 43 : $jobId.hashCode());
            List<BatchIdAndResultId> $batchIdAndResultIdList = this.getBatchIdAndResultIdList();
            result = result * 59 + ($batchIdAndResultIdList == null ? 43 : ((Object)$batchIdAndResultIdList).hashCode());
            return result;
        }

        public String toString() {
            return "SalesforceExtractor.ResultFileIdsStruct(jobId=" + this.getJobId() + ", batchIdAndResultIdList=" + this.getBatchIdAndResultIdList() + ")";
        }
    }

    public static class BatchIdAndResultId {
        private final String batchId;
        private final String resultId;

        public BatchIdAndResultId(String batchId, String resultId) {
            this.batchId = batchId;
            this.resultId = resultId;
        }

        public String getBatchId() {
            return this.batchId;
        }

        public String getResultId() {
            return this.resultId;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof BatchIdAndResultId)) {
                return false;
            }
            BatchIdAndResultId other = (BatchIdAndResultId)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$batchId = this.getBatchId();
            String other$batchId = other.getBatchId();
            if (this$batchId == null ? other$batchId != null : !this$batchId.equals(other$batchId)) {
                return false;
            }
            String this$resultId = this.getResultId();
            String other$resultId = other.getResultId();
            return !(this$resultId == null ? other$resultId != null : !this$resultId.equals(other$resultId));
        }

        protected boolean canEqual(Object other) {
            return other instanceof BatchIdAndResultId;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $batchId = this.getBatchId();
            result = result * 59 + ($batchId == null ? 43 : $batchId.hashCode());
            String $resultId = this.getResultId();
            result = result * 59 + ($resultId == null ? 43 : $resultId.hashCode());
            return result;
        }

        public String toString() {
            return "SalesforceExtractor.BatchIdAndResultId(batchId=" + this.getBatchId() + ", resultId=" + this.getResultId() + ")";
        }
    }
}

