package com.ge.research.semtk.sparqlX.dispatch;

import com.ge.research.semtk.belmont.AutoGeneratedQueryTypes;
import com.ge.research.semtk.belmont.NodeGroup;
import com.ge.research.semtk.belmont.Returnable;
import com.ge.research.semtk.edc.JobTracker;
import com.ge.research.semtk.edc.client.OntologyInfoClient;
import com.ge.research.semtk.edc.client.ResultsClient;
import com.ge.research.semtk.edc.client.ResultsClientConfig;
import com.ge.research.semtk.load.utility.SparqlGraphJson;
import com.ge.research.semtk.nodeGroupStore.client.NodeGroupStoreRestClient;
import com.ge.research.semtk.querygen.Query;
import com.ge.research.semtk.querygen.QueryList;
import com.ge.research.semtk.querygen.client.QueryExecuteClient;
import com.ge.research.semtk.querygen.client.QueryGenClient;
import com.ge.research.semtk.resultSet.Table;
import com.ge.research.semtk.resultSet.TableResultSet;
import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;
import com.ge.research.semtk.sparqlX.SparqlResultTypes;
import com.ge.research.semtk.sparqlX.asynchronousQuery.AsynchronousNodeGroupBasedQueryDispatcher;
import com.ge.research.semtk.sparqlX.asynchronousQuery.DispatcherSupportedQueryTypes;
import com.ge.research.semtk.sparqlX.dispatch.QueryGroup.DispatchQueryGroup;
import com.ge.research.semtk.sparqlX.dispatch.QueryGroup.QueryGroupCollection;
import com.ge.research.semtk.utility.LocalLogger;
import com.ge.research.semtk.utilityge.Utility;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import org.apache.jena.atlas.json.io.JSWriter;
import org.apache.zookeeper.server.quorum.QuorumStats;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

/* loaded from: input_file:BOOT-INF/lib/sparqlGraphLibrary-2.2.2.jar:com/ge/research/semtk/sparqlX/dispatch/EdcDispatcher.class */
/**
 * Asynchronous nodegroup dispatcher with support for an external query service.
 *
 * <p>When the {@link DispatchServiceManager} reports no service mnemonic the
 * query is run as plain SPARQL. Otherwise a {@code SELECT_DISTINCT} request is
 * turned into a set of external queries (via the manager's generate client),
 * those queries are executed on worker threads in bounded batches, and the
 * per-query results are fused into one table. A {@code FILTERCONSTRAINT}
 * request is answered with a constraint query built from the manager's filter
 * nodegroup.
 */
public class EdcDispatcher extends AsynchronousNodeGroupBasedQueryDispatcher {
    /** Maximum number of external-query worker threads that are started before joining them. */
    protected static final int MAX_NUMBER_SIMULTANEOUS_QUERIES_PER_USER = 50;

    /** Supplies nodegroups, endpoints and generate/execute clients for the external service. */
    DispatchServiceManager dispatchServiceMgr;

    /** Groups built by {@code performEdcPrepBinning}; looked up by UUID while dispatching and used to fuse results. */
    QueryGroupCollection queryGroupColl;

    /** Set to {@code null} by the constructor and not otherwise used inside this class. */
    JSONObject constraints;

    /** Column names of the table produced by {@link #getTableForDispatcher(HashMap)}. */
    public static final String[] EDC_DISPATCHER_COL_NAMES = {Utility.COL_NAME_UUID, Utility.COL_NAME_QUERY, Utility.COL_NAME_CONFIGJSON};

    /** Column types matching {@link #EDC_DISPATCHER_COL_NAMES}, position by position. */
    public static final String[] EDC_DISPATCHER_COL_TYPES = {"String", "String", "String"};

