package com.hazelcast.client.impl.protocol.task.mapreduce;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.task.AbstractMessageTask;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.config.JobTrackerConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.Member;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyPredicate;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.ReducerFactory;
import com.hazelcast.mapreduce.TopologyChangedStrategy;
import com.hazelcast.mapreduce.impl.AbstractJobTracker;
import com.hazelcast.mapreduce.impl.MapReduceService;
import com.hazelcast.mapreduce.impl.MapReduceUtil;
import com.hazelcast.mapreduce.impl.operation.KeyValueJobOperation;
import com.hazelcast.mapreduce.impl.operation.StartProcessingJobOperation;
import com.hazelcast.mapreduce.impl.task.TrackableJobFuture;
import com.hazelcast.nio.Connection;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.StringUtil;
import java.security.Permission;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:WEB-INF/lib/hazelcast-3.7.4-atlassian-43.jar:com/hazelcast/client/impl/protocol/task/mapreduce/AbstractMapReduceTask.class */
/**
 * Base class for client message tasks that run a map-reduce job on behalf of a client.
 *
 * <p>{@link #processMessage()} registers a {@link TrackableJobFuture} with the job tracker named by
 * {@code getDistributedObjectName()}, sends a {@link KeyValueJobOperation} and a
 * {@link StartProcessingJobOperation} to the selected cluster members and registers this task as the
 * future's {@link ExecutionCallback}. The job outcome is then delivered to the client from
 * {@link #onResponse(Object)} or {@link #onFailure(Throwable)}.
 *
 * <p>Subclasses supply the job definition (id, chunk size, source, mapper, ...) through the abstract
 * getters declared below.
 *
 * @param <Parameters> type of the decoded request parameters handled by the concrete task
 */
public abstract class AbstractMapReduceTask<Parameters> extends AbstractMessageTask<Parameters> implements ExecutionCallback {

    public AbstractMapReduceTask(ClientMessage clientMessage, Node node, Connection connection) {
        super(clientMessage, node, connection);
    }

    /**
     * Registers the job with its job tracker and starts it on the cluster.
     *
     * @throws IllegalStateException if the cluster has no member matching
     *         {@link MemberSelectors#DATA_MEMBER_SELECTOR}, or if the job tracker refuses to register
     *         the job
     */
    @Override
    protected void processMessage() {
        MapReduceService mapReduceService = (MapReduceService) getService(MapReduceService.SERVICE_NAME);
        NodeEngine nodeEngine = mapReduceService.getNodeEngine();
        if (nodeEngine.getClusterService().getSize(MemberSelectors.DATA_MEMBER_SELECTOR) == 0) {
            throw new IllegalStateException("Could not register map reduce job since there are no nodes owning a partition");
        }

        String trackerName = getDistributedObjectName();
        String jobId = getJobId();
        AbstractJobTracker jobTracker = (AbstractJobTracker) mapReduceService.createDistributedObject(trackerName);
        TrackableJobFuture jobFuture = new TrackableJobFuture(trackerName, jobId, jobTracker, nodeEngine, null);

        // A rejected registration must not be ignored: the callback below is the only path in this
        // class that answers the client, so returning silently would leave the request unanswered.
        // Failing with an exception mirrors the data-member check above.
        // NOTE(review): assumes the caller of processMessage() reports thrown exceptions to the
        // client - confirm against AbstractMessageTask.
        if (!jobTracker.registerTrackableJob(jobFuture)) {
            throw new IllegalStateException("Could not register map reduce job " + jobId
                    + " on job tracker " + trackerName);
        }

        startSupervisionTask(jobTracker);
        jobFuture.andThen(this);
    }

    /** @return the identifier of the job to run */
    protected abstract String getJobId();

    /** @return the requested chunk size, or {@code -1} to use the job tracker's configured chunk size */
    protected abstract int getChunkSize();

    /**
     * @return the name of a {@link TopologyChangedStrategy} constant (matched case-insensitively), or
     *         {@code null} to use the job tracker's configured strategy
     */
    protected abstract String getTopologyChangedStrategy();

    /** @return the key-value source handed to the {@link KeyValueJobOperation} */
    protected abstract KeyValueSource getKeyValueSource();

    /** @return the mapper handed to the {@link KeyValueJobOperation} */
    protected abstract Mapper getMapper();

    /** @return the combiner factory handed to the {@link KeyValueJobOperation} */
    protected abstract CombinerFactory getCombinerFactory();

    /** @return the reducer factory handed to the {@link KeyValueJobOperation} */
    protected abstract ReducerFactory getReducerFactory();

    /**
     * @return the keys in the form accepted by the serialization service's {@code toObject}, or
     *         {@code null} if the request carries no key collection
     */
    protected abstract Collection getKeys();

    /** @return the key predicate handed to the {@link StartProcessingJobOperation} */
    protected abstract KeyPredicate getPredicate();

