/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.commit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobCommitEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobSetupEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

public class CommitterEventHandler
extends AbstractService
implements EventHandler<CommitterEvent> {
    private static final Log LOG = LogFactory.getLog(CommitterEventHandler.class);
    private final AppContext context;
    private final OutputCommitter committer;
    private final RMHeartbeatHandler rmHeartbeatHandler;
    private ThreadPoolExecutor launcherPool;
    private Thread eventHandlingThread;
    private BlockingQueue<CommitterEvent> eventQueue = new LinkedBlockingQueue<CommitterEvent>();
    private final AtomicBoolean stopped;
    private final ClassLoader jobClassLoader;
    private Thread jobCommitThread = null;
    private int commitThreadCancelTimeoutMs;
    private long commitWindowMs;
    private FileSystem fs;
    private Path startCommitFile;
    private Path endCommitSuccessFile;
    private Path endCommitFailureFile;

    public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler) {
        this(context, committer, rmHeartbeatHandler, null);
    }

    public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
        super("CommitterEventHandler");
        this.context = context;
        this.committer = committer;
        this.rmHeartbeatHandler = rmHeartbeatHandler;
        this.stopped = new AtomicBoolean(false);
        this.jobClassLoader = jobClassLoader;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.commitThreadCancelTimeoutMs = conf.getInt("yarn.app.mapreduce.am.job.committer.cancel-timeout", 60000);
        this.commitWindowMs = conf.getLong("yarn.app.mapreduce.am.job.committer.commit-window", 10000L);
        try {
            this.fs = FileSystem.get((Configuration)conf);
            JobID id = TypeConverter.fromYarn(this.context.getApplicationID());
            JobId jobId = TypeConverter.toYarn(id);
            String user = UserGroupInformation.getCurrentUser().getShortUserName();
            this.startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
            this.endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
            this.endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
        }
        catch (IOException e) {
            throw new YarnRuntimeException(e);
        }
    }

    protected void serviceStart() throws Exception {
        ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder().setNameFormat("CommitterEvent Processor #%d");
        if (this.jobClassLoader != null) {
            ThreadFactory backingTf = new ThreadFactory(){

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread2 = new Thread(r);
                    thread2.setContextClassLoader(CommitterEventHandler.this.jobClassLoader);
                    return thread2;
                }
            };
            tfBuilder.setThreadFactory(backingTf);
        }
        ThreadFactory tf = tfBuilder.build();
        this.launcherPool = new HadoopThreadPoolExecutor(5, 5, 1L, TimeUnit.HOURS, new LinkedBlockingQueue(), tf);
        this.eventHandlingThread = new Thread(new Runnable(){

            @Override
            public void run() {
                CommitterEvent event = null;
                while (!CommitterEventHandler.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    try {
                        event = (CommitterEvent)CommitterEventHandler.this.eventQueue.take();
                    }
                    catch (InterruptedException e) {
                        if (!CommitterEventHandler.this.stopped.get()) {
                            LOG.error("Returning, interrupted : " + e);
                        }
                        return;
                    }
                    CommitterEventHandler.this.launcherPool.execute(new EventProcessor(event));
                }
            }
        });
        this.eventHandlingThread.setName("CommitterEvent Handler");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    @Override
    public void handle(CommitterEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnRuntimeException(e);
        }
    }

    protected void serviceStop() throws Exception {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        if (this.launcherPool != null) {
            this.launcherPool.shutdown();
        }
        super.serviceStop();
    }

    private synchronized void jobCommitStarted() throws IOException {
        if (this.jobCommitThread != null) {
            throw new IOException("Commit while another commit thread active: " + this.jobCommitThread.toString());
        }
        this.jobCommitThread = Thread.currentThread();
    }

    private synchronized void jobCommitEnded() {
        if (this.jobCommitThread == Thread.currentThread()) {
            this.jobCommitThread = null;
            this.notifyAll();
        }
    }

    private synchronized void cancelJobCommit() {
        Thread threadCommitting = this.jobCommitThread;
        if (threadCommitting != null && threadCommitting.isAlive()) {
            LOG.info("Cancelling commit");
            threadCommitting.interrupt();
            long now = this.context.getClock().getTime();
            long timeoutTimestamp = now + (long)this.commitThreadCancelTimeoutMs;
            try {
                while (this.jobCommitThread == threadCommitting && now > timeoutTimestamp) {
                    this.wait(now - timeoutTimestamp);
                    now = this.context.getClock().getTime();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private class EventProcessor
    implements Runnable {
        private CommitterEvent event;

        EventProcessor(CommitterEvent event) {
            this.event = event;
        }

        @Override
        public void run() {
            LOG.info("Processing the event " + this.event.toString());
            switch ((CommitterEventType)((Object)this.event.getType())) {
                case JOB_SETUP: {
                    this.handleJobSetup((CommitterJobSetupEvent)this.event);
                    break;
                }
                case JOB_COMMIT: {
                    this.handleJobCommit((CommitterJobCommitEvent)this.event);
                    break;
                }
                case JOB_ABORT: {
                    this.handleJobAbort((CommitterJobAbortEvent)this.event);
                    break;
                }
                case TASK_ABORT: {
                    this.handleTaskAbort((CommitterTaskAbortEvent)this.event);
                    break;
                }
                default: {
                    throw new YarnRuntimeException("Unexpected committer event " + this.event.toString());
                }
            }
        }

        protected void handleJobSetup(CommitterJobSetupEvent event) {
            try {
                CommitterEventHandler.this.committer.setupJob(event.getJobContext());
                CommitterEventHandler.this.context.getEventHandler().handle(new JobSetupCompletedEvent(event.getJobID()));
            }
            catch (Exception e) {
                LOG.warn("Job setup failed", e);
                CommitterEventHandler.this.context.getEventHandler().handle(new JobSetupFailedEvent(event.getJobID(), StringUtils.stringifyException((Throwable)e)));
            }
        }

        private void touchz(Path p, boolean overwrite) throws IOException {
            CommitterEventHandler.this.fs.create(p, overwrite).close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void handleJobCommit(CommitterJobCommitEvent event) {
            boolean commitJobIsRepeatable = false;
            try {
                commitJobIsRepeatable = CommitterEventHandler.this.committer.isCommitJobRepeatable(event.getJobContext());
            }
            catch (IOException e) {
                LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
            }
            try {
                this.touchz(CommitterEventHandler.this.startCommitFile, commitJobIsRepeatable);
                CommitterEventHandler.this.jobCommitStarted();
                this.waitForValidCommitWindow();
                CommitterEventHandler.this.committer.commitJob(event.getJobContext());
                this.touchz(CommitterEventHandler.this.endCommitSuccessFile, commitJobIsRepeatable);
                CommitterEventHandler.this.context.getEventHandler().handle(new JobCommitCompletedEvent(event.getJobID()));
            }
            catch (Exception e) {
                LOG.error("Could not commit job", e);
                try {
                    this.touchz(CommitterEventHandler.this.endCommitFailureFile, commitJobIsRepeatable);
                }
                catch (Exception e2) {
                    LOG.error("could not create failure file.", e2);
                }
                CommitterEventHandler.this.context.getEventHandler().handle(new JobCommitFailedEvent(event.getJobID(), StringUtils.stringifyException((Throwable)e)));
            }
            finally {
                CommitterEventHandler.this.jobCommitEnded();
            }
        }

        protected void handleJobAbort(CommitterJobAbortEvent event) {
            CommitterEventHandler.this.cancelJobCommit();
            try {
                CommitterEventHandler.this.committer.abortJob(event.getJobContext(), event.getFinalState());
            }
            catch (Exception e) {
                LOG.warn("Could not abort job", e);
            }
            CommitterEventHandler.this.context.getEventHandler().handle(new JobAbortCompletedEvent(event.getJobID(), event.getFinalState()));
        }

        protected void handleTaskAbort(CommitterTaskAbortEvent event) {
            try {
                CommitterEventHandler.this.committer.abortTask(event.getAttemptContext());
            }
            catch (Exception e) {
                LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
            }
            CommitterEventHandler.this.context.getEventHandler().handle(new TaskAttemptEvent(event.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE));
        }

        private synchronized void waitForValidCommitWindow() throws InterruptedException {
            long lastHeartbeatTime = CommitterEventHandler.this.rmHeartbeatHandler.getLastHeartbeatTime();
            long now = CommitterEventHandler.this.context.getClock().getTime();
            while (now - lastHeartbeatTime > CommitterEventHandler.this.commitWindowMs) {
                CommitterEventHandler.this.rmHeartbeatHandler.runOnNextHeartbeat(new Runnable(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        EventProcessor eventProcessor = EventProcessor.this;
                        synchronized (eventProcessor) {
                            EventProcessor.this.notify();
                        }
                    }
                });
                this.wait();
                lastHeartbeatTime = CommitterEventHandler.this.rmHeartbeatHandler.getLastHeartbeatTime();
                now = CommitterEventHandler.this.context.getClock().getTime();
            }
        }
    }
}