    /**
     * Builds the dispatcher and its {@link DispatchServiceManager}.
     *
     * <p>All arguments except {@code serviceManagerFlag} are forwarded to the
     * superclass constructor, which always receives {@code false} in the boolean
     * position. The meaning of each forwarded argument is defined by the
     * superclass and is not visible here.
     *
     * @param jobId                    first superclass argument (presumably the job id - confirm against the superclass)
     * @param sparqlGraphJson          forwarded to the superclass
     * @param sparqlEndpointInterface  forwarded to the superclass
     * @param resultsClientConfig      forwarded to the superclass
     * @param sparqlEndpointInterface2 forwarded to the superclass and also given to the {@link DispatchServiceManager}
     * @param serviceManagerFlag       passed only to the {@link DispatchServiceManager} as its last argument
     * @param ontologyInfoClient       forwarded to the superclass and to the {@link DispatchServiceManager}
     * @param nodeGroupStoreRestClient forwarded to the superclass
     * @throws Exception if the superclass or the service manager cannot be constructed
     */
    public EdcDispatcher(String jobId, SparqlGraphJson sparqlGraphJson, SparqlEndpointInterface sparqlEndpointInterface, ResultsClientConfig resultsClientConfig, SparqlEndpointInterface sparqlEndpointInterface2, boolean serviceManagerFlag, OntologyInfoClient ontologyInfoClient, NodeGroupStoreRestClient nodeGroupStoreRestClient) throws Exception {
        super(jobId, sparqlGraphJson, sparqlEndpointInterface, resultsClientConfig, sparqlEndpointInterface2, false, ontologyInfoClient, nodeGroupStoreRestClient);
        this.constraints = null;
        this.dispatchServiceMgr = new DispatchServiceManager(sparqlEndpointInterface2, this.queryNodeGroup, this.oInfo, this.domain, this.querySei, ontologyInfoClient, serviceManagerFlag);
    }

    /**
     * Runs the query for this job. Never throws: any failure marks the job as
     * failed and is logged.
     *
     * @param constraintsObj must be a {@link JSONObject} (or {@code null}); handed to the query generator for EDC selects
     * @param flagsObj       must be a {@link QueryFlags} (or {@code null})
     * @param queryType      requested query type; for EDC queries only {@code SELECT_DISTINCT} and
     *                       {@code FILTERCONSTRAINT} are accepted
     * @param targetSparqlId SPARQL id of the item a filter-constraint query is built for; also passed to
     *                       {@code getSparqlQuery} for plain queries
     */
    @Override // com.ge.research.semtk.sparqlX.asynchronousQuery.AsynchronousNodeGroupBasedQueryDispatcher
    public void execute(Object constraintsObj, Object flagsObj, DispatcherSupportedQueryTypes queryType, String targetSparqlId) {
        try {
            JSONObject constraintsJson = (JSONObject) constraintsObj;
            QueryFlags flags = (QueryFlags) flagsObj;
            LocalLogger.logToStdErr("Job " + this.jobID + ": dispatcher start");

            if (this.dispatchServiceMgr.getServiceMnemonic() == null) {
                // No external service is involved: run the nodegroup's SPARQL directly.
                executePlainSparqlQuery(getSparqlQuery(queryType, targetSparqlId), queryType);
            } else if (queryType.equals(DispatcherSupportedQueryTypes.SELECT_DISTINCT)) {
                executeEdcSelect(constraintsJson, flags);
            } else if (queryType.equals(DispatcherSupportedQueryTypes.FILTERCONSTRAINT)) {
                executeEdcFilterConstraint(targetSparqlId);
            } else {
                throw new Exception("the query type " + queryType.name() + " is not supported for EDC queries.");
            }

            LocalLogger.logToStdErr("Job " + this.jobID + ": dispatcher end");
        } catch (Exception e) {
            updateStatusToFailed(describeFailure(e));
            LocalLogger.printStackTrace(e);
        }
    }

    /**
     * Returns a non-null description of a failure: the exception message when
     * there is one, otherwise {@code e.toString()} (class name), so that
     * message-less exceptions such as a typical NullPointerException do not
     * produce a {@code null} status text.
     */
    private static String describeFailure(Exception e) {
        String message = e.getMessage();
        return (message != null) ? message : e.toString();
    }

    /**
     * Runs a constraint query for one item of the filter nodegroup.
     *
     * @param str SPARQL id of the item to build the constraint query for
     * @return always {@code null}; the query is handed to {@code executePlainSparqlQuery}
     * @throws Exception if no item with that SPARQL id exists, or the query fails
     */
    private TableResultSet executeEdcFilterConstraint(String str) throws Exception {
        NodeGroup filterNodegroup = this.dispatchServiceMgr.getFilterNodegroup();
        Returnable target = filterNodegroup.getItemBySparqlID(str);
        if (target == null) {
            throw new Exception("Can't find item in nodegroup with SPARQL id: " + str);
        }
        String constraintQuery = filterNodegroup.generateSparql(AutoGeneratedQueryTypes.QUERY_CONSTRAINT, false, -1, target);
        executePlainSparqlQuery(constraintQuery, DispatcherSupportedQueryTypes.SELECT_DISTINCT);
        return null;
    }

