/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.salesforce;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import com.sforce.async.ConcurrencyMode;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import com.sforce.async.QueryResultList;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectorConfig;
import java.beans.ConstructorProperties;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.salesforce.SalesforceConnector;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
import org.apache.gobblin.source.extractor.exception.RecordCountException;
import org.apache.gobblin.source.extractor.exception.RestApiClientException;
import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
import org.apache.gobblin.source.extractor.exception.SchemaException;
import org.apache.gobblin.source.extractor.extract.Command;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
import org.apache.gobblin.source.extractor.extract.CommandType;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiExtractor;
import org.apache.gobblin.source.extractor.resultset.RecordSet;
import org.apache.gobblin.source.extractor.resultset.RecordSetList;
import org.apache.gobblin.source.extractor.schema.Schema;
import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
import org.apache.gobblin.source.extractor.utils.Utils;
import org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
import org.apache.gobblin.source.jdbc.SqlQueryUtils;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalesforceExtractor
extends RestApiExtractor {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(SalesforceExtractor.class);
    // SOQL REST resource name. NOTE(review): getSoqlUrl spells the path out inline as "/queryAll/".
    private static final String SOQL_RESOURCE = "/queryAll";
    // Target format used when rendering timestamp predicate values.
    public static final String SALESFORCE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'.000Z'";
    // Target format for date predicate values; also the source format reported for DATE watermarks.
    private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd";
    // Target format used when rendering hour predicate values.
    private static final String SALESFORCE_HOUR_FORMAT = "HH";
    // Path segment appended to the host / instance URL to build SOAP endpoints in bulkApiLogin.
    private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
    // Shared Gson instance used to parse REST responses and to serialize Schema objects.
    private static final Gson GSON = new Gson();
    // Upper bound, lower bound and default for the PK chunking size (the constructor clamps to this range).
    private static final int MAX_PK_CHUNKING_SIZE = 250000;
    private static final int MIN_PK_CHUNKING_SIZE = 100000;
    private static final int DEFAULT_PK_CHUNKING_SIZE = 200000;
    // Property keys controlling PK chunking.
    private static final String ENABLE_PK_CHUNKING_KEY = "salesforce.enablePkChunking";
    private static final String PK_CHUNKING_SIZE_KEY = "salesforce.pkChunkingSize";
    // Cap (seconds) on the polling interval computed in getQueryResultIds.
    private static final int MAX_RETRY_INTERVAL_SECS = 600;
    // PK chunking is disabled by the constructor when the max partition count exceeds this value.
    private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
    // Property key and default for the fetch retry limit.
    private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
    private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
    // Property key and default selecting the bulk "queryAll" operation instead of "query".
    private static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
    private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false;
    // Property key and default for enabling PK chunking without first checking the expected record count.
    private static final String PK_CHUNKING_SKIP_COUNT_CHECK = "salesforce.pkChunkingSkipCountCheck";
    private static final boolean DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK = false;
    // REST paging state: set to false by getData once the response reports "done".
    private boolean pullStatus = true;
    // URL of the next REST result page, taken from "nextRecordsUrl" in getData.
    private String nextUrl;
    // Bulk API connection; created by bulkApiLogin.
    private BulkConnection bulkConnection = null;
    // True until the first bulk pull has submitted the query and collected result ids.
    private boolean bulkApiInitialRun = true;
    // Bulk job descriptor; replaced by the server-side job info in getQueryResultIds.
    private JobInfo bulkJob = new JobInfo();
    // NOTE(review): the bulk reader/record bookkeeping fields below are used outside the visible part of this class.
    private BufferedReader bulkBufferedReader = null;
    // Batch id / result id pairs returned by getQueryResultIds.
    private List<BatchIdAndResultId> bulkResultIdList = Lists.newArrayList();
    private int bulkResultIdCount = 0;
    // Set to false when a bulk pull starts (see getRecordSetFromSourceApi).
    private boolean bulkJobFinished = true;
    private List<String> bulkRecordHeader;
    private int bulkResultColumCount;
    private boolean newBulkResultSet = true;
    private int bulkRecordCount = 0;
    private int prevBulkRecordCount = 0;
    private List<String> csvRecord;
    // Whether PK chunking may be used, and the (clamped) chunk size; resolved in the constructor.
    private final boolean pkChunking;
    private final int pkChunkingSize;
    // The inherited connector, viewed as the Salesforce-specific type.
    private final SalesforceConnector sfConnector = (SalesforceConnector)this.connector;
    // Values resolved from job properties in the constructor.
    private final int fetchRetryLimit;
    private final int batchSize;
    private final boolean pkChunkingSkipCountCheck;
    private final boolean bulkApiUseQueryAll;

    /**
     * Creates an extractor for the given work unit and resolves the Salesforce-specific
     * settings (PK chunking, bulk operation, fetch size, retry limit) from its properties.
     *
     * @param state work unit state carrying the job configuration
     */
    public SalesforceExtractor(WorkUnitState state) {
        super(state);

        // PK chunking is refused when partitions are user-specified or more than 3 may be created.
        boolean tooManyPartitions = state.getPropAsBoolean("partitioner.hasUserSpecifiedPartitions", false)
            || state.getPropAsInt("source.max.number.of.partitions", 20) > 3;
        boolean pkChunkingRequested = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
        if (tooManyPartitions && pkChunkingRequested) {
            log.warn("Max partitions too high, so PK chunking is not enabled");
        }
        this.pkChunking = pkChunkingRequested && !tooManyPartitions;

        // Clamp the configured chunk size into [100000, 250000]; 200000 when unset.
        int requestedChunkSize = state.getPropAsInt(PK_CHUNKING_SIZE_KEY, 200000);
        this.pkChunkingSize = Math.max(100000, Math.min(250000, requestedChunkSize));

        this.pkChunkingSkipCountCheck = state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, false);
        this.bulkApiUseQueryAll = state.getPropAsBoolean(BULK_API_USE_QUERY_ALL, false);

        // A configured fetch size of 0 falls back to 1000.
        int configuredFetchSize = state.getPropAsInt("source.querybased.fetch.size", 1000);
        if (configuredFetchSize == 0) {
            this.batchSize = 1000;
        } else {
            this.batchSize = configuredFetchSize;
        }

        this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, 5);
    }

    /**
     * Builds the connector used by this extractor.
     *
     * @param state work unit state handed to the connector
     * @return a new {@link SalesforceConnector}
     */
    protected RestApiConnector getConnector(WorkUnitState state) {
        SalesforceConnector salesforceConnector = new SalesforceConnector((State)state);
        return salesforceConnector;
    }

    /**
     * Sets the REST pull status flag.
     *
     * @param pullStatus new value of the flag
     */
    public void setPullStatus(boolean pullStatus) {
        this.pullStatus = pullStatus;
    }

    /**
     * Stores the URL of the next REST result page.
     *
     * @param nextUrl URL to fetch on the next data request
     */
    public void setNextUrl(String nextUrl) {
        this.nextUrl = nextUrl;
    }

    /**
     * @return the current value of the bulk-job-finished flag
     */
    private boolean isBulkJobFinished() {
        return bulkJobFinished;
    }

    /**
     * Updates the bulk-job-finished flag.
     *
     * @param bulkJobFinished new value of the flag
     */
    private void setBulkJobFinished(boolean bulkJobFinished) {
        this.bulkJobFinished = bulkJobFinished;
    }

    /**
     * @return the current value of the new-bulk-result-set flag
     */
    public boolean isNewBulkResultSet() {
        return newBulkResultSet;
    }

    /**
     * Updates the new-bulk-result-set flag.
     *
     * @param newBulkResultSet new value of the flag
     */
    public void setNewBulkResultSet(boolean newBulkResultSet) {
        this.newBulkResultSet = newBulkResultSet;
    }

    /**
     * Delegates authentication to the underlying connector.
     *
     * @return the entity returned by the connector's authentication call
     * @throws RestApiConnectionException if the connector fails to authenticate
     */
    public HttpEntity getAuthentication() throws RestApiConnectionException {
        log.debug("Authenticating salesforce");
        HttpEntity authentication = this.connector.getAuthentication();
        return authentication;
    }

    /**
     * Builds the GET command for the "describe" resource of an entity.
     *
     * @param schema not used by this implementation
     * @param entity Salesforce object name; surrounding whitespace is trimmed
     * @return the command list produced for the describe URL
     * @throws SchemaException declared by the contract
     */
    public List<Command> getSchemaMetadata(String schema, String entity) throws SchemaException {
        log.debug("Build url to retrieve schema");
        String describeResource = "/sobjects/" + entity.trim() + "/describe";
        String describeUrl = this.sfConnector.getFullUri(describeResource);
        return SalesforceExtractor.constructGetCommand(describeUrl);
    }

    /**
     * Converts a describe response into an array of column schema objects.
     *
     * <p>Each entry of the response's "fields" array is mapped onto a {@link Schema}
     * (name, converted data type, length, precision, scale, nullability, label,
     * default value, uniqueness) and serialized to a JSON object.
     *
     * @param response command output whose first result is the describe JSON
     * @return one JSON object per described field, in response order
     * @throws SchemaException if the response is empty or a field cannot be converted
     */
    public JsonArray getSchema(CommandOutput<?, ?> response) throws SchemaException {
        log.info("Get schema from salesforce");
        Iterator<?> outputs = response.getResults().values().iterator();
        if (!outputs.hasNext()) {
            throw new SchemaException("Failed to get schema from salesforce; REST response has no output");
        }
        String describeJson = (String)outputs.next();
        JsonArray columns = new JsonArray();
        JsonObject describe = GSON.fromJson(describeJson, JsonObject.class).getAsJsonObject();
        try {
            JsonArray fields = describe.getAsJsonArray("fields");
            for (int idx = 0; idx < fields.size(); idx++) {
                JsonObject field = fields.get(idx).getAsJsonObject();
                Schema column = new Schema();
                column.setColumnName(field.get("name").getAsString());

                // Map the Salesforce type onto the target type; element type defaults to "string".
                String sourceType = field.get("type").getAsString();
                String elementDataType = "string";
                List mapSymbols = null;
                JsonObject targetType = this.convertDataType(field.get("name").getAsString(), sourceType, elementDataType, mapSymbols);
                log.debug("ColumnName:" + field.get("name").getAsString() + ";   old datatype:" + sourceType + ";   new datatype:" + targetType);
                column.setDataType(targetType);

                column.setLength(field.get("length").getAsLong());
                column.setPrecision(field.get("precision").getAsInt());
                column.setScale(field.get("scale").getAsInt());
                column.setNullable(field.get("nillable").getAsBoolean());
                column.setFormat(null);

                // JSON nulls for label / default value become Java nulls.
                String label = null;
                if (!field.get("label").isJsonNull()) {
                    label = field.get("label").getAsString();
                }
                column.setComment(label);
                String defaultValue = null;
                if (!field.get("defaultValue").isJsonNull()) {
                    defaultValue = field.get("defaultValue").getAsString();
                }
                column.setDefaultValue(defaultValue);
                column.setUnique(field.get("unique").getAsBoolean());

                // Round-trip through JSON text to obtain the schema as a JsonObject.
                String columnJson = GSON.toJson((Object)column);
                JsonObject columnObject = GSON.fromJson(columnJson, JsonObject.class).getAsJsonObject();
                columns.add((JsonElement)columnObject);
            }
        }
        catch (Exception e) {
            throw new SchemaException("Failed to get schema from salesforce; error - " + e.getMessage(), e);
        }
        return columns;
    }

    /**
     * Builds the GET command that retrieves the highest value of the watermark column.
     *
     * <p>The query selects the watermark column from the entity, reuses the WHERE clause
     * of the configured query (if any), drops any LIMIT clause, appends the supplied
     * predicates plus a "not null" predicate, and orders descending with LIMIT 1.
     *
     * @param schema not used by this implementation
     * @param entity Salesforce object name
     * @param watermarkColumn column whose maximum is requested
     * @param predicateList additional predicates; may be {@code null} or empty
     * @return the command list for the high watermark query
     * @throws HighWatermarkException if the URL cannot be built
     */
    public List<Command> getHighWatermarkMetadata(String schema, String entity, String watermarkColumn, List<Predicate> predicateList) throws HighWatermarkException {
        log.debug("Build url to retrieve high watermark");
        String query = "SELECT " + watermarkColumn + " FROM " + entity;
        String defaultPredicate = " " + watermarkColumn + " != null";
        String defaultSortOrder = " ORDER BY " + watermarkColumn + " desc LIMIT 1";

        // Carry over everything from " where " onwards of the configured query.
        String existingPredicate = "";
        if (this.updatedQuery != null) {
            int startIndex = this.updatedQuery.toLowerCase().indexOf(" where ");
            if (startIndex > 0) {
                existingPredicate = this.updatedQuery.substring(startIndex);
            }
        }
        query = query + existingPredicate;

        // The carried-over clause may end in a LIMIT; remove it before adding predicates.
        String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
        query = query.replace(limitString, "");

        // A null predicate list is treated like an empty one instead of failing with an NPE.
        if (predicateList != null) {
            for (Predicate predicate : predicateList) {
                query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
            }
        }
        query = SqlQueryUtils.addPredicate(query, defaultPredicate);
        query = query + defaultSortOrder;
        log.info("QUERY: " + query);
        try {
            return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
        }
        catch (Exception e) {
            throw new HighWatermarkException("Failed to get salesforce url for high watermark; error - " + e.getMessage(), e);
        }
    }

    /**
     * Extracts the high watermark from a query response.
     *
     * @param response command output whose first result is the query JSON
     * @param watermarkColumn name of the column holding the watermark in the first record
     * @param format date pattern of the returned value, or {@code null} when the value is numeric
     * @return {@code -1} when the response has no records; otherwise the value parsed as a long,
     *         re-formatted as {@code yyyyMMddHHmmss} first when {@code format} is given
     * @throws HighWatermarkException if the response is empty, malformed, or the value cannot be
     *         parsed with {@code format}
     */
    public long getHighWatermark(CommandOutput<?, ?> response, String watermarkColumn, String format) throws HighWatermarkException {
        log.info("Get high watermark from salesforce");
        Iterator<?> itr = response.getResults().values().iterator();
        if (!itr.hasNext()) {
            throw new HighWatermarkException("Failed to get high watermark from salesforce; REST response has no output");
        }
        String output = (String)itr.next();
        JsonElement element = (JsonElement)GSON.fromJson(output, JsonObject.class);
        String value = null;
        try {
            JsonArray records = element.getAsJsonObject().getAsJsonArray("records");
            if (records.size() == 0) {
                return -1L;
            }
            value = records.get(0).getAsJsonObject().get(watermarkColumn).getAsString();
            if (format == null) {
                return Long.parseLong(value);
            }
            Date date = new SimpleDateFormat(format).parse(value);
            return Long.parseLong(new SimpleDateFormat("yyyyMMddHHmmss").format(date));
        }
        catch (ParseException e) {
            // Previously swallowed, which led to formatting a null date and an opaque "error - null".
            log.error("ParseException: " + e.getMessage(), (Throwable)e);
            throw new HighWatermarkException("Failed to get high watermark from salesforce; cannot parse value '" + value + "' with format '" + format + "'; error - " + e.getMessage(), e);
        }
        catch (Exception e) {
            throw new HighWatermarkException("Failed to get high watermark from salesforce; error - " + e.getMessage(), e);
        }
    }

    /**
     * Builds the GET command for a {@code SELECT COUNT()} query on the entity.
     *
     * <p>The WHERE clause of the configured query (if any) is reused with its LIMIT
     * clause removed. When predicates are supplied they are appended and the LIMIT
     * clause of the configured query is re-attached at the end.
     *
     * @param schema not used by this implementation
     * @param entity Salesforce object name
     * @param workUnit not used by this implementation
     * @param predicateList additional predicates; may be {@code null} or empty
     * @return the command list for the count query
     * @throws RecordCountException if the URL cannot be built
     */
    public List<Command> getCountMetadata(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws RecordCountException {
        log.debug("Build url to retrieve source record count");
        String existingPredicate = "";
        if (this.updatedQuery != null) {
            int startIndex = this.updatedQuery.toLowerCase().indexOf(" where ");
            if (startIndex > 0) {
                existingPredicate = this.updatedQuery.substring(startIndex);
            }
        }
        String query = "SELECT COUNT() FROM " + entity + existingPredicate;
        String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
        query = query.replace(limitString, "");
        try {
            if (SalesforceExtractor.isNullPredicate(predicateList)) {
                log.info("QUERY with null predicate: " + query);
                return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
            }
            for (Predicate predicate : predicateList) {
                query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
            }
            // updatedQuery is allowed to be null above, so guard it here as well.
            String configuredLimit = this.updatedQuery == null ? "" : SalesforceExtractor.getLimitFromInputQuery(this.updatedQuery);
            query = query + configuredLimit;
            log.info("QUERY: " + query);
            return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(query)));
        }
        catch (Exception e) {
            throw new RecordCountException("Failed to get salesforce url for record count; error - " + e.getMessage(), e);
        }
    }

    /**
     * Reads the record count from a count query response.
     *
     * @param response command output whose first result is the query JSON
     * @return the value of the response's "totalSize" member
     * @throws RecordCountException if the response is empty or has no readable "totalSize"
     */
    public long getCount(CommandOutput<?, ?> response) throws RecordCountException {
        log.info("Get source record count from salesforce");
        Iterator<?> outputs = response.getResults().values().iterator();
        if (!outputs.hasNext()) {
            throw new RecordCountException("Failed to get count from salesforce; REST response has no output");
        }
        String body = (String)outputs.next();
        JsonElement parsed = GSON.fromJson(body, JsonObject.class);
        try {
            return parsed.getAsJsonObject().get("totalSize").getAsLong();
        }
        catch (Exception e) {
            throw new RecordCountException("Failed to get record count from salesforce; error - " + e.getMessage(), e);
        }
    }

    /**
     * Builds the GET command that fetches data records.
     *
     * <p>When a next-page URL is stored and the pull is still in progress, that URL is
     * used as is. Otherwise the configured query is used, with the supplied predicates
     * inserted before its LIMIT clause.
     *
     * @param schema not used by this implementation
     * @param entity not used by this implementation
     * @param workUnit not used by this implementation
     * @param predicateList additional predicates; may be {@code null} or empty
     * @return the command list for the data request
     * @throws DataRecordException if the URL cannot be built
     */
    public List<Command> getDataMetadata(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws DataRecordException {
        log.debug("Build url to retrieve data records");
        String soql = this.updatedQuery;
        try {
            // Continue with the next result page if one is pending.
            if (this.getNextUrl() != null && this.pullStatus) {
                return SalesforceExtractor.constructGetCommand(this.getNextUrl());
            }

            // No predicates: run the configured query unchanged.
            if (SalesforceExtractor.isNullPredicate(predicateList)) {
                log.info("QUERY:" + soql);
                return SalesforceExtractor.constructGetCommand(this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(soql)));
            }

            // Detach the LIMIT clause, append predicates, then re-attach it.
            String limitClause = SalesforceExtractor.getLimitFromInputQuery(soql);
            soql = soql.replace(limitClause, "");
            for (Predicate predicate : predicateList) {
                soql = SqlQueryUtils.addPredicate(soql, predicate.getCondition());
            }
            boolean specificApiActive = Boolean.parseBoolean(this.workUnitState.getProp("source.querybased.is.specific.api.active"));
            if (specificApiActive) {
                soql = SqlQueryUtils.addPredicate(soql, "IsDeleted = true");
            }
            soql = soql + limitClause;
            log.info("QUERY: " + soql);
            String url = this.sfConnector.getFullUri(SalesforceExtractor.getSoqlUrl(soql));
            return SalesforceExtractor.constructGetCommand(url);
        }
        catch (Exception e) {
            throw new DataRecordException("Failed to get salesforce url for data records; error - " + e.getMessage(), e);
        }
    }

    /**
     * Returns the tail of the query starting at the first case-insensitive
     * occurrence of {@code " limit"}.
     *
     * <p>The search compares characters in place rather than lower-casing the whole
     * query, so the result does not depend on the JVM default locale and the returned
     * index always refers to the original string.
     *
     * @param query SOQL text; may be {@code null}
     * @return the substring from the first {@code " limit"} onwards, or an empty string when
     *         the query is {@code null}, has no such occurrence, or the first occurrence is
     *         at index 0
     */
    private static String getLimitFromInputQuery(String query) {
        if (query == null) {
            return "";
        }
        final String keyword = " limit";
        int lastStart = query.length() - keyword.length();
        for (int i = 0; i <= lastStart; i++) {
            if (query.regionMatches(true, i, keyword, 0, keyword.length())) {
                // Only the first occurrence counts, and only when it is not at the very start.
                return i > 0 ? query.substring(i) : "";
            }
        }
        return "";
    }

    /**
     * Extracts the records of one REST result page and updates the paging state.
     *
     * <p>When the response reports "done" the pull status is cleared; otherwise the
     * next-page URL is derived from "nextRecordsUrl". The "attributes" member is
     * removed from the records before they are returned.
     *
     * @param response command output whose first result is the query JSON
     * @return iterator over the records of this page
     * @throws DataRecordException if the response is empty or cannot be processed
     */
    public Iterator<JsonElement> getData(CommandOutput<?, ?> response) throws DataRecordException {
        log.debug("Get data records from response");
        Iterator<?> outputs = response.getResults().values().iterator();
        if (!outputs.hasNext()) {
            throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
        }
        String body = (String)outputs.next();
        List<JsonElement> page = Lists.newArrayList();
        JsonElement parsed = GSON.fromJson(body, JsonObject.class);
        try {
            JsonObject payload = parsed.getAsJsonObject();
            JsonArray rawRecords = payload.getAsJsonArray("records");

            boolean done = payload.get("done").getAsBoolean();
            if (!done) {
                String nextRecordsUrl = payload.get("nextRecordsUrl").getAsString();
                String relativeUrl = nextRecordsUrl.replaceAll(this.sfConnector.getServicesDataEnvPath(), "");
                this.setNextUrl(this.sfConnector.getFullUri(relativeUrl));
            } else {
                this.setPullStatus(false);
            }

            JsonArray cleaned = Utils.removeElementFromJsonArray(rawRecords, "attributes");
            Iterator<JsonElement> records = cleaned.iterator();
            while (records.hasNext()) {
                page.add(records.next());
            }
            return page.iterator();
        }
        catch (Exception e) {
            throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
        }
    }

    /**
     * @return the current value of the REST pull status flag
     */
    public boolean getPullStatus() {
        return pullStatus;
    }

    /**
     * @return the stored URL of the next REST result page, or {@code null} if none was set
     */
    public String getNextUrl() {
        return nextUrl;
    }

    /**
     * Builds the relative query URL for a SOQL statement.
     *
     * @param soqlQuery SOQL text passed as the {@code q} parameter
     * @return the "/queryAll/" path with the encoded query parameter
     * @throws RestApiClientException if the URL cannot be built
     */
    public static String getSoqlUrl(String soqlQuery) throws RestApiClientException {
        ArrayList<NameValuePair> queryParameters = new ArrayList<NameValuePair>();
        queryParameters.add((NameValuePair)new BasicNameValuePair("q", soqlQuery));
        return SalesforceExtractor.buildUrl("/queryAll/", queryParameters);
    }

    /**
     * Assembles a URL string from a path and query parameters.
     *
     * @param path path component of the URL
     * @param qparams query parameters, applied in list order
     * @return the resulting URI rendered as a string
     * @throws RestApiClientException if the URI cannot be built
     */
    private static String buildUrl(String path, List<NameValuePair> qparams) throws RestApiClientException {
        URIBuilder uriBuilder = new URIBuilder();
        uriBuilder.setPath(path);
        for (NameValuePair parameter : qparams) {
            uriBuilder.setParameter(parameter.getName(), parameter.getValue());
        }

        URI built;
        try {
            built = uriBuilder.build();
        }
        catch (Exception e) {
            throw new RestApiClientException("Failed to build url; error - " + e.getMessage(), e);
        }
        HttpGet request = new HttpGet(built);
        return request.getURI().toString();
    }

    /**
     * Tells whether there are no predicates to apply.
     *
     * @param predicateList list to inspect; may be {@code null}
     * @return {@code true} if the list is {@code null} or has no elements
     */
    private static boolean isNullPredicate(List<Predicate> predicateList) {
        if (predicateList == null) {
            return true;
        }
        return predicateList.isEmpty();
    }

    /**
     * Returns the source-side date pattern for a watermark type.
     *
     * @param watermarkType watermark type to look up
     * @return the pattern for TIMESTAMP and DATE watermarks; {@code null} for any other type
     */
    public String getWatermarkSourceFormat(WatermarkType watermarkType) {
        String sourceFormat = null;
        switch (watermarkType) {
            case TIMESTAMP:
                sourceFormat = "yyyy-MM-dd'T'HH:mm:ss";
                break;
            case DATE:
                sourceFormat = SALESFORCE_DATE_FORMAT;
                break;
            default:
                break;
        }
        return sourceFormat;
    }

    /**
     * Builds an hour-based predicate of the form {@code column operator value}.
     *
     * @param column column name
     * @param value watermark value, rendered as a decimal string before conversion
     * @param valueFormat format of {@code value}
     * @param operator comparison operator
     * @return the predicate text with the value converted to the hour format
     */
    public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
        log.info("Getting hour predicate from salesforce");
        String hourValue = Utils.toDateTimeFormat(String.valueOf(value), valueFormat, SALESFORCE_HOUR_FORMAT);
        StringBuilder condition = new StringBuilder();
        condition.append(column).append(" ").append(operator).append(" ").append(hourValue);
        return condition.toString();
    }

    /**
     * Builds a date-based predicate of the form {@code column operator value}.
     *
     * @param column column name
     * @param value watermark value, rendered as a decimal string before conversion
     * @param valueFormat format of {@code value}
     * @param operator comparison operator
     * @return the predicate text with the value converted to the date format
     */
    public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
        log.info("Getting date predicate from salesforce");
        String dateValue = Utils.toDateTimeFormat(String.valueOf(value), valueFormat, SALESFORCE_DATE_FORMAT);
        StringBuilder condition = new StringBuilder();
        condition.append(column).append(" ").append(operator).append(" ").append(dateValue);
        return condition.toString();
    }

    /**
     * Builds a timestamp-based predicate of the form {@code column operator value}.
     *
     * @param column column name
     * @param value watermark value, rendered as a decimal string before conversion
     * @param valueFormat format of {@code value}
     * @param operator comparison operator
     * @return the predicate text with the value converted to the timestamp format
     */
    public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
        log.info("Getting timestamp predicate from salesforce");
        String timestampValue = Utils.toDateTimeFormat(String.valueOf(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
        StringBuilder condition = new StringBuilder();
        condition.append(column).append(" ").append(operator).append(" ").append(timestampValue);
        return condition.toString();
    }

    /**
     * Returns the mapping from Salesforce field types to target data types.
     *
     * @return an immutable map keyed by Salesforce type name
     */
    public Map<String, String> getDataTypeMap() {
        return ImmutableMap.<String, String>builder()
            // Types carried as plain strings.
            .put("url", "string")
            .put("textarea", "string")
            .put("reference", "string")
            .put("phone", "string")
            .put("masterrecord", "string")
            .put("location", "string")
            .put("id", "string")
            .put("encryptedstring", "string")
            .put("email", "string")
            .put("DataCategoryGroupReference", "string")
            .put("calculated", "string")
            .put("anyType", "string")
            .put("address", "string")
            .put("blob", "string")
            // Temporal types.
            .put("date", "date")
            .put("datetime", "timestamp")
            .put("time", "time")
            .put("object", "string")
            .put("string", "string")
            // Numeric and boolean types.
            .put("int", "int")
            .put("long", "long")
            .put("double", "double")
            .put("percent", "double")
            .put("currency", "double")
            .put("decimal", "double")
            .put("boolean", "boolean")
            // Selection and collection types, all carried as strings.
            .put("picklist", "string")
            .put("multipicklist", "string")
            .put("combobox", "string")
            .put("list", "string")
            .put("set", "string")
            .put("map", "string")
            .put("enum", "string")
            .build();
    }

    /**
     * Fetches the next set of records through the bulk API.
     *
     * <p>On the first call the bulk query is submitted and its result ids are collected.
     * Bulk data is then read until a non-empty record set is obtained or the bulk job is
     * finished. When no bulk records remain, soft-deleted records are fetched instead,
     * provided the column list contains "IsDeleted" and the soft-delete pull is not disabled.
     *
     * @param schema passed through to the soft-delete pull
     * @param entity Salesforce object name
     * @param workUnit passed through to the soft-delete pull
     * @param predicateList predicates applied to the query
     * @return iterator over the fetched records; empty when there is nothing to return
     * @throws IOException if any step of the bulk pull fails
     */
    public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws IOException {
        log.debug("Getting salesforce data using bulk api");
        RecordSet<JsonElement> rs = null;
        try {
            if (this.bulkApiInitialRun) {
                this.setBulkJobFinished(false);
                this.bulkResultIdList = this.getQueryResultIds(entity, predicateList);
                log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
            }
            while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
                rs = this.getBulkData();
            }
            this.bulkApiInitialRun = false;
            boolean isSoftDeletesPullDisabled = Boolean.parseBoolean(this.workUnit.getProp("source.querybased.salesforce.is.soft.deletes.pull.disabled"));
            if (rs == null || rs.isEmpty()) {
                if (this.columnList.contains("IsDeleted") && !isSoftDeletesPullDisabled) {
                    return this.getSoftDeletedRecords(schema, entity, workUnit, predicateList);
                }
                log.info("Ignoring soft delete records");
            }
            // rs stays null when the read loop never ran; hand back an empty iterator
            // rather than dereferencing it.
            if (rs == null) {
                return new ArrayList<JsonElement>().iterator();
            }
            return rs.iterator();
        }
        catch (Exception e) {
            throw new IOException("Failed to get records using bulk api; error - " + e.getMessage(), e);
        }
    }

    /**
     * Fetches soft-deleted records by delegating to {@code getRecordSet} with the same arguments.
     *
     * @param schema passed through
     * @param entity passed through
     * @param workUnit passed through
     * @param predicateList passed through
     * @return the iterator returned by {@code getRecordSet}
     * @throws DataRecordException if the delegate fails
     */
    private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws DataRecordException {
        Iterator<JsonElement> softDeleted = this.getRecordSet(schema, entity, workUnit, predicateList);
        return softDeleted;
    }

    /**
     * Authenticates against Salesforce and creates the bulk API connection.
     *
     * <p>An access token from the connector is used when available (connecting first if
     * needed); otherwise username and password plus security token are used. The async
     * (bulk) REST endpoint is derived from the SOAP service endpoint.
     *
     * @return {@code true} once the bulk connection has been created
     * @throws Exception if the SOAP login or the bulk connection setup fails; runtime
     *         failures are rethrown as {@link RuntimeException} with the cause attached
     */
    public boolean bulkApiLogin() throws Exception {
        log.info("Authenticating salesforce bulk api");
        String hostName = this.workUnitState.getProp("source.conn.host");
        String apiVersion = this.workUnitState.getProp("source.conn.version");
        if (Strings.isNullOrEmpty((String)apiVersion)) {
            // Default API version depends on whether the queryAll bulk operation is requested.
            apiVersion = this.bulkApiUseQueryAll ? "42.0" : "29.0";
        }
        String soapAuthEndPoint = hostName + SALESFORCE_SOAP_SERVICE + "/" + apiVersion;
        try {
            boolean useProxy = this.workUnitState.contains("source.conn.use.proxy.url") && !this.workUnitState.getProp("source.conn.use.proxy.url").isEmpty();

            ConnectorConfig partnerConfig = new ConnectorConfig();
            if (useProxy) {
                partnerConfig.setProxy(this.workUnitState.getProp("source.conn.use.proxy.url"), this.workUnitState.getPropAsInt("source.conn.use.proxy.port"));
            }

            String accessToken = this.sfConnector.getAccessToken();
            if (accessToken == null && this.sfConnector.connect()) {
                accessToken = this.sfConnector.getAccessToken();
            }
            if (accessToken != null) {
                String serviceEndpoint = this.sfConnector.getInstanceUrl() + SALESFORCE_SOAP_SERVICE + "/" + apiVersion;
                partnerConfig.setSessionId(accessToken);
                partnerConfig.setServiceEndpoint(serviceEndpoint);
            } else {
                String securityToken = this.workUnitState.getProp("source.conn.security.token");
                String password = PasswordManager.getInstance((State)this.workUnitState).readPassword(this.workUnitState.getProp("source.conn.password"));
                partnerConfig.setUsername(this.workUnitState.getProp("source.conn.username"));
                // A missing security token must not be appended as the literal text "null".
                partnerConfig.setPassword(password + (securityToken == null ? "" : securityToken));
            }
            partnerConfig.setAuthEndpoint(soapAuthEndPoint);

            // Instantiated for its effect on partnerConfig; session id and service endpoint are read from the config below.
            new PartnerConnection(partnerConfig);
            String soapEndpoint = partnerConfig.getServiceEndpoint();
            String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/")) + "async/" + apiVersion;

            ConnectorConfig config = new ConnectorConfig();
            config.setSessionId(partnerConfig.getSessionId());
            config.setRestEndpoint(restEndpoint);
            config.setCompression(true);
            config.setTraceFile("traceLogs.txt");
            config.setTraceMessage(false);
            config.setPrettyPrintXml(true);
            if (useProxy) {
                config.setProxy(this.workUnitState.getProp("source.conn.use.proxy.url"), this.workUnitState.getPropAsInt("source.conn.use.proxy.port"));
            }
            this.bulkConnection = new BulkConnection(config);
            return true;
        }
        catch (RuntimeException e) {
            throw new RuntimeException("Failed to connect to salesforce bulk api; error - " + e, e);
        }
    }

    /**
     * Submits the bulk query and collects the batch id / result id pairs of its results.
     *
     * <p>Logs in, creates a CSV bulk job for the entity (optionally with the PK chunking
     * header), submits the configured query with the predicates inserted before its
     * LIMIT clause, and polls the batch until it completes or fails. With PK chunking,
     * a "NotProcessed" state of the submitted batch hands over to {@code waitForPkBatches}.
     *
     * @param entity Salesforce object name set on the bulk job
     * @param predicateList predicates appended to the query; may be {@code null} or empty
     * @return result ids of every batch in the job, paired with their batch ids
     * @throws Exception if login fails; bulk API failures, interruption and runtime
     *         failures are rethrown as {@link RuntimeException} with the cause attached
     */
    private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
        if (!this.bulkApiLogin()) {
            throw new IllegalArgumentException("Invalid Login");
        }
        try {
            boolean usingPkChunking = false;
            this.bulkJob.setObject(entity);
            this.bulkJob.setOperation(this.bulkApiUseQueryAll ? OperationEnum.queryAll : OperationEnum.query);
            this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
            // PK chunking is applied only when allowed and either the count check is skipped
            // or the expected record count exceeds one chunk.
            if (this.pkChunking && (this.pkChunkingSkipCountCheck || this.getExpectedRecordCount() > (long)this.pkChunkingSize)) {
                log.info("Enabling pk chunking with size {}", (Object)this.pkChunkingSize);
                this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
                usingPkChunking = true;
            }
            this.bulkJob.setContentType(ContentType.CSV);
            this.bulkJob = this.bulkConnection.createJob(this.bulkJob);
            this.bulkJob = this.bulkConnection.getJobStatus(this.bulkJob.getId());

            // Insert the predicates before the LIMIT clause of the configured query.
            String query = this.updatedQuery;
            if (!SalesforceExtractor.isNullPredicate(predicateList)) {
                String limitString = SalesforceExtractor.getLimitFromInputQuery(query);
                query = query.replace(limitString, "");
                for (Predicate predicate : predicateList) {
                    query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
                }
                query = query + limitString;
            }
            log.info("QUERY:" + query);
            ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
            BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, (InputStream)bout);

            // Polling interval grows with the expected batch size and is capped at 600 seconds.
            long expectedSizePerBatch = usingPkChunking ? (long)this.pkChunkingSize : this.getExpectedRecordCount();
            int retryInterval = Math.min(600, 30 + (int)Math.ceil((float)expectedSizePerBatch / 10000.0f) * 2);
            log.info("Salesforce bulk api retry interval in seconds:" + retryInterval);

            bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
            while (!(bulkBatchInfo.getState() == BatchStateEnum.Completed || bulkBatchInfo.getState() == BatchStateEnum.Failed || usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed)) {
                Thread.sleep(retryInterval * 1000);
                bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
                log.debug("Bulk Api Batch Info:" + bulkBatchInfo);
                log.info("Waiting for bulk resultSetIds");
            }
            BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
            if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) {
                bulkBatchInfo = this.waitForPkBatches(batchInfoList, retryInterval);
            }
            if (bulkBatchInfo.getState() == BatchStateEnum.Failed) {
                log.error("Bulk batch failed: " + bulkBatchInfo.toString());
                throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId() + " error - " + bulkBatchInfo.getStateMessage());
            }

            List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList();
            for (BatchInfo bi : batchInfoList.getBatchInfo()) {
                QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
                for (String result : list.getResult()) {
                    batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
                }
            }
            log.info("QueryResultList: " + batchIdAndResultIdList);
            return batchIdAndResultIdList;
        }
        catch (AsyncApiException | InterruptedException | RuntimeException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Failed to get query result ids from salesforce using bulk api; error - " + e.getMessage(), e);
        }
    }

    /**
     * Opens a buffered reader over one query result stream of the current bulk job.
     *
     * @param index position in {@code bulkResultIdList} of the batch id / result id pair to stream
     * @return a reader decoding the result stream with {@code ConfigurationKeys.DEFAULT_CHARSET_ENCODING}
     * @throws AsyncApiException if the bulk connection cannot provide the result stream
     */
    private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException {
        String jobId = this.bulkJob.getId();
        String batchId = this.bulkResultIdList.get(index).getBatchId();
        String resultId = this.bulkResultIdList.get(index).getResultId();
        InputStream resultStream = this.bulkConnection.getQueryResultStream(jobId, batchId, resultId);
        InputStreamReader decoded = new InputStreamReader(resultStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING);
        return new BufferedReader(decoded);
    }

    /**
     * Reads CSV records from the current bulk reader into {@code rs} until either the
     * stream has no more records or the running record count reaches {@code batchSize}.
     *
     * <p>When the result set is flagged as new, the first record is consumed as the
     * header and its size is stored as the column count before any data is read.
     *
     * @param rs record set that receives one JSON object per CSV record
     * @param initialRecordCount number of records already counted towards the batch size
     * @throws DataRecordException declared for callers; not thrown directly by this method
     * @throws IOException if reading from the underlying stream fails
     */
    private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount) throws DataRecordException, IOException {
        InputStreamCSVReader csvReader = new InputStreamCSVReader(this.bulkBufferedReader);

        // First read of a fresh result set: the leading record is the header row.
        if (this.isNewBulkResultSet()) {
            this.bulkRecordHeader = csvReader.nextRecord();
            this.bulkResultColumCount = this.bulkRecordHeader.size();
            this.setNewBulkResultSet(false);
        }

        int recordsInBatch = initialRecordCount;
        // csvRecord is kept as a field so that the last record read survives this call.
        for (this.csvRecord = csvReader.nextRecord(); this.csvRecord != null; this.csvRecord = csvReader.nextRecord()) {
            JsonObject converted = Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, (int)this.bulkResultColumCount);
            rs.add(converted);
            this.bulkRecordCount++;
            recordsInBatch++;
            if (recordsInBatch >= this.batchSize) {
                log.info("Total number of records processed so far: " + this.bulkRecordCount);
                break;
            }
        }
    }

    /**
     * Reopens the stream of the result set currently being read and repositions it
     * after the records that were already consumed.
     *
     * <p>If the result set is still flagged as new, nothing is skipped. Otherwise the
     * first record is discarded (the record {@code fetchResultBatch} reads as the header),
     * then as many records as were counted for this result set are skipped, and the last
     * skipped record must equal the last record read before the reconnect.
     *
     * @throws IOException if closing the old reader or reading from the new one fails
     * @throws AsyncApiException if the result stream cannot be reopened
     * @throws RuntimeException if the repositioned stream does not line up with the last record read
     */
    private void reinitializeBufferedReader() throws IOException, AsyncApiException {
        this.bulkBufferedReader.close();
        this.bulkBufferedReader = this.getBulkBufferedReader(this.bulkResultIdCount - 1);
        if (this.isNewBulkResultSet()) {
            return;
        }

        InputStreamCSVReader csvReader = new InputStreamCSVReader(this.bulkBufferedReader);
        // Discard the leading record; the header was already captured on the first pass.
        csvReader.nextRecord();

        int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
        log.info("Skipping {} records on retry: ", (Object)recordsToSkip);

        ArrayList<String> lastSkipped = null;
        int remaining = recordsToSkip;
        while (remaining-- > 0) {
            lastSkipped = csvReader.nextRecord();
        }

        boolean skippedAny = recordsToSkip > 0;
        if (skippedAny && !this.csvRecord.equals(lastSkipped)) {
            throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
        }
    }

    /**
     * Fetches one batch of records into {@code rs}, retrying after an {@link IOException}
     * up to {@code fetchRetryLimit} times.
     *
     * <p>Every retry first reopens and repositions the reader, then resumes the fetch with
     * the number of records already added during this call counted towards the batch size.
     *
     * @param rs record set that receives the fetched records
     * @throws AsyncApiException if reopening the result stream fails
     * @throws DataRecordException propagated from the fetch
     * @throws IOException the last I/O failure once the retry limit is exhausted
     */
    private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs) throws AsyncApiException, DataRecordException, IOException {
        final int recordCountBeforeFetch = this.bulkRecordCount;
        int attempt = 0;
        while (true) {
            try {
                if (attempt > 0) {
                    this.reinitializeBufferedReader();
                }
                this.fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch);
                return;
            }
            catch (IOException e) {
                if (attempt >= this.fetchRetryLimit) {
                    log.error("Exception while fetching data: " + e.getMessage(), (Throwable)e);
                    throw e;
                }
                log.info("Exception while fetching data, retrying: " + e.getMessage(), (Throwable)e);
                ++attempt;
            }
        }
    }

    /**
     * Returns the next batch of records from the bulk job's result sets.
     *
     * <p>When there is no reader yet, or the current reader reports that it is not ready,
     * the method advances to the next entry of {@code bulkResultIdList}. If no entry is
     * left, the bulk job is marked finished and an empty record set is returned.
     *
     * @return the records fetched in this call; empty once the bulk job is finished
     * @throws DataRecordException wrapping any failure raised while opening or reading a result set
     */
    private RecordSet<JsonElement> getBulkData() throws DataRecordException {
        log.debug("Processing bulk api batch...");
        RecordSetList<JsonElement> records = new RecordSetList<>();
        try {
            boolean needsNextResultSet = this.bulkBufferedReader == null || !this.bulkBufferedReader.ready();
            if (needsNextResultSet) {
                if (this.bulkResultIdCount > 0) {
                    log.info("Result set {} had {} records", (Object)this.bulkResultIdCount, (Object)(this.bulkRecordCount - this.prevBulkRecordCount));
                }
                if (this.bulkResultIdCount >= this.bulkResultIdList.size()) {
                    log.info("Bulk job is finished");
                    this.setBulkJobFinished(true);
                    return records;
                }

                log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
                this.setNewBulkResultSet(true);
                if (this.bulkBufferedReader != null) {
                    this.bulkBufferedReader.close();
                }
                this.bulkBufferedReader = this.getBulkBufferedReader(this.bulkResultIdCount);
                ++this.bulkResultIdCount;
                // Remember where this result set starts so per-result-set counts can be derived.
                this.prevBulkRecordCount = this.bulkRecordCount;
            }
            this.fetchResultBatchWithRetry(records);
        }
        catch (Exception e) {
            throw new DataRecordException("Failed to get records from salesforce; error - " + e.getMessage(), e);
        }
        return records;
    }

    /**
     * Closes the Salesforce bulk job unless its reported state already reads {@code "Closed"}.
     * Does nothing when no bulk connection has been established.
     *
     * @throws Exception if querying the job status or closing the job fails
     */
    public void closeConnection() throws Exception {
        if (this.bulkConnection == null) {
            return;
        }
        String jobId = this.bulkJob.getId();
        String jobState = this.bulkConnection.getJobStatus(jobId).getState().toString();
        if (jobState.equals("Closed")) {
            return;
        }
        log.info("Closing salesforce bulk job connection");
        this.bulkConnection.closeJob(this.bulkJob.getId());
    }

    /**
     * Wraps a REST query string in a single-element list holding one GET command.
     *
     * @param restQuery the query string passed as the command's only parameter
     * @return a fixed-size list containing the one GET command
     */
    public static List<Command> constructGetCommand(String restQuery) {
        List<String> params = Arrays.asList(restQuery);
        Command getCommand = new RestApiCommand().build(params, (CommandType)RestApiCommand.RestApiCommandType.GET);
        return Arrays.asList(getCommand);
    }

    /**
     * Polls the batches of a PK-chunked bulk job until each one is {@code Completed} or
     * {@code Failed}, stopping early at the first failed batch.
     *
     * <p>The entry at index 0 is not polled — presumably it is the original query batch,
     * which the caller has already observed as {@code NotProcessed}; confirm against the
     * Salesforce PK chunking behaviour.
     *
     * @param batchInfoList batches of the current bulk job, as listed by the caller
     * @param retryInterval seconds to sleep between two status polls of the same batch
     * @return the refreshed info of the last batch polled: the first failed batch if any
     *         failed, otherwise the last batch in the list; never {@code null}
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws AsyncApiException if a batch status request fails
     * @throws IllegalStateException if the list holds no batch beyond index 0, so there is
     *         nothing to wait for (previously this returned {@code null} and the caller
     *         failed with a bare NullPointerException)
     */
    private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval) throws InterruptedException, AsyncApiException {
        BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
        if (batchInfos == null || batchInfos.length < 2) {
            throw new IllegalStateException("No pk chunking batches found to wait for, jobId " + this.bulkJob.getId());
        }

        BatchInfo batchInfo = null;
        for (int i = 1; i < batchInfos.length; ++i) {
            BatchInfo bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), batchInfos[i].getId());
            while (bi.getState() != BatchStateEnum.Completed && bi.getState() != BatchStateEnum.Failed) {
                // Multiply as long so a large interval cannot overflow the millisecond value.
                Thread.sleep(retryInterval * 1000L);
                bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
                log.debug("Bulk Api Batch Info:" + bi);
                log.info("Waiting for bulk resultSetIds");
            }
            batchInfo = bi;
            // One failed chunk fails the whole extraction; no need to wait for the rest.
            if (batchInfo.getState() == BatchStateEnum.Failed) {
                break;
            }
        }
        return batchInfo;
    }

    /**
     * Immutable pair of a bulk batch id and one result id belonging to that batch.
     * Two pairs are equal when both ids are equal; either id may be {@code null}.
     */
    private static class BatchIdAndResultId {
        private final String batchId;
        private final String resultId;

        /**
         * @param batchId id of the bulk batch
         * @param resultId id of one result of that batch
         */
        @ConstructorProperties(value={"batchId", "resultId"})
        public BatchIdAndResultId(String batchId, String resultId) {
            this.batchId = batchId;
            this.resultId = resultId;
        }

        /** @return the batch id given at construction */
        public String getBatchId() {
            return this.batchId;
        }

        /** @return the result id given at construction */
        public String getResultId() {
            return this.resultId;
        }

        /** Compares both ids null-safely; only other {@code BatchIdAndResultId} instances can be equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof BatchIdAndResultId)) {
                return false;
            }
            BatchIdAndResultId that = (BatchIdAndResultId)o;
            return that.canEqual(this)
                && sameValue(this.getBatchId(), that.getBatchId())
                && sameValue(this.getResultId(), that.getResultId());
        }

        /** Hook that lets a subclass refuse equality with instances of this type. */
        protected boolean canEqual(Object other) {
            return other instanceof BatchIdAndResultId;
        }

        /** Combines both id hashes with multiplier 59, using 43 in place of a {@code null} id. */
        @Override
        public int hashCode() {
            final int prime = 59;
            final int nullHash = 43;
            String batch = this.getBatchId();
            int hash = prime + (batch == null ? nullHash : batch.hashCode());
            String resultIdValue = this.getResultId();
            hash = hash * prime + (resultIdValue == null ? nullHash : resultIdValue.hashCode());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("SalesforceExtractor.BatchIdAndResultId(batchId=");
            text.append(this.getBatchId());
            text.append(", resultId=");
            text.append(this.getResultId());
            text.append(")");
            return text.toString();
        }

        /** Null-safe string equality: two nulls match, a null never matches a non-null. */
        private static boolean sameValue(String left, String right) {
            if (left == null) {
                return right == null;
            }
            return left.equals(right);
        }
    }
}

