/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.action;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;

public class StopDatafeedAction
extends Action<Request, Response, RequestBuilder> {
    /** Shared instance of this action; the constructor is private, so this is the only one created in this file. */
    public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
    /** Action name handed to the {@code Action} super constructor and to the transport action. */
    public static final String NAME = "cluster:admin/xpack/ml/datafeed/stop";
    /** Request body field carrying the stop timeout (see {@code Request.PARSER} and {@code Request.toXContent}). */
    public static final ParseField TIMEOUT = new ParseField("timeout", new String[0]);
    /** Request body field carrying the force flag (see {@code Request.PARSER} and {@code Request.toXContent}). */
    public static final ParseField FORCE = new ParseField("force", new String[0]);
    /** Stop timeout used by a {@code Request} when none is set explicitly: five minutes. */
    public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes((long)5L);

    /** Private: the action is only reachable through {@link #INSTANCE}. Registers the action under {@link #NAME}. */
    private StopDatafeedAction() {
        super(NAME);
    }

    /**
     * Creates a builder bound to the given client and to this action.
     *
     * @param client client the builder is bound to
     * @return a new {@code RequestBuilder} wrapping an empty {@code Request}
     */
    public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
        return new RequestBuilder(client, this);
    }

    /**
     * Creates an empty response instance.
     *
     * @return a {@code Response} built with its no-arg constructor
     */
    public Response newResponse() {
        return new Response();
    }

    /**
     * Resolves a datafeed expression into the datafeeds that are not yet stopped and sorts
     * them into the two output lists according to their current state.
     *
     * <p>A concrete id is validated first and then classified. The special id {@code _all}
     * expands to every datafeed in {@code mlMetadata}; when the ML metadata is absent
     * ({@code null}) there is nothing to expand and both lists are left untouched instead of
     * failing with a {@link NullPointerException}.
     *
     * @param datafeedId          a concrete datafeed id or {@code _all}
     * @param mlMetadata          ML cluster metadata; may be {@code null} if none is installed
     * @param tasks               persistent tasks metadata used to look up each datafeed's state
     * @param startedDatafeedIds  output: ids of datafeeds in state {@code STARTED}
     * @param stoppingDatafeedIds output: ids of datafeeds in state {@code STOPPING}
     */
    static void resolveDataFeedIds(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks, List<String> startedDatafeedIds, List<String> stoppingDatafeedIds) {
        if ("_all".equals(datafeedId) == false) {
            StopDatafeedAction.validateDatafeedTask(datafeedId, mlMetadata);
            StopDatafeedAction.addDatafeedTaskIdAccordingToState(datafeedId, MlMetadata.getDatafeedState(datafeedId, tasks), startedDatafeedIds, stoppingDatafeedIds);
            return;
        }
        if (mlMetadata == null) {
            // No ML metadata means no datafeeds are configured, so "_all" matches nothing.
            return;
        }
        // Iterating an empty key set is a no-op, so no separate emptiness check is needed.
        for (String expandedDatafeedId : mlMetadata.getDatafeeds().keySet()) {
            StopDatafeedAction.validateDatafeedTask(expandedDatafeedId, mlMetadata);
            StopDatafeedAction.addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlMetadata.getDatafeedState(expandedDatafeedId, tasks), startedDatafeedIds, stoppingDatafeedIds);
        }
    }

    /**
     * Files a datafeed id into the list that matches its state.
     *
     * @param datafeedId          id to record
     * @param datafeedState       current state of that datafeed
     * @param startedDatafeedIds  receives the id when the state is {@code STARTED}
     * @param stoppingDatafeedIds receives the id when the state is {@code STOPPING}
     */
    private static void addDatafeedTaskIdAccordingToState(String datafeedId, DatafeedState datafeedState, List<String> startedDatafeedIds, List<String> stoppingDatafeedIds) {
        switch (datafeedState) {
            case STOPPING:
                stoppingDatafeedIds.add(datafeedId);
                return;
            case STARTED:
                startedDatafeedIds.add(datafeedId);
                return;
            case STOPPED:
            default:
                // Already stopped (or any other state): recorded in neither list.
                return;
        }
    }

    /**
     * Verifies that a datafeed with the given id is configured.
     *
     * <p>Absent ML metadata ({@code null}) is treated the same as "datafeed not configured",
     * so the caller gets the not-found error rather than a {@link NullPointerException}.
     *
     * @param datafeedId id of the datafeed to look up
     * @param mlMetadata ML cluster metadata; may be {@code null} if none is installed
     * @throws ResourceNotFoundException if no datafeed with that id exists
     */
    static void validateDatafeedTask(String datafeedId, MlMetadata mlMetadata) {
        DatafeedConfig datafeed = mlMetadata == null ? null : mlMetadata.getDatafeed(datafeedId);
        if (datafeed == null) {
            throw new ResourceNotFoundException(Messages.getMessage("No datafeed with id [{0}] exists", datafeedId), new Object[0]);
        }
    }

    public static class TransportAction
    extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
        private final PersistentTasksService persistentTasksService;

        /**
         * Wires the transport action under {@code StopDatafeedAction.NAME} and keeps the
         * persistent tasks service for cancelling and awaiting datafeed tasks.
         * The trailing {@code "ml_utility"} argument is passed to the super constructor
         * (presumably the executor name used for node-level operations; confirm against
         * {@code TransportTasksAction}).
         */
        @Inject
        public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, PersistentTasksService persistentTasksService) {
            super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, Response::new, "ml_utility");
            this.persistentTasksService = persistentTasksService;
        }

        /**
         * Entry point of the stop request.
         *
         * <p>On a node that is not the elected master the request is forwarded to the master
         * (or failed with {@link MasterNotDiscoveredException} when none is known). On the
         * master the datafeed expression is resolved against the current cluster state:
         * <ul>
         *   <li>nothing started or stopping: respond {@code stopped=true} immediately;</li>
         *   <li>{@code force}: cancel the persistent task of every datafeed that is not yet
         *       stopped, i.e. started <em>and</em> stopping ones. Passing only the started
         *       ones would leave the listener uncompleted when every matching datafeed is
         *       already in {@code STOPPING} state;</li>
         *   <li>otherwise: perform a normal stop and wait for the tasks to disappear.</li>
         * </ul>
         *
         * @param task     the task representing this request
         * @param request  the stop request; its resolved started ids are overwritten here
         * @param listener completed with the outcome
         */
        protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
            ClusterState state = this.clusterService.state();
            DiscoveryNodes nodes = state.nodes();
            if (nodes.isLocalNodeElectedMaster() == false) {
                if (nodes.getMasterNode() == null) {
                    listener.onFailure(new MasterNotDiscoveredException("no known master node"));
                } else {
                    this.transportService.sendRequest(nodes.getMasterNode(), this.actionName, request, new ActionListenerResponseHandler<>(listener, Response::new));
                }
                return;
            }

            MlMetadata mlMetadata = (MlMetadata)state.getMetaData().custom("ml");
            PersistentTasksCustomMetaData tasks = (PersistentTasksCustomMetaData)state.getMetaData().custom("persistent_tasks");
            List<String> startedDatafeeds = new ArrayList<String>();
            List<String> stoppingDatafeeds = new ArrayList<String>();
            StopDatafeedAction.resolveDataFeedIds(request.getDatafeedId(), mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
            if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
                listener.onResponse(new Response(true));
                return;
            }
            request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
            if (request.isForce()) {
                // A datafeed is in exactly one state, so the two lists never overlap.
                List<String> notStoppedDatafeeds = new ArrayList<String>(startedDatafeeds);
                notStoppedDatafeeds.addAll(stoppingDatafeeds);
                this.forceStopDatafeed(request, listener, tasks, notStoppedDatafeeds);
            } else {
                this.normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
            }
        }

        /**
         * Performs a regular (non-forced) stop.
         *
         * <p>Every started datafeed must have a persistent task with an assigned node;
         * otherwise the request fails with a conflict. The request is then targeted at the
         * nodes executing those tasks and dispatched through the tasks-action machinery.
         * Once that completes, the response is held back until the persistent tasks of all
         * started and stopping datafeeds are gone.
         *
         * @param task              task representing this request
         * @param request           the stop request; its target nodes are set here
         * @param listener          completed with the final outcome
         * @param tasks             persistent tasks metadata from the cluster state
         * @param startedDatafeeds  ids of datafeeds in state {@code STARTED}
         * @param stoppingDatafeeds ids of datafeeds already in state {@code STOPPING}
         */
        private void normalStopDatafeed(Task task, Request request, ActionListener<Response> listener, PersistentTasksCustomMetaData tasks, List<String> startedDatafeeds, List<String> stoppingDatafeeds) {
            HashSet<String> targetNodes = new HashSet<>();
            for (String startedId : startedDatafeeds) {
                PersistentTasksCustomMetaData.PersistentTask<?> persistentTask = MlMetadata.getDatafeedTask(startedId, tasks);
                boolean assigned = persistentTask != null && persistentTask.isAssigned();
                if (assigned == false) {
                    String message = "Cannot stop datafeed [" + startedId + "] because the datafeed does not have an assigned node. Use force stop to stop the datafeed";
                    listener.onFailure(org.elasticsearch.xpack.ml.utils.ExceptionsHelper.conflictStatusException(message, new Object[0]));
                    return;
                }
                targetNodes.add(persistentTask.getExecutorNode());
            }
            request.setNodes(targetNodes.toArray(new String[targetNodes.size()]));

            // Persistent task ids to wait on: started datafeeds first, then stopping ones.
            List<String> taskIdsToAwait = new ArrayList<>();
            for (String startedId : startedDatafeeds) {
                taskIdsToAwait.add(MlMetadata.datafeedTaskId(startedId));
            }
            for (String stoppingId : stoppingDatafeeds) {
                taskIdsToAwait.add(MlMetadata.datafeedTaskId(stoppingId));
            }

            ActionListener<Response> afterNodeOperations = ActionListener.wrap(
                    nodeResponse -> this.waitForDatafeedStopped(taskIdsToAwait, request, nodeResponse, listener),
                    failure -> listener.onFailure(failure));
            super.doExecute(task, request, afterNodeOperations);
        }

        /**
         * Force-stops datafeeds by cancelling their persistent tasks directly.
         *
         * <p>One cancellation is issued per datafeed; the listener is completed exactly once,
         * after the last outcome has been recorded. A datafeed whose persistent task cannot
         * be found counts as a failure.
         *
         * <p>Implementation notes:
         * <ul>
         *   <li>An empty list completes the listener immediately; without this no callback
         *       would ever fire and the request would never be answered.</li>
         *   <li>Each datafeed owns a fixed slot in {@code failures}, and a failure is stored
         *       <em>before</em> the completion counter is incremented, so whichever thread
         *       observes the final count also observes every recorded failure.</li>
         *   <li>The missing-task message names the concrete datafeed rather than the request
         *       expression (which may be {@code _all}).</li>
         * </ul>
         *
         * @param request          the stop request (its id is used in the aggregated error)
         * @param listener         completed once all cancellations have finished
         * @param tasks            persistent tasks metadata from the cluster state
         * @param startedDatafeeds ids of the datafeeds whose tasks should be cancelled
         */
        private void forceStopDatafeed(final Request request, final ActionListener<Response> listener, PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds) {
            final int expected = startedDatafeeds.size();
            if (expected == 0) {
                listener.onResponse(new Response(true));
                return;
            }
            final AtomicInteger counter = new AtomicInteger();
            final AtomicArray<Exception> failures = new AtomicArray<>(expected);
            int nextSlot = 0;
            for (String datafeedId : startedDatafeeds) {
                final int slot = nextSlot++;
                PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
                if (datafeedTask == null) {
                    String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
                    this.logger.warn(msg);
                    failures.set(slot, new RuntimeException(msg));
                    if (counter.incrementAndGet() == expected) {
                        this.sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                    }
                    continue;
                }
                this.persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

                    public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                        if (counter.incrementAndGet() == expected) {
                            sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                        }
                    }

                    public void onFailure(Exception e) {
                        // Publish the failure before counting this datafeed as done.
                        failures.set(slot, e);
                        if (counter.incrementAndGet() == expected) {
                            sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                        }
                    }
                });
            }
        }

        /**
         * Node-level stop of a single running datafeed task.
         *
         * <p>The task's persistent status is first updated to {@code STOPPING}. On success
         * the actual stop is run on the {@code ml_utility} executor and the listener receives
         * {@code stopped=true}. If the status update fails with
         * {@link ResourceNotFoundException} the datafeed is reported as stopped; any other
         * failure is passed to the listener.
         *
         * @param request          the stop request (provides the stop timeout)
         * @param datafeedTaskTask the running datafeed task on this node
         * @param listener         completed with the per-task outcome
         */
        protected void taskOperation(final Request request, final StartDatafeedAction.DatafeedTask datafeedTaskTask, final ActionListener<Response> listener) {
            final AbstractRunnable stopJob = new AbstractRunnable(){

                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }

                protected void doRun() throws Exception {
                    datafeedTaskTask.stop("stop_datafeed (api)", request.getStopTimeout());
                    listener.onResponse(new Response(true));
                }
            };

            datafeedTaskTask.updatePersistentStatus(DatafeedState.STOPPING, ActionListener.wrap(
                    updatedTask -> this.threadPool.executor("ml_utility").execute(stopJob),
                    failure -> {
                        if (failure instanceof ResourceNotFoundException) {
                            listener.onResponse(new Response(true));
                            return;
                        }
                        listener.onFailure(failure);
                    }));
        }

        /**
         * Completes the listener after a force stop: success when no failure was recorded,
         * otherwise a single {@link ElasticsearchException} whose message lists every
         * failure message and whose cause is the first recorded failure.
         *
         * @param datafeedId id (or expression) named in the error message
         * @param listener   listener to complete
         * @param failures   failures collected while cancelling tasks; unset slots are skipped
         */
        private void sendResponseOrFailure(String datafeedId, ActionListener<Response> listener, AtomicArray<Exception> failures) {
            List<Exception> recorded = failures.asList();
            if (recorded.isEmpty()) {
                listener.onResponse(new Response(true));
                return;
            }
            String allMessages = recorded.stream().map(Throwable::getMessage).collect(Collectors.joining(", "));
            String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + recorded.size() + "] failures, rethrowing last, all Exceptions: [" + allMessages + "]";
            listener.onFailure(new ElasticsearchException(msg, (Throwable)recorded.get(0), new Object[0]));
        }

        /**
         * Delays the response until none of the given persistent tasks exists any more, or
         * the request timeout elapses.
         *
         * @param datafeedPersistentTaskIds persistent task ids to wait on
         * @param request                   provides the wait timeout
         * @param response                  response forwarded to the listener once the wait succeeds
         * @param listener                  completed with {@code response} or with the wait failure
         */
        void waitForDatafeedStopped(List<String> datafeedPersistentTaskIds, Request request, final Response response, final ActionListener<Response> listener) {
            this.persistentTasksService.waitForPersistentTasksStatus(
                    currentTasks -> datafeedPersistentTaskIds.stream().allMatch(taskId -> currentTasks.getTask(taskId) == null),
                    request.getTimeout(),
                    new ActionListener<Boolean>(){

                        public void onResponse(Boolean ignored) {
                            // The boolean outcome is not inspected; the prepared response is passed on.
                            listener.onResponse(response);
                        }

                        public void onFailure(Exception e) {
                            listener.onFailure(e);
                        }
                    });
        }

        /**
         * Merges the per-task responses into one response.
         *
         * <p>When one response arrived per resolved started datafeed, the result is stopped
         * only if every task reported stopped. When the counts differ, the first task
         * failure (or else the first node failure) is rethrown; with no failure at all the
         * datafeeds are reported as stopped.
         *
         * @param request               the stop request holding the resolved started ids
         * @param tasks                 responses from the individual task operations
         * @param taskOperationFailures failures raised by task operations
         * @param failedNodeExceptions  failures raised by whole nodes
         * @return the merged response
         */
        protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
            boolean oneResponsePerDatafeed = request.getResolvedStartedDatafeedIds().length == tasks.size();
            if (oneResponsePerDatafeed) {
                return new Response(tasks.stream().allMatch(Response::isStopped));
            }
            if (taskOperationFailures.isEmpty() == false) {
                throw ExceptionsHelper.convertToElastic((Exception)taskOperationFailures.get(0).getCause());
            }
            if (failedNodeExceptions.isEmpty() == false) {
                throw ExceptionsHelper.convertToElastic((Exception)failedNodeExceptions.get(0));
            }
            return new Response(true);
        }

        /**
         * Deserializes a per-task response from the wire.
         *
         * @param in stream to read from
         * @return the response read via the {@code Response(StreamInput)} constructor
         * @throws IOException if reading from the stream fails
         */
        protected Response readTaskResponse(StreamInput in) throws IOException {
            return new Response(in);
        }
    }

    /** Request builder for the stop-datafeed action; starts from an empty {@code Request}. */
    static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {

        /**
         * @param client client the builder is bound to
         * @param action the stop-datafeed action
         */
        RequestBuilder(ElasticsearchClient client, StopDatafeedAction action) {
            super(client, action, new Request());
        }
    }

    /**
     * Response of the stop-datafeed action: a single flag telling whether the datafeed(s)
     * are stopped. It is serialized after the base response fields.
     */
    public static class Response extends BaseTasksResponse implements Writeable {
        /** {@code true} when the datafeed(s) are reported as stopped; {@code false} until set or read. */
        private boolean stopped;

        /** Creates a response with the flag left at {@code false}. */
        public Response() {
            super(null, null);
        }

        /**
         * Creates a response with the given outcome.
         *
         * @param stopped whether the datafeed(s) are stopped
         */
        public Response(boolean stopped) {
            super(null, null);
            this.stopped = stopped;
        }

        /**
         * Reads a response from the wire.
         *
         * @param in stream to read from
         * @throws IOException if reading fails
         */
        public Response(StreamInput in) throws IOException {
            super(null, null);
            readFrom(in);
        }

        /** @return whether the datafeed(s) are stopped */
        public boolean isStopped() {
            return stopped;
        }

        /** Reads the base response fields followed by the stopped flag. */
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            stopped = in.readBoolean();
        }

        /** Writes the base response fields followed by the stopped flag. */
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeBoolean(stopped);
        }
    }

    /**
     * Request to stop a datafeed, or every datafeed when the id is {@code _all}.
     *
     * <p>Besides the user-supplied id, stop timeout and force flag, the request carries the
     * concrete ids of the started datafeeds it resolved to. These are filled in by the
     * transport action and used by {@link #match(Task)} to select the running tasks.
     * They are part of the wire format but not of {@code equals}/{@code hashCode}/XContent.
     */
    public static class Request extends BaseTasksRequest<Request> implements ToXContent {
        public static ObjectParser<Request, Void> PARSER = new ObjectParser<>("cluster:admin/xpack/ml/datafeed/stop", Request::new);

        private String datafeedId;
        /**
         * Never {@code null}: requests created by the no-arg constructor or by the parser
         * start with no resolved ids. A {@code null} here would make {@link #writeTo} and
         * {@link #match(Task)} throw a {@link NullPointerException}, e.g. when a parsed
         * request is forwarded to the master node before being resolved.
         */
        private String[] resolvedStartedDatafeedIds = new String[0];
        private TimeValue stopTimeout = DEFAULT_TIMEOUT;
        private boolean force = false;

        /**
         * Parses a request whose datafeed id is taken from the body.
         *
         * @param parser parser positioned on the request body
         * @return the parsed request
         */
        public static Request fromXContent(XContentParser parser) {
            return Request.parseRequest(null, parser);
        }

        /**
         * Parses a request body; a non-null {@code datafeedId} overrides any id in the body.
         *
         * @param datafeedId id to force onto the request, or {@code null} to keep the parsed one
         * @param parser     parser positioned on the request body
         * @return the parsed request
         */
        public static Request parseRequest(String datafeedId, XContentParser parser) {
            Request request = PARSER.apply(parser, null);
            if (datafeedId != null) {
                request.datafeedId = datafeedId;
            }
            return request;
        }

        /**
         * Creates a request for one datafeed id (or {@code _all}); the id is also used as the
         * initial resolved id until the transport action replaces it.
         *
         * @param datafeedId the datafeed id; must not be {@code null}
         */
        public Request(String datafeedId) {
            this.datafeedId = org.elasticsearch.xpack.ml.utils.ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
            this.resolvedStartedDatafeedIds = new String[]{datafeedId};
        }

        /** Used for deserialization and by the parser; fields are populated afterwards. */
        Request() {
        }

        private String getDatafeedId() {
            return this.datafeedId;
        }

        private String[] getResolvedStartedDatafeedIds() {
            return this.resolvedStartedDatafeedIds;
        }

        private void setResolvedStartedDatafeedIds(String[] resolvedStartedDatafeedIds) {
            this.resolvedStartedDatafeedIds = resolvedStartedDatafeedIds;
        }

        /** @return the timeout passed to the datafeed task when stopping it */
        public TimeValue getStopTimeout() {
            return this.stopTimeout;
        }

        /**
         * @param stopTimeout the stop timeout; must not be {@code null}
         */
        public void setStopTimeout(TimeValue stopTimeout) {
            this.stopTimeout = org.elasticsearch.xpack.ml.utils.ExceptionsHelper.requireNonNull(stopTimeout, TIMEOUT.getPreferredName());
        }

        /** @return whether the datafeed should be force-stopped */
        public boolean isForce() {
            return this.force;
        }

        public void setForce(boolean force) {
            this.force = force;
        }

        /**
         * Selects the running tasks this request applies to: datafeed tasks whose
         * description equals the persistent task id of one of the resolved started datafeeds.
         *
         * @param task a task running on the node
         * @return {@code true} if the task belongs to one of the resolved datafeeds
         */
        public boolean match(Task task) {
            if (task instanceof StartDatafeedAction.DatafeedTask == false) {
                return false;
            }
            for (String id : this.resolvedStartedDatafeedIds) {
                if (MlMetadata.datafeedTaskId(id).equals(task.getDescription())) {
                    return true;
                }
            }
            return false;
        }

        /** No request-level validation is performed. */
        public ActionRequestValidationException validate() {
            return null;
        }

        /** Reads the fields in the same order {@link #writeTo} writes them. */
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.datafeedId = in.readString();
            this.resolvedStartedDatafeedIds = in.readStringArray();
            this.stopTimeout = new TimeValue(in);
            this.force = in.readBoolean();
        }

        /** Writes the base request followed by id, resolved ids, stop timeout and force flag. */
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(this.datafeedId);
            out.writeStringArray(this.resolvedStartedDatafeedIds);
            this.stopTimeout.writeTo(out);
            out.writeBoolean(this.force);
        }

        public int hashCode() {
            return Objects.hash(this.datafeedId, this.stopTimeout, this.force);
        }

        /** Renders the id, timeout and force flag as one JSON object. */
        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
            builder.startObject();
            builder.field(DatafeedConfig.ID.getPreferredName(), this.datafeedId);
            builder.field(TIMEOUT.getPreferredName(), this.stopTimeout.getStringRep());
            builder.field(FORCE.getPreferredName(), this.force);
            builder.endObject();
            return builder;
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            Request other = (Request)obj;
            return Objects.equals(this.datafeedId, other.datafeedId) && Objects.equals(this.stopTimeout, other.stopTimeout) && this.force == other.force;
        }

        static {
            PARSER.declareString((request, datafeedId) -> {
                request.datafeedId = datafeedId;
            }, DatafeedConfig.ID);
            PARSER.declareString((request, val) -> request.setStopTimeout(TimeValue.parseTimeValue((String)val, (String)TIMEOUT.getPreferredName())), TIMEOUT);
            PARSER.declareBoolean(Request::setForce, FORCE);
        }
    }
}