    /**
     * Performs the full external select: bin the semantic results, generate the
     * external queries, then either execute and fuse them or (when
     * {@code FLAG_DISPATCH_RETURN_QUERIES} is set) return the generated queries
     * themselves. The resulting table is sent to the results service.
     *
     * @param jSONObject passed to the query generator together with the binned table
     * @param queryFlags optional flags; may be {@code null}
     * @return the result set that was sent to the results service
     * @throws Exception on any failure; the job status is set to failed before rethrowing
     */
    private TableResultSet executeEdcSelect(JSONObject jSONObject, QueryFlags queryFlags) throws Exception {
        TableResultSet finalResults;
        try {
            LocalLogger.logToStdErr("Job " + this.jobID + ": dispatch service manager start");
            NodeGroup edcNodegroup = this.dispatchServiceMgr.getEdcNodegroup();
            QueryGenClient generateClient = this.dispatchServiceMgr.getGenerateClient();

            LocalLogger.logToStdErr("Job " + this.jobID + ": dispatcher prep binning start");
            Table binnedInput = performEdcPrepBinning(edcNodegroup);

            if (binnedInput.getNumRows() == 0) {
                // Nothing to dispatch: report success with an empty, column-less table.
                finalResults = new TableResultSet(Boolean.TRUE);
                finalResults.addResults(new Table(new String[0], new String[0]));
            } else {
                updateStatus(10);
                LocalLogger.logToStdErr("Job " + this.jobID + ": generate external queries start");
                TableResultSet generated = generateClient.execute(binnedInput, jSONObject, queryFlags);
                if (!generated.getSuccess()) {
                    throw new Exception("Could not generate queries: " + generated.getRationaleAsString(","));
                }
                Table generatedQueries = generated.getResults();
                LocalLogger.logToStdErr("Job " + this.jobID + ": generate external queries end");
                LocalLogger.logToStdOut(generatedQueries.toCSVString());

                finalResults = new TableResultSet(Boolean.TRUE);
                boolean returnQueriesOnly = queryFlags != null
                        && queryFlags.isSet(AsynchronousNodeGroupBasedQueryDispatcher.FLAG_DISPATCH_RETURN_QUERIES);
                if (returnQueriesOnly) {
                    // Caller asked for the generated queries rather than their results.
                    updateStatus(60);
                    Table queriesOnly = generated.getTable();
                    queriesOnly.removeColumn(Utility.COL_NAME_UUID);
                    queriesOnly.insertColumn("EdcMnemonic", "String", 0, this.dispatchServiceMgr.getServiceMnemonic());
                    finalResults.addResults(queriesOnly);
                } else {
                    updateStatus(20);
                    LocalLogger.logToStdErr("Job " + this.jobID + ": execute external queries start");
                    runEdcThreads(generatedQueries, 20, 70);
                    updateStatus(75);
                    LocalLogger.logToStdErr("Job " + this.jobID + ": fuse results start");
                    String[] columnNames = this.queryGroupColl.getColumnNames();
                    finalResults.addResults(fuseResults(columnNames, getColumnTypes(columnNames)));
                }
            }

            updateStatus(95);
            LocalLogger.logToStdErr("Job " + this.jobID + ": write results start");
            sendResultsToService(finalResults);
            LocalLogger.logToStdErr("operations completed");
            updateStatus(100);
            return finalResults;
        } catch (Exception e) {
            updateStatusToFailed(describeFailure(e));
            throw e;
        }
    }

