/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.logging.ats;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.logging.ats.HistoryEventTimelineConversion;
import org.apache.tez.dag.records.TezDAGID;

public class ATSHistoryLoggingService
extends HistoryLoggingService {
    private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
    private LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue();
    private Thread eventHandlingThread;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private int eventCounter = 0;
    private int eventsProcessed = 0;
    private final Object lock = new Object();
    @VisibleForTesting
    TimelineClient timelineClient;
    private HashSet<TezDAGID> skippedDAGs = new HashSet();
    private Map<TezDAGID, String> dagDomainIdMap = new HashMap<TezDAGID, String>();
    private long maxTimeToWaitOnShutdown;
    private boolean waitForeverOnShutdown = false;
    private int maxEventsPerBatch;
    private long maxPollingTimeMillis;
    private String sessionDomainId;
    private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
    private HistoryACLPolicyManager historyACLPolicyManager;

    public ATSHistoryLoggingService() {
        super(ATSHistoryLoggingService.class.getName());
    }

    public void serviceInit(Configuration conf) throws Exception {
        LOG.info((Object)"Initializing ATSService");
        this.timelineClient = TimelineClient.createTimelineClient();
        this.timelineClient.init(conf);
        this.maxTimeToWaitOnShutdown = conf.getLong("tez.yarn.ats.event.flush.timeout.millis", -1L);
        this.maxEventsPerBatch = conf.getInt("tez.yarn.ats.max.events.per.batch", 5);
        this.maxPollingTimeMillis = conf.getInt("tez.yarn.ats.max.polling.time.per.event.millis", 10);
        if (this.maxTimeToWaitOnShutdown < 0L) {
            this.waitForeverOnShutdown = true;
        }
        this.sessionDomainId = conf.get("tez.yarn.ats.acl.session.domain.id");
        LOG.info((Object)"Using org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager to manage Timeline ACLs");
        try {
            this.historyACLPolicyManager = (HistoryACLPolicyManager)ReflectionUtils.createClazzInstance((String)atsHistoryACLManagerClassName);
            this.historyACLPolicyManager.setConf(conf);
        }
        catch (TezUncheckedException e) {
            LOG.warn((Object)"Could not instantiate object for org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager. ACLs cannot be enforced correctly for history data in Timeline", (Throwable)e);
            if (!conf.getBoolean("tez.allow.disabled.timeline-domains", false)) {
                throw e;
            }
            this.historyACLPolicyManager = null;
        }
    }

    public void serviceStart() {
        LOG.info((Object)"Starting ATSService");
        this.timelineClient.start();
        this.eventHandlingThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                LinkedList events = new LinkedList();
                boolean interrupted = false;
                while (!(ATSHistoryLoggingService.this.stopped.get() || Thread.currentThread().isInterrupted() || interrupted)) {
                    if (ATSHistoryLoggingService.this.eventCounter != 0 && ATSHistoryLoggingService.this.eventCounter % 1000 == 0) {
                        if (ATSHistoryLoggingService.this.eventsProcessed != 0 && !events.isEmpty()) {
                            LOG.info((Object)("Event queue stats, eventsProcessedSinceLastUpdate=" + ATSHistoryLoggingService.this.eventsProcessed + ", eventQueueSize=" + ATSHistoryLoggingService.this.eventQueue.size()));
                        }
                        ATSHistoryLoggingService.this.eventCounter = 0;
                        ATSHistoryLoggingService.this.eventsProcessed = 0;
                    } else {
                        ++ATSHistoryLoggingService.this.eventCounter;
                    }
                    Object object = ATSHistoryLoggingService.this.lock;
                    synchronized (object) {
                        try {
                            ATSHistoryLoggingService.this.getEventBatch(events);
                        }
                        catch (InterruptedException e) {
                            interrupted = true;
                        }
                        if (events.isEmpty()) {
                            continue;
                        }
                        ATSHistoryLoggingService.this.eventsProcessed += events.size();
                        try {
                            ATSHistoryLoggingService.this.handleEvents(events);
                        }
                        catch (Exception e) {
                            LOG.warn((Object)"Error handling events", (Throwable)e);
                        }
                    }
                }
            }
        }, "HistoryEventHandlingThread");
        this.eventHandlingThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void serviceStop() {
        LOG.info((Object)("Stopping ATSService, eventQueueBacklog=" + this.eventQueue.size()));
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        Object object = this.lock;
        synchronized (object) {
            if (!this.eventQueue.isEmpty()) {
                LOG.warn((Object)("ATSService being stopped, eventQueueBacklog=" + this.eventQueue.size() + ", maxTimeLeftToFlush=" + this.maxTimeToWaitOnShutdown + ", waitForever=" + this.waitForeverOnShutdown));
                long startTime = this.appContext.getClock().getTime();
                long endTime = startTime + this.maxTimeToWaitOnShutdown;
                LinkedList<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
                while (this.waitForeverOnShutdown || endTime >= this.appContext.getClock().getTime()) {
                    try {
                        this.getEventBatch(events);
                    }
                    catch (InterruptedException e) {
                        LOG.info((Object)("ATSService interrupted while shutting down. Exiting. EventQueueBacklog=" + this.eventQueue.size()));
                    }
                    if (events.isEmpty()) {
                        LOG.info((Object)"Event queue empty, stopping ATS Service");
                        break;
                    }
                    try {
                        this.handleEvents(events);
                    }
                    catch (Exception e) {
                        LOG.warn((Object)"Error handling event", (Throwable)e);
                        break;
                    }
                }
            }
        }
        if (!this.eventQueue.isEmpty()) {
            LOG.warn((Object)("Did not finish flushing eventQueue before stopping ATSService, eventQueueBacklog=" + this.eventQueue.size()));
        }
        this.timelineClient.stop();
    }

    private void getEventBatch(List<DAGHistoryEvent> events) throws InterruptedException {
        DAGHistoryEvent event;
        events.clear();
        int counter = 0;
        while (counter < this.maxEventsPerBatch && (event = this.eventQueue.poll(this.maxPollingTimeMillis, TimeUnit.MILLISECONDS)) != null) {
            if (!this.isValidEvent(event)) continue;
            ++counter;
            events.add(event);
            if (!event.getHistoryEvent().getEventType().equals((Object)HistoryEventType.DAG_SUBMITTED)) continue;
            break;
        }
    }

    public void handle(DAGHistoryEvent event) {
        this.eventQueue.add(event);
    }

    private boolean isValidEvent(DAGHistoryEvent event) {
        HistoryEventType eventType = event.getHistoryEvent().getEventType();
        TezDAGID dagId = event.getDagID();
        if (eventType.equals((Object)HistoryEventType.DAG_SUBMITTED)) {
            String dagDomainId;
            DAGSubmittedEvent dagSubmittedEvent = (DAGSubmittedEvent)event.getHistoryEvent();
            String dagName = dagSubmittedEvent.getDAGName();
            if (dagName != null && dagName.startsWith("TezPreWarmDAG")) {
                this.skippedDAGs.add(dagId);
                return false;
            }
            if (this.historyACLPolicyManager != null && (dagDomainId = dagSubmittedEvent.getConf().get("tez.yarn.ats.acl.dag.domain.id")) != null) {
                this.dagDomainIdMap.put(dagId, dagDomainId);
            }
        }
        if (eventType.equals((Object)HistoryEventType.DAG_FINISHED) && this.skippedDAGs.remove(dagId)) {
            return false;
        }
        return dagId == null || !this.skippedDAGs.contains(dagId);
    }

    private void handleEvents(List<DAGHistoryEvent> events) {
        TimelineEntity[] entities = new TimelineEntity[events.size()];
        for (int i = 0; i < events.size(); ++i) {
            DAGHistoryEvent event = events.get(i);
            String domainId = this.sessionDomainId;
            TezDAGID dagId = event.getDagID();
            if (this.historyACLPolicyManager != null && dagId != null && this.dagDomainIdMap.containsKey(dagId)) {
                domainId = this.dagDomainIdMap.get(dagId);
            }
            entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
            if (this.historyACLPolicyManager == null || domainId == null || domainId.isEmpty()) continue;
            this.historyACLPolicyManager.updateTimelineEntityDomain((Object)entities[i], domainId);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Sending event batch to Timeline, batchSize=" + events.size()));
        }
        try {
            TimelinePutResponse response = this.timelineClient.putEntities(entities);
            if (response != null && !response.getErrors().isEmpty()) {
                int count = response.getErrors().size();
                for (int i = 0; i < count; ++i) {
                    TimelinePutResponse.TimelinePutError err = (TimelinePutResponse.TimelinePutError)response.getErrors().get(i);
                    if (err.getErrorCode() == 0) continue;
                    LOG.warn((Object)("Could not post history event to ATS, atsPutError=" + err.getErrorCode() + ", entityId=" + entities[i].getEntityId() + ", eventType=" + events.get(i).getHistoryEvent().getEventType()));
                }
            }
        }
        catch (Exception e) {
            LOG.warn((Object)"Could not handle history events", (Throwable)e);
        }
    }
}

