/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.action;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ClientHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;

public class CloseJobAction
extends Action<Request, Response, RequestBuilder> {
    public static final CloseJobAction INSTANCE = new CloseJobAction();
    public static final String NAME = "cluster:admin/xpack/ml/job/close";

    private CloseJobAction() {
        super(NAME);
    }

    public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
        return new RequestBuilder(client, this);
    }

    public Response newResponse() {
        return new Response();
    }

    static void resolveAndValidateJobId(Request request, ClusterState state, List<String> openJobIds, List<String> closingJobIds) {
        PersistentTasksCustomMetaData tasksMetaData = (PersistentTasksCustomMetaData)state.getMetaData().custom("persistent_tasks");
        MlMetadata maybeNull = (MlMetadata)state.metaData().custom("ml");
        MlMetadata mlMetadata = maybeNull == null ? MlMetadata.EMPTY_METADATA : maybeNull;
        ArrayList failedJobs = new ArrayList();
        Consumer<String> jobIdProcessor = id -> {
            CloseJobAction.validateJobAndTaskState(id, mlMetadata, tasksMetaData);
            Job job = mlMetadata.getJobs().get(id);
            if (job.isDeleted()) {
                return;
            }
            CloseJobAction.addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs);
        };
        Set<String> expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs());
        expandedJobIds.stream().forEach(jobIdProcessor::accept);
        if (!request.isForce() && failedJobs.size() > 0) {
            if (expandedJobIds.size() == 1) {
                throw org.elasticsearch.xpack.ml.utils.ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", expandedJobIds.iterator().next());
            }
            throw org.elasticsearch.xpack.ml.utils.ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close", new Object[0]);
        }
        openJobIds.addAll(failedJobs);
    }

    private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData, List<String> openJobs, List<String> closingJobs, List<String> failedJobs) {
        JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData);
        switch (jobState) {
            case CLOSING: {
                closingJobs.add(jobId);
                break;
            }
            case FAILED: {
                failedJobs.add(jobId);
                break;
            }
            case OPENING: 
            case OPENED: {
                openJobs.add(jobId);
                break;
            }
        }
    }

    static TransportAction.WaitForCloseRequest buildWaitForCloseRequest(List<String> openJobIds, List<String> closingJobIds, PersistentTasksCustomMetaData tasks, Auditor auditor) {
        PersistentTasksCustomMetaData.PersistentTask<?> jobTask;
        TransportAction.WaitForCloseRequest waitForCloseRequest = new TransportAction.WaitForCloseRequest();
        for (String jobId : openJobIds) {
            jobTask = MlMetadata.getJobTask(jobId, tasks);
            if (jobTask == null) continue;
            auditor.info(jobId, "Job is closing");
            waitForCloseRequest.persistentTaskIds.add(jobTask.getId());
            waitForCloseRequest.jobsToFinalize.add(jobId);
        }
        for (String jobId : closingJobIds) {
            jobTask = MlMetadata.getJobTask(jobId, tasks);
            if (jobTask == null) continue;
            waitForCloseRequest.persistentTaskIds.add(jobTask.getId());
        }
        return waitForCloseRequest;
    }

    static void validateJobAndTaskState(String jobId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
        DatafeedState datafeedState;
        Job job = mlMetadata.getJobs().get(jobId);
        if (job == null) {
            throw new ResourceNotFoundException("cannot close job, because job [" + jobId + "] does not exist", new Object[0]);
        }
        Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
        if (datafeed.isPresent() && (datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks)) != DatafeedState.STOPPED) {
            throw org.elasticsearch.xpack.ml.utils.ExceptionsHelper.conflictStatusException("cannot close job [{}], datafeed hasn't been stopped", jobId);
        }
    }

    public static class TransportAction
    extends TransportTasksAction<OpenJobAction.JobTask, Request, Response, Response> {
        private final Client client;
        private final ClusterService clusterService;
        private final Auditor auditor;
        private final PersistentTasksService persistentTasksService;

        @Inject
        public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, Client client, Auditor auditor, PersistentTasksService persistentTasksService) {
            super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, Response::new, "same");
            this.client = client;
            this.clusterService = clusterService;
            this.auditor = auditor;
            this.persistentTasksService = persistentTasksService;
        }

        protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
            ClusterState state = this.clusterService.state();
            DiscoveryNodes nodes = state.nodes();
            if (!request.local && !nodes.isLocalNodeElectedMaster()) {
                if (nodes.getMasterNode() == null) {
                    listener.onFailure((Exception)new MasterNotDiscoveredException("no known master node"));
                } else {
                    this.transportService.sendRequest(nodes.getMasterNode(), this.actionName, (TransportRequest)request, (TransportResponseHandler)new ActionListenerResponseHandler(listener, Response::new));
                }
            } else {
                ArrayList<String> openJobIds = new ArrayList<String>();
                ArrayList<String> closingJobIds = new ArrayList<String>();
                CloseJobAction.resolveAndValidateJobId(request, state, openJobIds, closingJobIds);
                request.setOpenJobIds(openJobIds.toArray(new String[0]));
                if (openJobIds.isEmpty() && closingJobIds.isEmpty()) {
                    listener.onResponse((Object)new Response(true));
                    return;
                }
                if (!request.isForce()) {
                    HashSet<String> executorNodes = new HashSet<String>();
                    PersistentTasksCustomMetaData tasks = (PersistentTasksCustomMetaData)state.metaData().custom("persistent_tasks");
                    for (String resolvedJobId : request.openJobIds) {
                        PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
                        if (jobTask == null || !jobTask.isAssigned()) {
                            String message = "Cannot close job [" + resolvedJobId + "] because the job does not have an assigned node. Use force close to close the job";
                            listener.onFailure((Exception)org.elasticsearch.xpack.ml.utils.ExceptionsHelper.conflictStatusException(message, new Object[0]));
                            return;
                        }
                        executorNodes.add(jobTask.getExecutorNode());
                    }
                    request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
                }
                if (request.isForce()) {
                    ArrayList<String> jobIdsToForceClose = new ArrayList<String>(openJobIds);
                    jobIdsToForceClose.addAll(closingJobIds);
                    this.forceCloseJob(state, request, jobIdsToForceClose, listener);
                } else {
                    this.normalCloseJob(state, task, request, openJobIds, closingJobIds, listener);
                }
            }
        }

        protected void taskOperation(Request request, final OpenJobAction.JobTask jobTask, final ActionListener<Response> listener) {
            JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId());
            jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> this.threadPool.executor("ml_utility").execute((Runnable)new AbstractRunnable(){

                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }

                protected void doRun() throws Exception {
                    jobTask.closeJob("close job (api)");
                    listener.onResponse((Object)new Response(true));
                }
            }), arg_0 -> listener.onFailure(arg_0)));
        }

        protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
            if (request.openJobIds.length != tasks.size()) {
                if (!taskOperationFailures.isEmpty()) {
                    throw ExceptionsHelper.convertToElastic((Exception)taskOperationFailures.get(0).getCause());
                }
                if (!failedNodeExceptions.isEmpty()) {
                    throw ExceptionsHelper.convertToElastic((Exception)((Exception)failedNodeExceptions.get(0)));
                }
                return new Response(true);
            }
            return new Response(tasks.stream().allMatch(Response::isClosed));
        }

        protected Response readTaskResponse(StreamInput in) throws IOException {
            return new Response(in);
        }

        private void forceCloseJob(ClusterState currentState, final Request request, List<String> jobIdsToForceClose, final ActionListener<Response> listener) {
            PersistentTasksCustomMetaData tasks = (PersistentTasksCustomMetaData)currentState.getMetaData().custom("persistent_tasks");
            final int numberOfJobs = jobIdsToForceClose.size();
            final AtomicInteger counter = new AtomicInteger();
            final AtomicArray failures = new AtomicArray(numberOfJobs);
            for (String jobId : jobIdsToForceClose) {
                PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
                if (jobTask == null) continue;
                this.auditor.info(jobId, "Job is closing (forced)");
                this.persistentTasksService.cancelPersistentTask(jobTask.getId(), new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

                    public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
                        if (counter.incrementAndGet() == numberOfJobs) {
                            this.sendResponseOrFailure(request.getJobId(), (ActionListener<Response>)listener, (AtomicArray<Exception>)failures);
                        }
                    }

                    public void onFailure(Exception e) {
                        int slot = counter.incrementAndGet();
                        failures.set(slot - 1, (Object)e);
                        if (slot == numberOfJobs) {
                            this.sendResponseOrFailure(request.getJobId(), (ActionListener<Response>)listener, (AtomicArray<Exception>)failures);
                        }
                    }

                    private void sendResponseOrFailure(String jobId, ActionListener<Response> listener2, AtomicArray<Exception> failures2) {
                        List catchedExceptions = failures2.asList();
                        if (catchedExceptions.size() == 0) {
                            listener2.onResponse((Object)new Response(true));
                            return;
                        }
                        String msg = "Failed to force close job [" + jobId + "] with [" + catchedExceptions.size() + "] failures, rethrowing last, all Exceptions: [" + catchedExceptions.stream().map(Throwable::getMessage).collect(Collectors.joining(", ")) + "]";
                        ElasticsearchException e = new ElasticsearchException(msg, (Throwable)catchedExceptions.get(0), new Object[0]);
                        listener2.onFailure((Exception)e);
                    }
                });
            }
        }

        private void normalCloseJob(ClusterState currentState, Task task, Request request, List<String> openJobIds, List<String> closingJobIds, ActionListener<Response> listener) {
            PersistentTasksCustomMetaData tasks = (PersistentTasksCustomMetaData)currentState.getMetaData().custom("persistent_tasks");
            WaitForCloseRequest waitForCloseRequest = CloseJobAction.buildWaitForCloseRequest(openJobIds, closingJobIds, tasks, this.auditor);
            if (!waitForCloseRequest.hasJobsToWaitFor()) {
                listener.onResponse((Object)new Response(true));
                return;
            }
            boolean noOpenJobsToClose = openJobIds.isEmpty();
            if (noOpenJobsToClose) {
                this.waitForJobClosed(request, waitForCloseRequest, new Response(true), listener);
                return;
            }
            ActionListener finalListener = ActionListener.wrap(r -> this.waitForJobClosed(request, waitForCloseRequest, (Response)((Object)r), listener), arg_0 -> listener.onFailure(arg_0));
            super.doExecute(task, (BaseTasksRequest)request, finalListener);
        }

        void waitForJobClosed(Request request, final WaitForCloseRequest waitForCloseRequest, final Response response, final ActionListener<Response> listener) {
            this.persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
                for (String persistentTaskId : waitForCloseRequest.persistentTaskIds) {
                    if (persistentTasksCustomMetaData.getTask(persistentTaskId) == null) continue;
                    return false;
                }
                return true;
            }, request.getCloseTimeout(), new ActionListener<Boolean>(){

                public void onResponse(Boolean result) {
                    FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
                    ClientHelper.executeAsyncWithOrigin(client, "ml", FinalizeJobExecutionAction.INSTANCE, finalizeRequest, ActionListener.wrap(r -> listener.onResponse((Object)response), arg_0 -> ((ActionListener)listener).onFailure(arg_0)));
                }

                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        }

        static class WaitForCloseRequest {
            List<String> persistentTaskIds = new ArrayList<String>();
            List<String> jobsToFinalize = new ArrayList<String>();

            WaitForCloseRequest() {
            }

            public boolean hasJobsToWaitFor() {
                return !this.persistentTaskIds.isEmpty();
            }
        }
    }

    public static class Response
    extends BaseTasksResponse
    implements Writeable,
    ToXContentObject {
        private boolean closed;

        Response() {
            super(null, null);
        }

        Response(StreamInput in) throws IOException {
            super(null, null);
            this.readFrom(in);
        }

        Response(boolean closed) {
            super(null, null);
            this.closed = closed;
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.closed = in.readBoolean();
        }

        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeBoolean(this.closed);
        }

        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
            builder.startObject();
            builder.field("closed", this.closed);
            builder.endObject();
            return builder;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || ((Object)((Object)this)).getClass() != o.getClass()) {
                return false;
            }
            Response response = (Response)((Object)o);
            return this.closed == response.closed;
        }

        public int hashCode() {
            return Objects.hash(this.closed);
        }
    }

    static class RequestBuilder
    extends ActionRequestBuilder<Request, Response, RequestBuilder> {
        RequestBuilder(ElasticsearchClient client, CloseJobAction action) {
            super(client, (Action)action, (ActionRequest)new Request());
        }
    }

    public static class Request
    extends BaseTasksRequest<Request>
    implements ToXContentObject {
        public static final ParseField TIMEOUT = new ParseField("timeout", new String[0]);
        public static final ParseField FORCE = new ParseField("force", new String[0]);
        public static final ParseField ALLOW_NO_JOBS = new ParseField("allow_no_jobs", new String[0]);
        public static ObjectParser<Request, Void> PARSER = new ObjectParser("cluster:admin/xpack/ml/job/close", Request::new);
        private String jobId;
        private boolean force = false;
        private boolean allowNoJobs = true;
        private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;
        private String[] openJobIds = new String[0];
        private boolean local;

        public static Request parseRequest(String jobId, XContentParser parser) {
            Request request = (Request)((Object)PARSER.apply(parser, null));
            if (jobId != null) {
                request.setJobId(jobId);
            }
            return request;
        }

        Request() {
        }

        public Request(String jobId) {
            this();
            this.jobId = jobId;
        }

        public String getJobId() {
            return this.jobId;
        }

        public void setJobId(String jobId) {
            this.jobId = jobId;
        }

        public TimeValue getCloseTimeout() {
            return this.timeout;
        }

        public void setCloseTimeout(TimeValue timeout) {
            this.timeout = timeout;
        }

        public boolean isForce() {
            return this.force;
        }

        public void setForce(boolean force) {
            this.force = force;
        }

        public boolean allowNoJobs() {
            return this.allowNoJobs;
        }

        public void setAllowNoJobs(boolean allowNoJobs) {
            this.allowNoJobs = allowNoJobs;
        }

        public void setLocal(boolean local) {
            this.local = local;
        }

        public void setOpenJobIds(String[] openJobIds) {
            this.openJobIds = openJobIds;
        }

        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.jobId = in.readString();
            this.timeout = new TimeValue(in);
            this.force = in.readBoolean();
            this.openJobIds = in.readStringArray();
            this.local = in.readBoolean();
            if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
                this.allowNoJobs = in.readBoolean();
            }
        }

        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(this.jobId);
            this.timeout.writeTo(out);
            out.writeBoolean(this.force);
            out.writeStringArray(this.openJobIds);
            out.writeBoolean(this.local);
            if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
                out.writeBoolean(this.allowNoJobs);
            }
        }

        public boolean match(Task task) {
            for (String id : this.openJobIds) {
                if (!OpenJobAction.JobTask.match(task, id)) continue;
                return true;
            }
            return false;
        }

        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
            builder.startObject();
            builder.field(Job.ID.getPreferredName(), this.jobId);
            builder.field(TIMEOUT.getPreferredName(), this.timeout.getStringRep());
            builder.field(FORCE.getPreferredName(), this.force);
            builder.field(ALLOW_NO_JOBS.getPreferredName(), this.allowNoJobs);
            builder.endObject();
            return builder;
        }

        public int hashCode() {
            return Objects.hash(this.jobId, this.timeout, this.force, this.allowNoJobs);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || obj.getClass() != ((Object)((Object)this)).getClass()) {
                return false;
            }
            Request other = (Request)((Object)obj);
            return Objects.equals(this.jobId, other.jobId) && Objects.equals(this.timeout, other.timeout) && Objects.equals(this.force, other.force) && Objects.equals(this.allowNoJobs, other.allowNoJobs);
        }

        static {
            PARSER.declareString(Request::setJobId, Job.ID);
            PARSER.declareString((request, val) -> request.setCloseTimeout(TimeValue.parseTimeValue((String)val, (String)TIMEOUT.getPreferredName())), TIMEOUT);
            PARSER.declareBoolean(Request::setForce, FORCE);
            PARSER.declareBoolean(Request::setAllowNoJobs, ALLOW_NO_JOBS);
        }
    }
}