    /**
     * Executes one external query per table row on its own worker thread, at
     * most {@link #MAX_NUMBER_SIMULTANEOUS_QUERIES_PER_USER} at a time: a batch
     * is started, every thread of the batch is joined in start order, then the
     * next batch begins. The job's percent-complete is incremented after each
     * join.
     *
     * @param table generated queries; reads the UUID, query and config-JSON columns of every row
     * @param i     percent complete when this step starts
     * @param i2    percent complete when this step ends; passed as the last argument of
     *              {@code incrementPercentComplete} (presumably an upper bound - confirm in JobTracker)
     * @return the number of queries executed (equals the number of rows)
     * @throws Exception if the table is empty, a worker cannot be started or joined,
     *                   or any worker recorded an exception
     */
    private int runEdcThreads(Table table, int i, int i2) throws Exception {
        final int totalQueries = table.getNumRows();
        if (totalQueries == 0) {
            throw new Exception("Query generator indicates no work to perform");
        }

        // Progress step per finished query. Divide as float so the fraction
        // survives until rounding, and never advance by less than one percent.
        float step = (float) (i2 - i) / totalQueries;
        if (step < 1.0f) {
            step = 1.0f;
        }
        final int percentPerQuery = Math.round(step);

        // Simple class name of the most recently created execute client; used in the failure message.
        String clientName = QuorumStats.Provider.UNKNOWN_STATE;
        // One slot per row; handed to each worker together with its row index.
        Exception[] workerErrors = new Exception[totalQueries];
        DispatcherWorkThread[] workers = new DispatcherWorkThread[totalQueries];
        LocalLogger.logToStdErr("About to start edc work");

        for (int batchStart = 0; batchStart < totalQueries; batchStart += MAX_NUMBER_SIMULTANEOUS_QUERIES_PER_USER) {
            final int batchEnd = Math.min(batchStart + MAX_NUMBER_SIMULTANEOUS_QUERIES_PER_USER, totalQueries);

            // Start every worker of this batch.
            for (int row = batchStart; row < batchEnd; row++) {
                try {
                    DispatchQueryGroup group = this.queryGroupColl.getGroupByUUID(UUID.fromString(table.getCell(row, Utility.COL_NAME_UUID)));
                    JSONObject config = (JSONObject) new JSONParser().parse(table.getCell(row, Utility.COL_NAME_CONFIGJSON));
                    String workerJobId = getJobId() + "_edc_" + String.valueOf(row);
                    LocalLogger.logToStdErr("config for current qry:");
                    LocalLogger.logToStdErr(config.toString() + " jobId: " + workerJobId);
                    QueryExecuteClient executeClient = this.dispatchServiceMgr.getExecuteClient(config, workerJobId);
                    ResultsClient resultsClient = new ResultsClient(this.resConfig);
                    clientName = executeClient.getClass().getSimpleName();
                    workers[row] = new DispatcherWorkThread(group, table.getColumn(Utility.COL_NAME_QUERY)[row], executeClient, new JobTracker(this.jobTrackerSei), resultsClient, workerErrors, row);
                    LocalLogger.logToStdErr("Starting EDC thread " + row + " of a total of " + totalQueries);
                    workers[row].start();
                } catch (Exception e) {
                    throw new Exception("Dispatcher threading failure: spin up of query thread failed. reported:" + e.getMessage(), e);
                }
            }

            // Wait for the whole batch before starting the next one.
            for (int row = batchStart; row < batchEnd; row++) {
                try {
                    workers[row].join();
                    this.jobTracker.incrementPercentComplete(this.jobID, percentPerQuery, i2);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to whoever runs this dispatcher.
                        Thread.currentThread().interrupt();
                    }
                    LocalLogger.printStackTrace(e);
                    throw new IOException("(Join failure in dispatch threading : joined value was " + (row - batchStart) + ":: " + e.getClass().toString() + JSWriter.ObjectPairSep + e.getMessage() + ")", e);
                }
            }
        }

