/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.hadoop.yarn.am;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.elasticsearch.hadoop.yarn.am.AppMasterRpc;
import org.elasticsearch.hadoop.yarn.am.EsYarnNmException;
import org.elasticsearch.hadoop.yarn.am.NodeMasterRpc;
import org.elasticsearch.hadoop.yarn.cfg.Config;
import org.elasticsearch.hadoop.yarn.compat.YarnCompat;
import org.elasticsearch.hadoop.yarn.util.StringUtils;
import org.elasticsearch.hadoop.yarn.util.YarnUtils;

/**
 * Allocates and supervises the containers that make up an Elasticsearch cluster.
 *
 * <p>Container requests are registered through the {@link AppMasterRpc}; every container
 * handed back by an allocate call is launched through a {@link NodeMasterRpc}. The
 * allocation loop in {@link #start()} keeps running until every requested container has
 * completed, a container completes unsuccessfully, or {@link #close()} is called.
 */
class EsCluster implements AutoCloseable {
    private static final Log log = LogFactory.getLog(EsCluster.class);

    /** Pause between two consecutive allocate calls. */
    private static final long HEARTBEAT_MILLIS = TimeUnit.SECONDS.toMillis(5L);
    /** Pause taken once the allocation loop has ended, before the node manager RPC is closed. */
    private static final long SHUTDOWN_WAIT_MILLIS = TimeUnit.SECONDS.toMillis(15L);

    // Exit statuses as reported by ContainerStatus#getExitStatus().
    // NOTE(review): the numeric values presumably mirror YARN's ContainerExitStatus
    // constants (SUCCESS, ABORTED, DISKS_FAILED, PREEMPTED) - confirm against the
    // Hadoop version in use.
    private static final int EXIT_SUCCESS = 0;
    private static final int EXIT_ABORTED = -100;
    private static final int EXIT_DISKS_FAILED = -101;
    private static final int EXIT_PREEMPTED = -102;

    private final AppMasterRpc amRpc;
    private final NodeMasterRpc nmRpc;
    private final Configuration cfg;
    private final Config appConfig;
    private final Map<String, String> masterEnv;
    // volatile: close() may flip this flag while start() is looping
    private volatile boolean running = false;
    private volatile boolean clusterHasFailed = false;
    private final Set<ContainerId> allocatedContainers = new LinkedHashSet<ContainerId>();
    private final Set<ContainerId> completedContainers = new LinkedHashSet<ContainerId>();

    /**
     * Creates a cluster driver.
     *
     * @param rpc application master RPC; also supplies the Hadoop configuration and the
     *            token cache handed to the node manager RPC
     * @param appConfig application settings (container count, resources, artifact locations)
     * @param masterEnv environment of the application master (stored, not read by this class)
     */
    public EsCluster(AppMasterRpc rpc, Config appConfig, Map<String, String> masterEnv) {
        this.amRpc = rpc;
        this.cfg = rpc.getConfiguration();
        this.nmRpc = new NodeMasterRpc(this.cfg, rpc.getNMToCache());
        this.appConfig = appConfig;
        this.masterEnv = masterEnv;
    }

    /**
     * Requests the configured number of containers and runs the allocation loop on the
     * calling thread until the cluster finishes.
     *
     * <p>Each iteration performs one allocate call, launches newly allocated containers and
     * inspects completed ones. The loop ends when all requested containers have completed,
     * when any container completes with a non-zero exit status (see {@link #hasFailed()}),
     * or when {@link #close()} is invoked. Afterwards the method pauses briefly and closes
     * the node manager RPC; the close happens even if the loop threw or the thread was
     * interrupted.
     *
     * @throws EsYarnNmException if the thread is interrupted while waiting between
     *         allocate calls; the thread's interrupt status is preserved
     */
    public void start() {
        running = true;
        nmRpc.start();
        UserGroupInformation.setConfiguration(cfg);

        log.info(String.format("Allocating Elasticsearch cluster with %d nodes", appConfig.containersToAllocate()));
        requestContainers();

        int responseId = 0;
        try {
            do {
                AllocateResponse alloc = amRpc.allocate(responseId++);
                launchAllocated(alloc.getAllocatedContainers());
                processCompleted(alloc.getCompletedContainersStatuses());

                if (completedContainers.size() == appConfig.containersToAllocate()) {
                    running = false;
                }
                if (running) {
                    try {
                        Thread.sleep(HEARTBEAT_MILLIS);
                    }
                    catch (InterruptedException ex) {
                        // keep the interrupt visible to callers further up the stack
                        Thread.currentThread().interrupt();
                        throw new EsYarnNmException("Cluster interrupted");
                    }
                }
            } while (running);
        }
        finally {
            log.info("Cluster has completed running...");
            try {
                Thread.sleep(SHUTDOWN_WAIT_MILLIS);
            }
            catch (InterruptedException ex) {
                // Do not throw from here: that would skip close() and hide any exception
                // already propagating out of the loop. Just restore the flag.
                Thread.currentThread().interrupt();
            }
            finally {
                close();
            }
        }
    }

