/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client.api.async.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AMRMClientAsyncImpl<T extends AMRMClient.ContainerRequest>
extends AMRMClientAsync<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AMRMClientAsyncImpl.class);
    private final HeartbeatThread heartbeatThread;
    private final CallbackHandlerThread handlerThread;
    private final BlockingQueue<Object> responseQueue;
    private final Object unregisterHeartbeatLock = new Object();
    private volatile boolean keepRunning = true;
    private volatile float progress;

    public AMRMClientAsyncImpl(int intervalMs, AMRMClientAsync.AbstractCallbackHandler callbackHandler) {
        this(new AMRMClientImpl(), intervalMs, callbackHandler);
    }

    public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs, AMRMClientAsync.AbstractCallbackHandler callbackHandler) {
        super(client, intervalMs, callbackHandler);
        this.heartbeatThread = new HeartbeatThread();
        this.handlerThread = new CallbackHandlerThread();
        this.responseQueue = new LinkedBlockingQueue<Object>();
    }

    @Deprecated
    public AMRMClientAsyncImpl(int intervalMs, AMRMClientAsync.CallbackHandler callbackHandler) {
        this(new AMRMClientImpl(), intervalMs, callbackHandler);
    }

    @InterfaceAudience.Private
    @Deprecated
    @VisibleForTesting
    public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs, AMRMClientAsync.CallbackHandler callbackHandler) {
        super(client, intervalMs, callbackHandler);
        this.heartbeatThread = new HeartbeatThread();
        this.handlerThread = new CallbackHandlerThread();
        this.responseQueue = new LinkedBlockingQueue<Object>();
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.client.init(conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        this.handlerThread.setDaemon(true);
        this.handlerThread.start();
        this.client.start();
        super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
        this.keepRunning = false;
        this.heartbeatThread.interrupt();
        try {
            this.heartbeatThread.join();
        }
        catch (InterruptedException ex) {
            LOG.error("Error joining with heartbeat thread", ex);
        }
        this.client.stop();
        this.handlerThread.interrupt();
        super.serviceStop();
    }

    @Override
    public List<? extends Collection<T>> getMatchingRequests(Priority priority, String resourceName, Resource capability) {
        return this.client.getMatchingRequests(priority, resourceName, capability);
    }

    @Override
    public void addSchedulingRequests(Collection<SchedulingRequest> schedulingRequests) {
        this.client.addSchedulingRequests(schedulingRequests);
    }

    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
        return this.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl, null);
    }

    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl, Map<Set<String>, PlacementConstraint> placementConstraintsMap) throws YarnException, IOException {
        RegisterApplicationMasterResponse response = this.client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl, placementConstraintsMap);
        this.heartbeatThread.start();
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, IOException {
        Object object = this.unregisterHeartbeatLock;
        synchronized (object) {
            this.keepRunning = false;
            this.client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
        }
    }

    @Override
    public void addContainerRequest(T req) {
        this.client.addContainerRequest(req);
    }

    @Override
    public void removeContainerRequest(T req) {
        this.client.removeContainerRequest(req);
    }

    @Override
    public void requestContainerUpdate(Container container, UpdateContainerRequest updateContainerRequest) {
        this.client.requestContainerUpdate(container, updateContainerRequest);
    }

    @Override
    public void releaseAssignedContainer(ContainerId containerId) {
        this.client.releaseAssignedContainer(containerId);
    }

    @Override
    public Resource getAvailableResources() {
        return this.client.getAvailableResources();
    }

    @Override
    public int getClusterNodeCount() {
        return this.client.getClusterNodeCount();
    }

    @Override
    public void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
        this.client.updateBlacklist(blacklistAdditions, blacklistRemovals);
    }

    @Override
    public void updateTrackingUrl(String trackingUrl) {
        this.client.updateTrackingUrl(trackingUrl);
    }

    private class CallbackHandlerThread
    extends Thread {
        public CallbackHandlerThread() {
            super("AMRM Callback Handler Thread");
        }

        @Override
        public void run() {
            while (AMRMClientAsyncImpl.this.keepRunning) {
                try {
                    List<RejectedSchedulingRequest> rejectedSchedulingRequests;
                    PreemptionMessage preemptionMessage;
                    List<Container> allocated;
                    List<ContainerStatus> completed;
                    List<NodeReport> updatedNodes;
                    TimelineV2Client timelineClient;
                    Object object;
                    try {
                        object = AMRMClientAsyncImpl.this.responseQueue.take();
                    }
                    catch (InterruptedException ex) {
                        LOG.debug("Interrupted while waiting for queue", ex);
                        Thread.currentThread().interrupt();
                        continue;
                    }
                    if (object instanceof Throwable) {
                        AMRMClientAsyncImpl.this.progress = AMRMClientAsyncImpl.this.handler.getProgress();
                        AMRMClientAsyncImpl.this.handler.onError((Throwable)object);
                        continue;
                    }
                    AllocateResponse response = (AllocateResponse)object;
                    String collectorAddress = null;
                    if (response.getCollectorInfo() != null) {
                        collectorAddress = response.getCollectorInfo().getCollectorAddr();
                    }
                    if ((timelineClient = AMRMClientAsyncImpl.this.client.getRegisteredTimelineV2Client()) != null && response.getCollectorInfo() != null) {
                        timelineClient.setTimelineCollectorInfo(response.getCollectorInfo());
                    }
                    if (!(updatedNodes = response.getUpdatedNodes()).isEmpty()) {
                        AMRMClientAsyncImpl.this.handler.onNodesUpdated(updatedNodes);
                    }
                    if (!(completed = response.getCompletedContainersStatuses()).isEmpty()) {
                        AMRMClientAsyncImpl.this.handler.onContainersCompleted(completed);
                    }
                    if (AMRMClientAsyncImpl.this.handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
                        ArrayList<UpdatedContainer> changed = new ArrayList<UpdatedContainer>();
                        changed.addAll(response.getUpdatedContainers());
                        if (!changed.isEmpty()) {
                            ((AMRMClientAsync.AbstractCallbackHandler)AMRMClientAsyncImpl.this.handler).onContainersUpdated(changed);
                        }
                    }
                    if (!(allocated = response.getAllocatedContainers()).isEmpty()) {
                        AMRMClientAsyncImpl.this.handler.onContainersAllocated(allocated);
                    }
                    if ((preemptionMessage = response.getPreemptionMessage()) != null && AMRMClientAsyncImpl.this.handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
                        ((AMRMClientAsync.AbstractCallbackHandler)AMRMClientAsyncImpl.this.handler).onPreemptionMessageReceived(preemptionMessage);
                    }
                    if (!response.getContainersFromPreviousAttempts().isEmpty() && AMRMClientAsyncImpl.this.handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
                        ((AMRMClientAsync.AbstractCallbackHandler)AMRMClientAsyncImpl.this.handler).onContainersReceivedFromPreviousAttempts(response.getContainersFromPreviousAttempts());
                    }
                    if (!(rejectedSchedulingRequests = response.getRejectedSchedulingRequests()).isEmpty() && AMRMClientAsyncImpl.this.handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
                        ((AMRMClientAsync.AbstractCallbackHandler)AMRMClientAsyncImpl.this.handler).onRequestsRejected(rejectedSchedulingRequests);
                    }
                    AMRMClientAsyncImpl.this.progress = AMRMClientAsyncImpl.this.handler.getProgress();
                }
                catch (Throwable ex) {
                    AMRMClientAsyncImpl.this.handler.onError(ex);
                    throw new YarnRuntimeException(ex);
                }
            }
            return;
        }
    }

    private class HeartbeatThread
    extends Thread {
        public HeartbeatThread() {
            super("AMRM Heartbeater thread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                Object response = null;
                Object object = AMRMClientAsyncImpl.this.unregisterHeartbeatLock;
                synchronized (object) {
                    if (!AMRMClientAsyncImpl.this.keepRunning) {
                        return;
                    }
                    try {
                        response = AMRMClientAsyncImpl.this.client.allocate(AMRMClientAsyncImpl.this.progress);
                    }
                    catch (ApplicationAttemptNotFoundException e) {
                        AMRMClientAsyncImpl.this.handler.onShutdownRequest();
                        LOG.info("Shutdown requested. Stopping callback.");
                        return;
                    }
                    catch (Throwable ex) {
                        LOG.error("Exception on heartbeat", ex);
                        response = ex;
                    }
                    if (response != null) {
                        while (true) {
                            try {
                                AMRMClientAsyncImpl.this.responseQueue.put(response);
                            }
                            catch (InterruptedException ex) {
                                LOG.debug("Interrupted while waiting to put on response queue", ex);
                                continue;
                            }
                            break;
                        }
                    }
                }
                try {
                    Thread.sleep(AMRMClientAsyncImpl.this.heartbeatIntervalMs.get());
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.debug("Heartbeater interrupted", ex);
                    continue;
                }
                break;
            }
        }
    }
}