    /**
     * Sends the job definition to every member selected by {@link KeyValueJobOperation#MEMBER_SELECTOR}
     * and afterwards a start request to every member selected by
     * {@link MemberSelectors#DATA_MEMBER_SELECTOR}.
     *
     * @param jobTracker the tracker the job was registered with; must be an {@link AbstractJobTracker}
     */
    private void startSupervisionTask(JobTracker jobTracker) {
        MapReduceService mapReduceService = (MapReduceService) getService(MapReduceService.SERVICE_NAME);
        JobTrackerConfig config = ((AbstractJobTracker) jobTracker).getJobTrackerConfig();

        // Resolve the job definition once, in the same order as before; the same values are reused
        // for every member.
        boolean communicateStats = config.isCommunicateStats();
        int chunkSize = getChunkSizeOrConfigChunkSize(config);
        TopologyChangedStrategy topologyChangedStrategy = getTopologyChangedStrategyOrConfigTopologyChangedStrategy(config);
        String trackerName = getDistributedObjectName();
        String jobId = getJobId();
        KeyValueSource keyValueSource = getKeyValueSource();
        Mapper mapper = getMapper();
        CombinerFactory combinerFactory = getCombinerFactory();
        ReducerFactory reducerFactory = getReducerFactory();
        Collection<Object> keys = getKeyObjects(getKeys());
        KeyPredicate predicate = getPredicate();

        ClusterService clusterService = this.nodeEngine.getClusterService();

        // A fresh operation instance is created per target member, exactly as before.
        for (Member member : clusterService.getMembers(KeyValueJobOperation.MEMBER_SELECTOR)) {
            KeyValueJobOperation setupOperation = new KeyValueJobOperation(trackerName, jobId, chunkSize, keyValueSource,
                    mapper, combinerFactory, reducerFactory, communicateStats, topologyChangedStrategy);
            MapReduceUtil.executeOperation(setupOperation, member.getAddress(), mapReduceService, this.nodeEngine);
        }

        for (Member member : clusterService.getMembers(MemberSelectors.DATA_MEMBER_SELECTOR)) {
            StartProcessingJobOperation startOperation = new StartProcessingJobOperation(trackerName, jobId, keys, predicate);
            MapReduceUtil.executeOperation(startOperation, member.getAddress(), mapReduceService, this.nodeEngine);
        }
    }

    /**
     * @param jobTrackerConfig configuration of the job tracker running the job
     * @return the chunk size from the request, or the configured one if the request carries {@code -1}
     */
    private int getChunkSizeOrConfigChunkSize(JobTrackerConfig jobTrackerConfig) {
        int requested = getChunkSize();
        return requested == -1 ? jobTrackerConfig.getChunkSize() : requested;
    }

    /**
     * @param jobTrackerConfig configuration of the job tracker running the job
     * @return the strategy named by the request, or the configured one if the request names none
     * @throws IllegalArgumentException if the requested name matches no {@link TopologyChangedStrategy}
     */
    private TopologyChangedStrategy getTopologyChangedStrategyOrConfigTopologyChangedStrategy(JobTrackerConfig jobTrackerConfig) {
        String requested = getTopologyChangedStrategy();
        if (requested == null) {
            return jobTrackerConfig.getTopologyChangedStrategy();
        }
        return TopologyChangedStrategy.valueOf(requested.toUpperCase(StringUtil.LOCALE_INTERNAL));
    }

    /**
     * Converts every element of {@code collection} with the serialization service's {@code toObject}.
     *
     * @param collection the keys as delivered by {@link #getKeys()}; may be {@code null}
     * @return the converted keys in iteration order, or {@code null} if {@code collection} is
     *         {@code null} (the {@code null} is passed on unchanged to the start operation)
     */
    private Collection<Object> getKeyObjects(Collection collection) {
        if (collection == null) {
            return null;
        }
        ArrayList<Object> keyObjects = new ArrayList<Object>(collection.size());
        for (Object key : collection) {
            Object keyObject = this.serializationService.toObject(key);
            keyObjects.add(keyObject);
        }
        return keyObjects;
    }

    /**
     * Sends the job result to the client as a list of entries whose key and value have each been
     * passed through the serialization service's {@code toData}.
     *
     * @param obj the job result; must be a {@link Map}
     */
    @Override
    public void onResponse(Object obj) {
        Map<?, ?> result = (Map<?, ?>) obj;
        ArrayList<Map.Entry<Object, Object>> entries = new ArrayList<Map.Entry<Object, Object>>(result.size());
        for (Map.Entry<?, ?> entry : result.entrySet()) {
            Object keyData = this.serializationService.toData(entry.getKey());
            Object valueData = this.serializationService.toData(entry.getValue());
            entries.add(new AbstractMap.SimpleEntry<Object, Object>(keyData, valueData));
        }
        sendResponse(entries);
    }

    /**
     * Forwards a job failure to {@code handleProcessingFailure}.
     *
     * @param th the failure reported by the job future
     */
    @Override
    public void onFailure(Throwable th) {
        handleProcessingFailure(th);
    }

    /** @return {@link MapReduceService#SERVICE_NAME} */
    @Override
    public String getServiceName() {
        return MapReduceService.SERVICE_NAME;
    }

    /** @return always {@code null} */
    @Override
    public Permission getRequiredPermission() {
        return null;
    }

    /** @return always {@code null} */
    @Override
    public String getMethodName() {
        return null;
    }

    /** @return always {@code null} */
    @Override
    public Object[] getParameters() {
        return null;
    }
}
