/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.launcher;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.launcher.ContainerOp;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.task.TezChild;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerLauncherOperationBase;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ContainerLauncher} that runs each container as a {@link TezChild}
 * on a thread pool inside the current JVM instead of starting a separate
 * process.
 *
 * <p>Launch and stop requests are placed on an internal queue and consumed by
 * a single event-handling thread. Launched containers run on a fixed-size
 * pool whose size is read from the {@code tez.am.inline.task.execution.max-tasks}
 * setting (default 1); their completion is reported back to the
 * {@link ContainerLauncherContext} from a dedicated single-threaded callback
 * executor.
 */
public class LocalContainerLauncher
extends ContainerLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);

    /** Maximum time, in milliseconds, to wait for the event thread to exit on shutdown. */
    private static final long EVENT_THREAD_JOIN_TIMEOUT_MS = 2000L;

    private final AppContext context;
    /** Latched to {@code true} by the first call to {@link #shutdown()}. */
    private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
    private final String workingDirectory;
    private final TaskCommunicatorManagerInterface tal;
    /** Base environment copied into every launched container's environment. */
    private final Map<String, String> localEnv;
    private final ExecutionContext executionContext;
    /** Size of the task execution pool; always at least 1. */
    private final int numExecutors;
    private final boolean isLocalMode;
    /** Containers that have been submitted and whose completion callback has not fired yet. */
    private final ConcurrentHashMap<ContainerId, RunningTaskCallback> runningContainers = new ConcurrentHashMap<>();
    private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
    private final BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
    private Thread eventHandlingThread;
    private final ListeningExecutorService taskExecutorService;

    /**
     * Creates the launcher and its task execution pool.
     *
     * @param containerLauncherContext context handed to the {@link ContainerLauncher} base class
     * @param context application context used to look up application, attempt and user details
     * @param taskCommunicatorManagerInterface source of the task communicator whose umbilical is
     *        passed to each launched {@link TezChild}
     * @param workingDirectory working directory passed to each launched {@link TezChild}
     * @param isLocalMode when {@code true}, a fresh environment map and the local host name are
     *        used; otherwise the process environment and its {@code NM_HOST} entry are used
     * @throws UnknownHostException if the local host name cannot be resolved in local mode
     * @throws TezUncheckedException if the initial user payload cannot be turned into a configuration
     * @throws IllegalStateException if the configured number of executors is less than 1
     */
    public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext, AppContext context, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, boolean isLocalMode) throws UnknownHostException {
        super(containerLauncherContext);
        this.context = context;
        this.tal = taskCommunicatorManagerInterface;
        this.workingDirectory = workingDirectory;
        this.isLocalMode = isLocalMode;

        if (isLocalMode) {
            Map<String, String> env = Maps.newHashMap();
            // Registers a 4-byte buffer holding the int 0 as the "mapreduce_shuffle" service data.
            // NOTE(review): presumably a placeholder shuffle port for local mode - confirm.
            AuxiliaryServiceHelper.setServiceDataIntoEnv("mapreduce_shuffle", ByteBuffer.allocate(4).putInt(0), env);
            this.localEnv = env;
        } else {
            this.localEnv = System.getenv();
        }

        String host = isLocalMode ? InetAddress.getLocalHost().getHostName() : System.getenv(ApplicationConstants.Environment.NM_HOST.name());
        this.executionContext = new ExecutionContextImpl(host);

        Configuration conf;
        try {
            conf = TezUtils.createConfFromUserPayload(this.getContext().getInitialUserPayload());
        }
        catch (IOException e) {
            throw new TezUncheckedException("Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e);
        }

        this.numExecutors = conf.getInt("tez.am.inline.task.execution.max-tasks", 1);
        Preconditions.checkState(this.numExecutors >= 1, (Object)"Must have at least 1 executor");
        ExecutorService rawExecutor = Executors.newFixedThreadPool(this.numExecutors, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d").build());
        this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor);
    }

    /**
     * Starts the thread that drains the request queue.
     */
    @Override
    public void start() throws Exception {
        this.eventHandlingThread = new Thread(new TezSubTaskRunner(), "LocalContainerLauncher-SubTaskRunner");
        this.eventHandlingThread.start();
    }

    /**
     * Stops the event-handling thread and both executors. Only the first call has any effect.
     *
     * <p>The executors are shut down even when waiting for the event thread is interrupted;
     * the {@link InterruptedException} is still propagated to the caller in that case.
     *
     * @throws Exception if the calling thread is interrupted while waiting for the event thread
     */
    @Override
    public void shutdown() throws Exception {
        if (!this.serviceStopped.compareAndSet(false, true)) {
            LOG.info("Service Already stopped. Ignoring additional stop");
            return;
        }
        try {
            if (this.eventHandlingThread != null) {
                this.eventHandlingThread.interrupt();
                this.eventHandlingThread.join(EVENT_THREAD_JOIN_TIMEOUT_MS);
            }
        }
        finally {
            // serviceStopped is already latched, so a later shutdown() call would be a no-op;
            // the executors must be released here even if join() was interrupted.
            this.taskExecutorService.shutdownNow();
            this.callbackExecutor.shutdownNow();
        }
    }

    /**
     * Reports a failed container launch to the launcher context.
     *
     * @param containerId container whose launch failed
     * @param message diagnostic text forwarded to the context
     */
    void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {
        this.getContext().containerLaunchFailed(containerId, message);
    }

    /**
     * Logs a launch failure and reports it to the launcher context. A
     * {@link RejectedExecutionException} is described as a queueing failure, anything else as a
     * launch failure.
     */
    private void handleLaunchFailed(Throwable t, ContainerId containerId) {
        String message = t instanceof RejectedExecutionException ? "Failed to queue container launch for container Id: " + containerId : "Failed to launch container for container Id: " + containerId;
        LOG.error(message, t);
        this.sendContainerLaunchFailedMsg(containerId, message);
    }

    /**
     * Builds a {@link TezChild} for the request, submits it to the task pool and registers a
     * completion callback. Checked failures while building the child and rejection by the pool
     * are reported through {@link #handleLaunchFailed(Throwable, ContainerId)}.
     */
    private void launch(ContainerLaunchRequest event) {
        ContainerId containerId = event.getContainerId();
        String tokenIdentifier = this.context.getApplicationID().toString();
        try {
            TezChild tezChild;
            try {
                int taskCommId = this.context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName());
                TezTaskUmbilicalProtocol umbilical = ((TezTaskCommunicatorImpl)this.tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical();
                Credentials credentials = TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array());
                tezChild = this.createTezChild(this.context.getAMConf(), containerId, tokenIdentifier, this.context.getApplicationAttemptId().getAttemptId(), this.context.getLocalDirs(), umbilical, credentials);
            }
            catch (InterruptedException e) {
                this.handleLaunchFailed(e, containerId);
                // Restore the flag after reporting so the event loop observes the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
            catch (TezException | IOException e) {
                this.handleLaunchFailed(e, containerId);
                return;
            }
            ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture = this.taskExecutorService.submit(this.createSubTask(tezChild, containerId));
            RunningTaskCallback callback = new RunningTaskCallback(containerId);
            // Register before attaching the callback so the callback's remove() always finds the entry.
            this.runningContainers.put(containerId, callback);
            Futures.addCallback(runningTaskFuture, callback, this.callbackExecutor);
        }
        catch (RejectedExecutionException e) {
            this.handleLaunchFailed(e, containerId);
        }
    }

    /**
     * Handles a stop request. The running task is never cancelled here; the request is only
     * logged and acknowledged to the launcher context.
     */
    private void stop(ContainerStopRequest event) {
        ContainerId containerId = event.getContainerId();
        if (this.runningContainers.get(containerId) == null) {
            LOG.info("Ignoring stop request for containerId: " + containerId);
        } else {
            LOG.info("Ignoring stop request for containerId {}. Relying on regular task shutdown for it to end", (Object)containerId);
        }
        this.getContext().containerStopRequested(containerId);
    }

    /**
     * Wraps a {@link TezChild} in a task that notifies the launcher context of the launch and
     * then runs the child to completion.
     */
    private synchronized Callable<TezChild.ContainerExecutionResult> createSubTask(final TezChild tezChild, final ContainerId containerId) {
        return new Callable<TezChild.ContainerExecutionResult>(){

            @Override
            public TezChild.ContainerExecutionResult call() throws InterruptedException, TezException, IOException {
                // Clears the pooled thread's interrupt flag before the child runs.
                // NOTE(review): presumably guards against an interrupt left over from an earlier task.
                Thread.interrupted();
                LocalContainerLauncher.this.getContext().containerLaunched(containerId);
                return tezChild.run();
            }
        };
    }

    /**
     * Creates the {@link TezChild} for a container. The child's environment is a copy of
     * {@code localEnv} with the {@code USER} entry overridden, and its memory budget is the JVM's
     * maximum memory divided evenly across the configured executors.
     */
    private TezChild createTezChild(Configuration defaultConf, ContainerId containerId, String tokenIdentifier, int attemptNumber, String[] localDirs, TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol, Credentials credentials) throws InterruptedException, TezException, IOException {
        Map<String, String> containerEnv = new HashMap<String, String>(this.localEnv);
        String user = this.isLocalMode ? System.getenv(ApplicationConstants.Environment.USER.name()) : this.context.getUser();
        containerEnv.put(ApplicationConstants.Environment.USER.name(), user);

        long memAvailable;
        // NOTE(review): numExecutors is final, so this lock looks unnecessary; kept to leave
        // locking behaviour untouched.
        synchronized (this) {
            memAvailable = Runtime.getRuntime().maxMemory() / (long)this.numExecutors;
        }
        return TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, attemptNumber, localDirs, this.workingDirectory, containerEnv, "", this.executionContext, credentials, memAvailable, this.context.getUser(), tezTaskUmbilicalProtocol, false, this.context.getHadoopShim());
    }

    /**
     * Queues a container launch request for the event-handling thread.
     *
     * @param launchRequest the request to queue
     * @throws TezUncheckedException if the calling thread is interrupted while queueing; the
     *         thread's interrupt status is restored before the exception is thrown
     */
    @Override
    public void launchContainer(ContainerLaunchRequest launchRequest) {
        try {
            this.eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TezUncheckedException(e);
        }
    }

    /**
     * Queues a container stop request for the event-handling thread.
     *
     * @param stopRequest the request to queue
     * @throws TezUncheckedException if the calling thread is interrupted while queueing; the
     *         thread's interrupt status is restored before the exception is thrown
     */
    @Override
    public void stopContainer(ContainerStopRequest stopRequest) {
        try {
            this.eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TezUncheckedException(e);
        }
    }

    /**
     * Completion callback for one running container. Removes the container from
     * {@code runningContainers} and reports the outcome to the launcher context.
     */
    private class RunningTaskCallback
    implements FutureCallback<TezChild.ContainerExecutionResult> {
        private final ContainerId containerId;

        RunningTaskCallback(ContainerId containerId) {
            this.containerId = containerId;
        }

        /**
         * Reports a finished container. {@code SUCCESS} and {@code ASKED_TO_DIE} are reported as
         * {@code CONTAINER_EXITED}; every other exit status as {@code APPLICATION_ERROR}.
         */
        @Override
        public void onSuccess(TezChild.ContainerExecutionResult result) {
            LocalContainerLauncher.this.runningContainers.remove(this.containerId);
            LOG.info("ContainerExecutionResult for: " + this.containerId + " = " + result);
            TezChild.ContainerExecutionResult.ExitStatus exitStatus = result.getExitStatus();
            boolean cleanExit = exitStatus == TezChild.ContainerExecutionResult.ExitStatus.SUCCESS || exitStatus == TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE;
            if (cleanExit) {
                LOG.info("Container: " + this.containerId + " completed successfully");
                LocalContainerLauncher.this.getContext().containerCompleted(this.containerId, exitStatus.getExitCode(), null, TaskAttemptEndReason.CONTAINER_EXITED);
            } else {
                LOG.info("Container: " + this.containerId + " completed but with errors");
                LocalContainerLauncher.this.getContext().containerCompleted(this.containerId, exitStatus.getExitCode(), this.diagnosticsOf(result), TaskAttemptEndReason.APPLICATION_ERROR);
            }
        }

        /**
         * Reports a container whose task threw. A {@link CancellationException} is reported as a
         * normal exit; anything else as an execution failure.
         */
        @Override
        public void onFailure(Throwable t) {
            LocalContainerLauncher.this.runningContainers.remove(this.containerId);
            if (t instanceof CancellationException) {
                LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher");
                LocalContainerLauncher.this.getContext().containerCompleted(this.containerId, TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), "CancellationException", TaskAttemptEndReason.CONTAINER_EXITED);
                return;
            }
            LOG.info("Container: " + this.containerId + ": Execution Failed: ", t);
            LocalContainerLauncher.this.getContext().containerCompleted(this.containerId, TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(), t.getMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
        }

        /**
         * Picks the diagnostic text for a failed result: the result's error message if present,
         * otherwise the message of its throwable, otherwise {@code null}.
         */
        private String diagnosticsOf(TezChild.ContainerExecutionResult result) {
            String errorMessage = result.getErrorMessage();
            if (errorMessage != null) {
                return errorMessage;
            }
            Throwable cause = result.getThrowable();
            return cause == null ? null : cause.getMessage();
        }
    }

    /**
     * Event loop that takes queued operations and dispatches them to
     * {@link LocalContainerLauncher#launch} or {@link LocalContainerLauncher#stop} until the
     * thread is interrupted or the service is stopped.
     */
    private class TezSubTaskRunner
    implements Runnable {
        private TezSubTaskRunner() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted() && !LocalContainerLauncher.this.serviceStopped.get()) {
                try {
                    ContainerOp event = LocalContainerLauncher.this.eventQueue.take();
                    switch (event.getOpType()) {
                        case LAUNCH_REQUEST: {
                            LocalContainerLauncher.this.launch(event.getLaunchRequest());
                            break;
                        }
                        case STOP_REQUEST: {
                            LocalContainerLauncher.this.stop(event.getStopRequest());
                            break;
                        }
                        default: {
                            LOG.warn("Ignoring container operation of unhandled type: " + event.getOpType());
                        }
                    }
                }
                catch (InterruptedException e) {
                    // An interrupt during shutdown is expected; only log unexpected ones.
                    if (!LocalContainerLauncher.this.serviceStopped.get()) {
                        LOG.error("TezSubTaskRunner interrupted ", (Throwable)e);
                    }
                    return;
                }
                catch (Throwable e) {
                    // Unchecked failures end the event loop; log first so the cause is not lost.
                    LOG.error("TezSubTaskRunner failed due to exception", e);
                    throw e;
                }
            }
        }
    }
}

