/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.BroadcastEdgeManager;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManagerOnDemand;
import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Edge {
    // Logger shared by all Edge instances.
    private static final Logger LOG = LoggerFactory.getLogger(Edge.class);
    // Current description of this edge; replaced by setEdgeProperty().
    private EdgeProperty edgeProperty;
    // Context handed to the edge manager; assigned in createEdgeManager() together with the manager.
    private EdgeManagerPluginContext edgeManagerContext;
    // Plugin that performs the routing. createEdgeManager() leaves it unassigned when a CUSTOM
    // edge has no edge manager descriptor, so it may be null until setEdgeProperty() is called.
    @VisibleForTesting
    EdgeManagerPlugin edgeManager;
    // Set to true by routingToBegin() when the edge manager is an EdgeManagerPluginOnDemand.
    private boolean onDemandRouting = false;
    // Receives the events produced by sendEvent().
    private final EventHandler eventHandler;
    // Read by createEdgeManager() to pick the ONE_TO_ONE manager implementation.
    private final Configuration conf;
    // While true, the sendTezEventTo*Tasks() methods queue events instead of routing them.
    private AtomicBoolean bufferEvents = new AtomicBoolean(false);
    // Events queued for the destination side while buffering; flushed by stopEventBuffering().
    private List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
    // Events queued for the source side while buffering; flushed by stopEventBuffering().
    private List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
    // Vertex producing data on this edge; assigned through setSourceVertex().
    private Vertex sourceVertex;
    // Vertex consuming data on this edge; assigned through setDestinationVertex().
    private Vertex destinationVertex;
    // Destination info stamped on every event routed to destination tasks; built in initialize().
    private EventMetaData destinationMetaInfo;
    // Cleared by routingToBegin() when the destination vertex reports 0 tasks.
    private boolean routingNeeded = true;
    // Partially delivered routes, keyed by task attempt; filled by
    // maybeAddTezEventForDestinationTask() and drained by removePendingEvents().
    private final ConcurrentMap<TezTaskAttemptID, PendingEventRouteMetadata> pendingEvents = Maps.newConcurrentMap();

    /**
     * Creates an edge and immediately builds the edge manager that matches its property.
     *
     * @param edgeProperty description of the edge
     * @param eventHandler handler that receives events emitted by this edge
     * @param conf configuration consulted while building the edge manager
     * @throws TezException if the edge manager cannot be created
     */
    public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) throws TezException {
        this.conf = conf;
        this.eventHandler = eventHandler;
        this.edgeProperty = edgeProperty;
        createEdgeManager();
    }

    /**
     * Builds the plugin context and edge manager for the data movement type of the current
     * edge property. A CUSTOM edge without an edge manager descriptor leaves both fields as
     * they were.
     *
     * @throws TezException if the data movement type is not one of the handled values
     */
    private void createEdgeManager() throws TezException {
        switch (edgeProperty.getDataMovementType()) {
            case ONE_TO_ONE: {
                edgeManagerContext = new EdgeManagerPluginContextImpl(null);
                // The configuration flag selects between the two one-to-one implementations.
                boolean useOnDemandRouting = conf.getBoolean("tez.am.one-to-one.routing.use.on-demand-routing", false);
                if (useOnDemandRouting) {
                    edgeManager = new OneToOneEdgeManagerOnDemand(edgeManagerContext);
                } else {
                    edgeManager = new OneToOneEdgeManager(edgeManagerContext);
                }
                break;
            }
            case BROADCAST: {
                edgeManagerContext = new EdgeManagerPluginContextImpl(null);
                edgeManager = new BroadcastEdgeManager(edgeManagerContext);
                break;
            }
            case SCATTER_GATHER: {
                edgeManagerContext = new EdgeManagerPluginContextImpl(null);
                edgeManager = new ScatterGatherEdgeManager(edgeManagerContext);
                break;
            }
            case CUSTOM: {
                EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
                if (descriptor == null) {
                    // No descriptor yet: nothing to instantiate.
                    break;
                }
                UserPayload payload = descriptor.getUserPayload();
                edgeManagerContext = new EdgeManagerPluginContextImpl(payload);
                String managerClassName = descriptor.getClassName();
                Class<?>[] constructorTypes = new Class<?>[]{EdgeManagerPluginContext.class};
                Object[] constructorArgs = new Object[]{edgeManagerContext};
                edgeManager = (EdgeManagerPlugin) ReflectionUtils.createClazzInstance(managerClassName, constructorTypes, constructorArgs);
                break;
            }
            default: {
                throw new TezException("Unknown edge data movement type: " + edgeProperty.getDataMovementType());
            }
        }
    }

    /**
     * Initializes the edge manager, when one exists, and builds the destination meta info
     * that is later attached to events routed to destination tasks.
     *
     * @throws AMUserCodeException if the edge manager's own initialization throws
     */
    public void initialize() throws AMUserCodeException {
        if (edgeManager != null) {
            try {
                edgeManager.initialize();
            } catch (Exception initFailure) {
                String message = "Fail to initialize Edge," + getEdgeInfo();
                throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, message, initFailure);
            }
        }
        String destinationName = destinationVertex.getName();
        String sourceName = sourceVertex.getName();
        destinationMetaInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, destinationName, sourceName, null);
    }

    /**
     * Replaces the edge property, rebuilds and initializes the edge manager, and, when no edge
     * manager existed before the call, notifies both vertices that the edge is now initialized.
     *
     * @param newEdgeProperty the property to install
     * @throws AMUserCodeException if the edge manager cannot be created or initialized
     */
    public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
        // Capture the state before createEdgeManager() has a chance to assign a manager.
        final boolean managerWasMissing = (edgeManager == null);
        edgeProperty = newEdgeProperty;
        try {
            createEdgeManager();
        } catch (TezException creationFailure) {
            throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, creationFailure);
        }
        initialize();
        if (managerWasMissing) {
            sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this, destinationVertex));
            sendEvent(new VertexEventNullEdgeInitialized(destinationVertex.getVertexId(), this, sourceVertex));
        }
    }

    /**
     * Installs a custom edge manager by deriving a new edge property from the current one,
     * keeping its data source type, scheduling type, edge source and edge destination.
     *
     * @param descriptor descriptor of the edge manager plugin to install
     * @throws AMUserCodeException if the new edge manager cannot be created or initialized
     */
    @VisibleForTesting
    synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) throws AMUserCodeException {
        EdgeProperty.DataSourceType dataSourceType = edgeProperty.getDataSourceType();
        EdgeProperty.SchedulingType schedulingType = edgeProperty.getSchedulingType();
        OutputDescriptor edgeSource = edgeProperty.getEdgeSource();
        InputDescriptor edgeDestination = edgeProperty.getEdgeDestination();
        setEdgeProperty(EdgeProperty.create(descriptor, dataSourceType, schedulingType, edgeSource, edgeDestination));
    }

    /**
     * Prepares the edge for routing. Routing is switched off when the destination vertex
     * reports 0 tasks; a negative task count is rejected. On-demand edge managers are flagged
     * and asked to prepare themselves.
     *
     * @throws AMUserCodeException if the on-demand edge manager fails in prepareForRouting
     * @throws TezUncheckedException if the destination task count is negative
     */
    public synchronized void routingToBegin() throws AMUserCodeException {
        boolean destinationIsEmpty = edgeManagerContext.getDestinationVertexNumTasks() == 0;
        if (destinationIsEmpty) {
            routingNeeded = false;
        } else {
            boolean parallelismUnknown = edgeManagerContext.getDestinationVertexNumTasks() < 0;
            if (parallelismUnknown) {
                throw new TezUncheckedException("Internal error. Not expected to route events to a destination until parallelism is determined sourceVertex=" + sourceVertex.getLogIdentifier() + " edgeManager=" + edgeManager.getClass().getName());
            }
        }
        if (edgeManager instanceof EdgeManagerPluginOnDemand) {
            onDemandRouting = true;
            EdgeManagerPluginOnDemand onDemandManager = (EdgeManagerPluginOnDemand) edgeManager;
            try {
                onDemandManager.prepareForRouting();
            } catch (Exception prepareFailure) {
                String message = "Fail to prepareForRouting " + getEdgeInfo();
                throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, message, prepareFailure);
            }
        }
        LOG.info("Routing to begin for edge: " + getEdgeInfo()
                + ". EdgeProperty: " + edgeProperty
                + " onDemandRouting: " + hasOnDemandRouting());
    }

    /**
     * @return the on-demand routing flag, which {@link #routingToBegin()} raises when the edge
     *         manager is an {@link EdgeManagerPluginOnDemand}
     */
    public synchronized boolean hasOnDemandRouting() {
        return onDemandRouting;
    }

    /**
     * @return the edge property currently installed on this edge
     */
    public synchronized EdgeProperty getEdgeProperty() {
        return edgeProperty;
    }

    /**
     * @return the edge manager currently in use; may be null when none has been created yet
     */
    public EdgeManagerPlugin getEdgeManager() {
        return edgeManager;
    }

    /**
     * Binds the source vertex. Re-binding the same vertex is accepted; binding a different one
     * after a vertex has been set is rejected.
     *
     * @param sourceVertex vertex to use as the source of this edge
     * @throws TezUncheckedException if a different source vertex is already bound
     */
    public void setSourceVertex(Vertex sourceVertex) {
        Vertex current = this.sourceVertex;
        boolean conflictsWithCurrent = current != null && current != sourceVertex;
        if (conflictsWithCurrent) {
            throw new TezUncheckedException("Source vertex exists: " + sourceVertex.getLogIdentifier());
        }
        this.sourceVertex = sourceVertex;
    }

    /**
     * Binds the destination vertex. Re-binding the same vertex is accepted; binding a different
     * one after a vertex has been set is rejected.
     *
     * @param destinationVertex vertex to use as the destination of this edge
     * @throws TezUncheckedException if a different destination vertex is already bound
     */
    public void setDestinationVertex(Vertex destinationVertex) {
        Vertex current = this.destinationVertex;
        boolean conflictsWithCurrent = current != null && current != destinationVertex;
        if (conflictsWithCurrent) {
            throw new TezUncheckedException("Destination vertex exists: " + destinationVertex.getLogIdentifier());
        }
        this.destinationVertex = destinationVertex;
    }

    /**
     * Builds the input spec of a destination task from the physical input count reported by
     * the edge manager.
     *
     * @param destinationTaskIndex index of the destination task
     * @return input spec naming the source vertex, the edge destination and the input count
     * @throws AMUserCodeException if the edge manager throws or reports a negative count
     * @throws IllegalStateException if no edge manager is set
     */
    public InputSpec getDestinationSpec(int destinationTaskIndex) throws AMUserCodeException {
        Preconditions.checkState(edgeManager != null, "Edge Manager must be initialized by this time");
        try {
            final int numPhysicalInputs = edgeManager.getNumDestinationTaskPhysicalInputs(destinationTaskIndex);
            // Validation happens inside the try so that a bad count is reported as a user code error.
            Preconditions.checkArgument(numPhysicalInputs >= 0, "PhysicalInputCount should not be negative, physicalInputCount=" + numPhysicalInputs);
            return new InputSpec(sourceVertex.getName(), edgeProperty.getEdgeDestination(), numPhysicalInputs);
        } catch (Exception specFailure) {
            String message = "Fail to getDestinationSpec, destinationTaskIndex=" + destinationTaskIndex + ", " + getEdgeInfo();
            throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, message, specFailure);
        }
    }

    /**
     * Builds the output spec of a source task from the physical output count reported by the
     * edge manager.
     *
     * @param sourceTaskIndex index of the source task
     * @return output spec naming the destination vertex, the edge source and the output count
     * @throws AMUserCodeException if the edge manager throws or reports a negative count
     * @throws IllegalStateException if no edge manager is set
     */
    public OutputSpec getSourceSpec(int sourceTaskIndex) throws AMUserCodeException {
        Preconditions.checkState(edgeManager != null, "Edge Manager must be initialized by this time");
        try {
            final int numPhysicalOutputs = edgeManager.getNumSourceTaskPhysicalOutputs(sourceTaskIndex);
            // Validation happens inside the try so that a bad count is reported as a user code error.
            Preconditions.checkArgument(numPhysicalOutputs >= 0, "PhysicalOutputCount should not be negative,physicalOutputCount=" + numPhysicalOutputs);
            return new OutputSpec(destinationVertex.getName(), edgeProperty.getEdgeSource(), numPhysicalOutputs);
        } catch (Exception specFailure) {
            String message = "Fail to getSourceSpec, sourceTaskIndex=" + sourceTaskIndex + ", " + getEdgeInfo();
            throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, message, specFailure);
        }
    }

    /**
     * Switches the edge into buffering mode: from now on the send methods queue events
     * instead of routing them.
     */
    public void startEventBuffering() {
        bufferEvents.set(true);
    }

    /**
     * Leaves buffering mode and replays the queued events: first everything queued for the
     * destination tasks, then everything queued for the source tasks. Each queue is cleared
     * once all of its events have been replayed.
     *
     * @throws AMUserCodeException if routing one of the replayed events fails
     */
    public void stopEventBuffering() throws AMUserCodeException {
        // The flag must be lowered first, otherwise the replay below would re-queue the events.
        bufferEvents.set(false);

        for (TezEvent queuedForDestination : destinationEventBuffer) {
            sendTezEventToDestinationTasks(queuedForDestination);
        }
        destinationEventBuffer.clear();

        for (TezEvent queuedForSource : sourceEventBuffer) {
            sendTezEventToSourceTasks(queuedForSource);
        }
        sourceEventBuffer.clear();
    }

    /**
     * Routes an event coming from a destination task back to the source task attempt that
     * produced the data, or queues it while buffering is active. Only
     * INPUT_READ_ERROR_EVENT is handled.
     *
     * @param tezEvent event reported by a destination task attempt
     * @throws AMUserCodeException if the edge manager throws or returns an invalid source
     *         task index or consumer count
     * @throws TezUncheckedException if the event type is not handled or the source task
     *         cannot be found
     * @throws IllegalStateException if no edge manager is set
     */
    public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeException {
        Preconditions.checkState(edgeManager != null, "Edge Manager must be initialized by this time");
        if (bufferEvents.get()) {
            sourceEventBuffer.add(tezEvent);
            return;
        }
        switch (tezEvent.getEventType()) {
            case INPUT_READ_ERROR_EVENT: {
                InputReadErrorEvent readError = (InputReadErrorEvent) tezEvent.getEvent();
                TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
                int destTaskIndex = destAttemptId.getTaskID().getId();
                int srcTaskIndex;
                int numConsumers;
                try {
                    // On-demand managers expose a routing call that does not take the event itself.
                    if (onDemandRouting) {
                        srcTaskIndex = ((EdgeManagerPluginOnDemand) edgeManager).routeInputErrorEventToSource(destTaskIndex, readError.getIndex());
                    } else {
                        srcTaskIndex = edgeManager.routeInputErrorEventToSource(readError, destTaskIndex, readError.getIndex());
                    }
                    Preconditions.checkArgument(srcTaskIndex >= 0, "SourceTaskIndex should not be negative,srcTaskIndex=" + srcTaskIndex);
                    numConsumers = edgeManager.getNumDestinationConsumerTasks(srcTaskIndex);
                    Preconditions.checkArgument(numConsumers > 0, "ConsumerTaskNum must be positive,numConsumers=" + numConsumers);
                } catch (Exception routingFailure) {
                    throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, "Fail to sendTezEventToSourceTasks, TezEvent:" + tezEvent.getEvent() + "sourceInfo:" + tezEvent.getSourceInfo() + "destinationInfo:" + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), routingFailure);
                }
                Task srcTask = sourceVertex.getTask(srcTaskIndex);
                if (srcTask == null) {
                    throw new TezUncheckedException("Unexpected null task. sourceVertex=" + sourceVertex.getLogIdentifier() + " srcIndex = " + srcTaskIndex + " destAttemptId=" + destAttemptId + " destIndex=" + destTaskIndex + " edgeManager=" + edgeManager.getClass().getName());
                }
                // The attempt index of the failed source attempt is taken from the event's version.
                TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance(srcTask.getTaskId(), readError.getVersion());
                sendEvent(new TaskAttemptEventOutputFailed(srcTaskAttemptId, tezEvent, numConsumers));
                break;
            }
            default: {
                throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
            }
        }
    }

    /**
     * Splits a composite data movement event into its individual data movement events and
     * sends each one to the destination tasks, reusing the source info and received time of
     * the composite event.
     *
     * @param tezEvent event wrapping a {@link CompositeDataMovementEvent}
     * @throws AMUserCodeException if routing one of the individual events fails
     */
    private void handleCompositeDataMovementEvent(TezEvent tezEvent) throws AMUserCodeException {
        CompositeDataMovementEvent composite = (CompositeDataMovementEvent) tezEvent.getEvent();
        EventMetaData sourceInfo = tezEvent.getSourceInfo();
        for (DataMovementEvent single : composite.getEvents()) {
            sendTezEventToDestinationTasks(new TezEvent(single, sourceInfo, tezEvent.getEventReceivedTime()));
        }
    }

    /**
     * Delivers a data movement event or an input failed event to every destination task and
     * input index listed in {@code taskAndInputIndices}.
     *
     * <p>One outgoing event is created per distinct input index and shared by all destination
     * tasks that use that index.
     *
     * @param tezEvent original event; its source info and received time are copied to the
     *        outgoing events
     * @param srcTaskIndex index of the source task, used only in the error message
     * @param isDataMovementEvent true if the wrapped event is a {@link DataMovementEvent},
     *        false if it is an {@link InputFailedEvent}
     * @param taskAndInputIndices destination task index mapped to the input indices to notify
     * @throws TezUncheckedException if a destination task index has no matching task
     * @throws IllegalStateException if no edge manager is set
     */
    void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex, boolean isDataMovementEvent, Map<Integer, List<Integer>> taskAndInputIndices) {
        Preconditions.checkState(edgeManager != null, "Edge Manager must be initialized by this time");
        org.apache.tez.runtime.api.Event event = tezEvent.getEvent();
        // Outgoing events keyed by input index, so equal indices across tasks share one event.
        Map<Integer, TezEvent> inputIndicesWithEvents = Maps.newHashMap();
        for (Map.Entry<Integer, List<Integer>> entry : taskAndInputIndices.entrySet()) {
            int destTaskIndex = entry.getKey();
            for (Integer inputIndex : entry.getValue()) {
                TezEvent tezEventToSend = inputIndicesWithEvents.get(inputIndex);
                if (tezEventToSend == null) {
                    // Declared with the common base type: either branch may supply the value.
                    org.apache.tez.runtime.api.Event outgoing;
                    if (isDataMovementEvent) {
                        DataMovementEvent dmEvent = (DataMovementEvent) event;
                        outgoing = DataMovementEvent.create(dmEvent.getSourceIndex(), inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
                    } else {
                        InputFailedEvent ifEvent = (InputFailedEvent) event;
                        outgoing = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
                    }
                    tezEventToSend = new TezEvent(outgoing, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
                    tezEventToSend.setDestinationInfo(destinationMetaInfo);
                    inputIndicesWithEvents.put(inputIndex, tezEventToSend);
                }
                Task destTask = destinationVertex.getTask(destTaskIndex);
                if (destTask == null) {
                    throw new TezUncheckedException("Unexpected null task. sourceVertex=" + sourceVertex.getLogIdentifier() + " srcTaskIndex = " + srcTaskIndex + " destVertex=" + destinationVertex.getLogIdentifier() + " destTaskIndex=" + destTaskIndex + " destNumTasks=" + destinationVertex.getTotalTasks() + " edgeManager=" + edgeManager.getClass().getName());
                }
                sendEventToTask(destTask, tezEventToSend);
            }
        }
    }

    /**
     * Routes an event produced by a source task to the destination tasks, or queues it while
     * buffering is active.
     *
     * <p>Composite data movement events are split and re-submitted one by one. For data
     * movement and input failed events the edge manager fills a map of destination task
     * index to input indices, and the event is delivered to every entry of that map. Exactly
     * one routing call is made per event: the data movement call for DATA_MOVEMENT_EVENT and
     * the source-task-failed call for INPUT_FAILED_EVENT.
     *
     * @param tezEvent event reported by a source task attempt
     * @throws AMUserCodeException if the edge manager throws while routing
     * @throws TezUncheckedException if the event type is not handled, or if routing was
     *         required but the edge manager produced no destination
     */
    public void sendTezEventToDestinationTasks(TezEvent tezEvent) throws AMUserCodeException {
        if (bufferEvents.get()) {
            destinationEventBuffer.add(tezEvent);
            return;
        }
        switch (tezEvent.getEventType()) {
            case COMPOSITE_DATA_MOVEMENT_EVENT: {
                handleCompositeDataMovementEvent(tezEvent);
                break;
            }
            case INPUT_FAILED_EVENT:
            case DATA_MOVEMENT_EVENT: {
                boolean isDataMovementEvent = !tezEvent.getEventType().equals(EventType.INPUT_FAILED_EVENT);
                Map<Integer, List<Integer>> destTaskAndInputIndices = Maps.newHashMap();
                TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
                int srcTaskIndex = srcAttemptId.getTaskID().getId();
                // Read the flag once so the routing decision and the later check agree.
                boolean routingRequired = routingNeeded;
                if (routingRequired) {
                    try {
                        if (isDataMovementEvent) {
                            DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
                            edgeManager.routeDataMovementEventToDestination(dmEvent, srcTaskIndex, dmEvent.getSourceIndex(), destTaskAndInputIndices);
                        } else {
                            // Only input failed events use the source-task-failed routing;
                            // running it for data movement events as well would add unrelated
                            // destinations to the map.
                            edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex, destTaskAndInputIndices);
                        }
                    } catch (Exception routingFailure) {
                        throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, "Fail to sendTezEventToDestinationTasks, event:" + tezEvent.getEvent() + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:" + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), routingFailure);
                    }
                } else {
                    LOG.info("Not routing events since destination vertex has 0 tasks" + generateCommonDebugString(srcTaskIndex, tezEvent));
                }
                if (!destTaskAndInputIndices.isEmpty()) {
                    sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, destTaskAndInputIndices);
                } else if (routingRequired) {
                    throw new TezUncheckedException("Event must be routed." + generateCommonDebugString(srcTaskIndex, tezEvent));
                }
                break;
            }
            default: {
                throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
            }
        }
    }

    /**
     * Removes and returns the partially delivered route stored for a task attempt.
     *
     * @param attemptID task attempt whose pending route should be taken
     * @return the stored metadata, or null if nothing was pending for that attempt
     */
    public PendingEventRouteMetadata removePendingEvents(TezTaskAttemptID attemptID) {
        PendingEventRouteMetadata pending = pendingEvents.remove(attemptID);
        return pending;
    }

    /**
     * Expands one source event into the events a single destination task attempt must
     * receive and appends them to {@code listToAdd}, stopping once the list holds
     * {@code listMaxSize} entries.
     *
     * <p>When the limit is hit before the route is fully delivered, the route and the number
     * of events already produced are stored under {@code attemptID} and false is returned. A
     * caller can hand that stored state back through {@code pendingRoutes} to continue where
     * the previous call stopped.
     *
     * @param tezEvent source event (composite data movement, data movement or input failed)
     * @param attemptID destination task attempt the events are meant for
     * @param srcTaskIndex index of the source task that produced the event
     * @param listToAdd list receiving the outgoing events
     * @param listMaxSize maximum size {@code listToAdd} may reach
     * @param pendingRoutes state of an earlier partial delivery, or null to route from scratch
     * @return true if the event was fully handled, false if part of it is still pending
     * @throws AMUserCodeException if anything inside the routing step throws, including an
     *         unhandled event type
     */
    public boolean maybeAddTezEventForDestinationTask(TezEvent tezEvent, TezTaskAttemptID attemptID, int srcTaskIndex, List<TezEvent> listToAdd, int listMaxSize, PendingEventRouteMetadata pendingRoutes) throws AMUserCodeException {
        if (!routingNeeded) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Not routing events since destination vertex has 0 tasks" + generateCommonDebugString(srcTaskIndex, tezEvent));
            }
            return true;
        }
        try {
            EdgeManagerPluginOnDemand onDemandManager = (EdgeManagerPluginOnDemand) edgeManager;
            int destTaskIndex = attemptID.getTaskID().getId();
            switch (tezEvent.getEventType()) {
                case COMPOSITE_DATA_MOVEMENT_EVENT: {
                    CompositeDataMovementEvent compositeEvent = (CompositeDataMovementEvent) tezEvent.getEvent();
                    EdgeManagerPluginOnDemand.EventRouteMetadata route;
                    int routedSoFar;
                    if (pendingRoutes == null) {
                        route = onDemandManager.routeCompositeDataMovementEventToDestination(srcTaskIndex, destTaskIndex);
                        routedSoFar = 0;
                    } else {
                        // Resume a delivery that was cut short by the list limit.
                        route = pendingRoutes.getRouteMeta();
                        routedSoFar = pendingRoutes.getNumEventsRouted();
                    }
                    if (route == null) {
                        break;
                    }
                    int occupied = listToAdd.size();
                    int totalToRoute = route.getNumEvents();
                    int[] sourceIndices = route.getSourceIndices();
                    int[] targetIndices = route.getTargetIndices();
                    while (routedSoFar < totalToRoute && occupied < listMaxSize) {
                        occupied++;
                        DataMovementEvent expanded = compositeEvent.expand(sourceIndices[routedSoFar], targetIndices[routedSoFar]);
                        routedSoFar++;
                        TezEvent outgoing = new TezEvent(expanded, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
                        outgoing.setDestinationInfo(destinationMetaInfo);
                        listToAdd.add(outgoing);
                    }
                    if (routedSoFar < totalToRoute) {
                        pendingEvents.put(attemptID, new PendingEventRouteMetadata(route, tezEvent, routedSoFar));
                        return false;
                    }
                    break;
                }
                case INPUT_FAILED_EVENT: {
                    InputFailedEvent failedEvent = (InputFailedEvent) tezEvent.getEvent();
                    EdgeManagerPluginOnDemand.EventRouteMetadata route;
                    int routedSoFar;
                    if (pendingRoutes == null) {
                        route = onDemandManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex, destTaskIndex);
                        routedSoFar = 0;
                    } else {
                        // Resume a delivery that was cut short by the list limit.
                        route = pendingRoutes.getRouteMeta();
                        routedSoFar = pendingRoutes.getNumEventsRouted();
                    }
                    if (route == null) {
                        break;
                    }
                    int occupied = listToAdd.size();
                    int totalToRoute = route.getNumEvents();
                    int[] targetIndices = route.getTargetIndices();
                    while (routedSoFar < totalToRoute && occupied < listMaxSize) {
                        occupied++;
                        InputFailedEvent copy = failedEvent.makeCopy(targetIndices[routedSoFar]);
                        routedSoFar++;
                        TezEvent outgoing = new TezEvent(copy, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
                        outgoing.setDestinationInfo(destinationMetaInfo);
                        listToAdd.add(outgoing);
                    }
                    if (routedSoFar < totalToRoute) {
                        pendingEvents.put(attemptID, new PendingEventRouteMetadata(route, tezEvent, routedSoFar));
                        return false;
                    }
                    break;
                }
                case DATA_MOVEMENT_EVENT: {
                    DataMovementEvent movementEvent = (DataMovementEvent) tezEvent.getEvent();
                    EdgeManagerPluginOnDemand.EventRouteMetadata route;
                    int routedSoFar;
                    if (pendingRoutes == null) {
                        route = onDemandManager.routeDataMovementEventToDestination(srcTaskIndex, movementEvent.getSourceIndex(), destTaskIndex);
                        routedSoFar = 0;
                    } else {
                        // Resume a delivery that was cut short by the list limit.
                        route = pendingRoutes.getRouteMeta();
                        routedSoFar = pendingRoutes.getNumEventsRouted();
                    }
                    if (route == null) {
                        break;
                    }
                    int occupied = listToAdd.size();
                    int totalToRoute = route.getNumEvents();
                    int[] targetIndices = route.getTargetIndices();
                    while (routedSoFar < totalToRoute && occupied < listMaxSize) {
                        occupied++;
                        DataMovementEvent copy = movementEvent.makeCopy(targetIndices[routedSoFar]);
                        routedSoFar++;
                        TezEvent outgoing = new TezEvent(copy, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
                        outgoing.setDestinationInfo(destinationMetaInfo);
                        listToAdd.add(outgoing);
                    }
                    if (routedSoFar < totalToRoute) {
                        pendingEvents.put(attemptID, new PendingEventRouteMetadata(route, tezEvent, routedSoFar));
                        return false;
                    }
                    break;
                }
                default: {
                    throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
                }
            }
        } catch (Exception routingFailure) {
            throw new AMUserCodeException(AMUserCodeException.Source.EdgeManager, "Fail to maybeAddTezEventForDestinationTask, event:" + tezEvent.getEvent() + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:" + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), routingFailure);
        }
        return true;
    }

    /**
     * Registers an event with a task.
     *
     * @param task task that should receive the event
     * @param tezEvent event to register
     */
    private void sendEventToTask(Task task, TezEvent tezEvent) {
        task.registerTezEvent(tezEvent);
    }

    /**
     * Passes an event to the event handler this edge was constructed with.
     *
     * @param event event to dispatch
     */
    private void sendEvent(Event event) {
        eventHandler.handle(event);
    }

    /**
     * @return the name of the source vertex
     */
    public String getSourceVertexName() {
        return sourceVertex.getName();
    }

    /**
     * @return the name of the destination vertex
     */
    public String getDestinationVertexName() {
        return destinationVertex.getName();
    }

    /**
     * Builds the diagnostic suffix shared by log and error messages: source vertex, source
     * task index, destination vertex, edge manager class and event type.
     *
     * @param srcTaskIndex index of the source task
     * @param tezEvent event being routed
     * @return the diagnostic text, starting with a space
     */
    private String generateCommonDebugString(int srcTaskIndex, TezEvent tezEvent) {
        StringBuilder text = new StringBuilder();
        text.append(" sourceVertex=").append(sourceVertex.getLogIdentifier());
        text.append(" srcIndex = ").append(srcTaskIndex);
        text.append(" destAttemptId=").append(destinationVertex.getLogIdentifier());
        text.append(" edgeManager=").append(edgeManager.getClass().getName());
        text.append(" Event type=").append(tezEvent.getEventType());
        return text.toString();
    }

    /**
     * @return a short description of this edge naming its source and destination vertices
     */
    private String getEdgeInfo() {
        String sourceName = getSourceVertexName();
        String destinationName = getDestinationVertexName();
        return "EdgeInfo: sourceVertexName=" + sourceName + ", destinationVertexName=" + destinationName;
    }

    /**
     * Immutable record of a route that has been delivered only in part: the route itself,
     * the event it belongs to, and how many of the route's events were already produced.
     */
    static class PendingEventRouteMetadata {
        private final EdgeManagerPluginOnDemand.EventRouteMetadata routeMeta;
        private final TezEvent event;
        // Never reassigned after construction, so it is final like the other fields.
        private final int numEventsRouted;

        /**
         * @param routeMeta route being delivered
         * @param event event the route was computed for
         * @param numEventsRouted number of events of the route already produced
         */
        public PendingEventRouteMetadata(EdgeManagerPluginOnDemand.EventRouteMetadata routeMeta, TezEvent event, int numEventsRouted) {
            this.routeMeta = routeMeta;
            this.event = event;
            this.numEventsRouted = numEventsRouted;
        }

        /** @return the route being delivered */
        public EdgeManagerPluginOnDemand.EventRouteMetadata getRouteMeta() {
            return this.routeMeta;
        }

        /** @return the event the route was computed for */
        public TezEvent getTezEvent() {
            return this.event;
        }

        /** @return the number of events of the route already produced */
        public int getNumEventsRouted() {
            return this.numEventsRouted;
        }
    }

    /**
     * Plugin context backed by the enclosing edge: vertex names and task counts are read from
     * the edge's source and destination vertices at call time, while the user payload is
     * fixed at construction.
     */
    class EdgeManagerPluginContextImpl implements EdgeManagerPluginContext {
        private final UserPayload userPayload;

        /**
         * @param userPayload payload to expose to the edge manager; may be null
         */
        EdgeManagerPluginContextImpl(UserPayload userPayload) {
            this.userPayload = userPayload;
        }

        /** @return the payload supplied at construction */
        public UserPayload getUserPayload() {
            return userPayload;
        }

        /** @return the name of the enclosing edge's source vertex */
        public String getSourceVertexName() {
            Vertex source = Edge.this.sourceVertex;
            return source.getName();
        }

        /** @return the name of the enclosing edge's destination vertex */
        public String getDestinationVertexName() {
            Vertex destination = Edge.this.destinationVertex;
            return destination.getName();
        }

        /** @return the total task count reported by the source vertex */
        public int getSourceVertexNumTasks() {
            Vertex source = Edge.this.sourceVertex;
            return source.getTotalTasks();
        }

        /** @return the total task count reported by the destination vertex */
        public int getDestinationVertexNumTasks() {
            Vertex destination = Edge.this.destinationVertex;
            return destination.getTotalTasks();
        }
    }
}