    /** Registers one container request per configured node with the application master RPC. */
    private void requestContainers() {
        Resource capability = YarnCompat.resource(cfg, appConfig.containerMem(), appConfig.containerVCores());
        Priority prio = Priority.newInstance((int) appConfig.amPriority());
        for (int i = 0; i < appConfig.containersToAllocate(); ++i) {
            // no node/rack constraints are passed
            amRpc.addContainerRequest(new AMRMClient.ContainerRequest(capability, null, null, prio));
        }
    }

    /** Launches every freshly allocated container and logs the allocation progress. */
    private void launchAllocated(List<Container> allocated) {
        for (Container container : allocated) {
            launchContainer(container);
            allocatedContainers.add(container.getId());
        }
        if (allocated.isEmpty()) {
            return;
        }
        int needed = appConfig.containersToAllocate() - allocatedContainers.size();
        if (needed > 0) {
            log.info(String.format("%s containers allocated, %s remaining", allocatedContainers.size(), needed));
        } else {
            log.info(String.format("Fully allocated %s containers", allocatedContainers.size()));
        }
    }

    /**
     * Records completed containers; the first unsuccessful exit marks the cluster as
     * failed and stops the allocation loop.
     */
    private void processCompleted(List<ContainerStatus> statuses) {
        for (ContainerStatus status : statuses) {
            ContainerId containerId = status.getContainerId();
            // Set#add returns false for an id that was already recorded earlier
            if (!completedContainers.add(containerId)) {
                continue;
            }
            if (reportExit(containerId, status.getExitStatus())) {
                continue;
            }
            log.warn("Cluster has not completed succesfully...");
            clusterHasFailed = true;
            running = false;
        }
    }

    /**
     * Logs how a container ended.
     *
     * @return {@code true} only when the exit status is {@link #EXIT_SUCCESS}
     */
    private boolean reportExit(ContainerId containerId, int exitStatus) {
        switch (exitStatus) {
            case EXIT_SUCCESS:
                log.info(String.format("Container %s finished succesfully...", containerId));
                return true;
            case EXIT_ABORTED:
                log.warn(String.format("Container %s aborted...", containerId));
                return false;
            case EXIT_DISKS_FAILED:
                log.warn(String.format("Container %s ran out of disk...", containerId));
                return false;
            case EXIT_PREEMPTED:
                log.warn(String.format("Container %s preempted...", containerId));
                return false;
            default:
                log.warn(String.format("Container %s exited with an invalid/unknown exit code...", containerId));
                return false;
        }
    }

    /** Builds the launch context (environment, zip resource, command) and starts the container. */
    private void launchContainer(Container container) {
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setEnvironment(setupEnv(appConfig));
        ctx.setLocalResources(setupEsZipResource(appConfig));
        ctx.setCommands(setupEsScript(appConfig));
        log.info("About to launch container for command: " + ctx.getCommands());
        // the value returned by startContainer is not needed here
        nmRpc.startContainer(container, ctx);
        log.info("Started container " + container);
    }

    /** Returns the base environment from {@link YarnUtils#setupEnv} extended with the configured variables. */
    private Map<String, String> setupEnv(Config appConfig) {
        Map<String, String> env = YarnUtils.setupEnv(cfg);
        YarnUtils.addToEnv(env, appConfig.envVars());
        return env;
    }

    /**
     * Describes the Elasticsearch zip as a public archive resource, keyed by its configured name.
     *
     * @throws IllegalArgumentException if the file status of the zip cannot be read
     */
    private Map<String, LocalResource> setupEsZipResource(Config conf) {
        String esZipHdfsPath = conf.esZipHdfsPath();
        FileStatus fsStat;
        try {
            fsStat = FileSystem.get(cfg).getFileStatus(new Path(esZipHdfsPath));
        }
        catch (IOException ex) {
            throw new IllegalArgumentException(String.format("Cannot find Elasticsearch zip at [%s]; make sure the artifacts have been properly provisioned and the correct permissions are in place.", esZipHdfsPath), ex);
        }

        LocalResource esZip = Records.newRecord(LocalResource.class);
        esZip.setResource(ConverterUtils.getYarnUrlFromPath(fsStat.getPath()));
        esZip.setSize(fsStat.getLen());
        esZip.setTimestamp(fsStat.getModificationTime());
        esZip.setType(LocalResourceType.ARCHIVE);
        esZip.setVisibility(LocalResourceVisibility.PUBLIC);

        Map<String, LocalResource> resources = new LinkedHashMap<String, LocalResource>();
        resources.put(conf.esZipName(), esZip);
        return resources;
    }

    /**
     * Builds the single command line that runs the Elasticsearch script with the shell,
     * redirecting stdout and stderr into the container log directory placeholder.
     */
    private List<String> setupEsScript(Config conf) {
        List<String> cmds = new ArrayList<String>();
        cmds.add(YarnCompat.$$(ApplicationConstants.Environment.SHELL));
        cmds.add(conf.esZipName() + "/" + conf.esScript());
        cmds.add("1><LOG_DIR>/stdout");
        cmds.add("2><LOG_DIR>/stderr");
        return Collections.singletonList(StringUtils.concatenate(cmds, " "));
    }

    /** @return {@code true} once any container has completed with a non-zero exit status */
    public boolean hasFailed() {
        return clusterHasFailed;
    }

    /** Stops the allocation loop (if running) and closes the node manager RPC. */
    @Override
    public void close() {
        running = false;
        nmRpc.close();
    }
}