        // Surface the first failure a worker recorded, with a bounded message length.
        for (Exception workerError : workerErrors) {
            if (workerError != null) {
                String message = workerError.getMessage();
                if (message != null && message.length() > 500) {
                    message = message.substring(0, 500) + "...";
                }
                throw new Exception(clientName + " could not retrieve data: " + message, workerError);
            }
        }
        return workerErrors.length;
    }

    /**
     * Looks up the column type of every given column in the current query group collection.
     *
     * @param strArr column names
     * @return column types, in the same order as {@code strArr}
     */
    private String[] getColumnTypes(String[] strArr) {
        String[] types = new String[strArr.length];
        for (int idx = 0; idx < strArr.length; idx++) {
            types[idx] = this.queryGroupColl.getColumnType(strArr[idx]);
        }
        return types;
    }

    /**
     * Runs the distinct query for the given nodegroup and builds
     * {@link #queryGroupColl} from the result. Result columns that are not among
     * the service manager's added SPARQL ids are registered as semantic columns
     * with their column type.
     *
     * @param nodeGroup nodegroup to generate the distinct query from
     * @return the dispatch query group table of the newly built collection
     * @throws Exception if query generation or execution fails
     */
    private Table performEdcPrepBinning(NodeGroup nodeGroup) throws Exception {
        TableResultSet queryResult = (TableResultSet) this.dispatchServiceMgr.getNodegroupSei().executeQueryAndBuildResultSet(nodeGroup.generateSparql(AutoGeneratedQueryTypes.QUERY_DISTINCT, false, null, null, false), SparqlResultTypes.TABLE);
        String[] addedSparqlIds = this.dispatchServiceMgr.getAddedSparqlIds();
        Table table = queryResult.getTable();

        ArrayList<String> semanticColumns = new ArrayList<>();
        for (String columnName : table.getColumnNames()) {
            if (!containsId(addedSparqlIds, columnName)) {
                semanticColumns.add(columnName);
            }
        }

        this.queryGroupColl = new QueryGroupCollection(table, addedSparqlIds, semanticColumns);
        for (String columnName : semanticColumns) {
            this.queryGroupColl.setSemanticColumnType(columnName, table.getColumnType(columnName));
        }
        return this.queryGroupColl.returnDispatchQueryGroupTable();
    }

    /** Returns {@code true} when {@code candidate} equals one of {@code ids}. */
    private static boolean containsId(String[] ids, String candidate) {
        for (String id : ids) {
            if (id.equals(candidate)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a new, empty list; not called from anywhere in this class. */
    private ArrayList<ServiceEndpointInfo> getEdcEndpoints() {
        return new ArrayList<>();
    }

    /**
     * Delegates to {@code returnFusedResults_parallel} of the current query group collection.
     *
     * @param strArr  column names of the fused table
     * @param strArr2 column types, matching {@code strArr}
     * @return the fused result table
     * @throws Exception if fusing fails
     */
    private Table fuseResults(String[] strArr, String[] strArr2) throws Exception {
        return this.queryGroupColl.returnFusedResults_parallel(strArr, strArr2);
    }

    /** Returns the constraint type reported by the dispatch service manager. */
    @Override // com.ge.research.semtk.sparqlX.asynchronousQuery.AsynchronousNodeGroupBasedQueryDispatcher
    public String getConstraintType() throws Exception {
        return this.dispatchServiceMgr.getConstraintType();
    }

    /** Returns the constraint variable names reported by the dispatch service manager. */
    @Override // com.ge.research.semtk.sparqlX.asynchronousQuery.AsynchronousNodeGroupBasedQueryDispatcher
    public String[] getConstraintVariableNames() throws Exception {
        return this.dispatchServiceMgr.getConstraintVariableNames();
    }

    /**
     * Flattens a UUID-to-{@link QueryList} map into a three-column table
     * ({@link #EDC_DISPATCHER_COL_NAMES}): one row per query, holding the map
     * key, the query text and the string form of the list's config.
     *
     * @param hashMap map whose values must all be {@link QueryList} instances
     * @return the populated table; it has no rows when the map is empty
     * @throws Exception if the table cannot be created or a row cannot be added
     */
    public static Table getTableForDispatcher(HashMap<UUID, Object> hashMap) throws Exception {
        Table table = new Table(EDC_DISPATCHER_COL_NAMES, EDC_DISPATCHER_COL_TYPES, (ArrayList<ArrayList<String>>) null);
        for (UUID uuid : hashMap.keySet()) {
            QueryList queryList = (QueryList) hashMap.get(uuid);
            for (Query query : queryList.getQueries()) {
                ArrayList<String> row = new ArrayList<>();
                row.add(uuid.toString());
                row.add(query.getQuery());
                row.add(queryList.getConfig().toString());
                table.addRow(row);
            }
        }
        return table;
    }
}
