/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.service;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.AclEntity;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.JobInfoConverter;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.SchedulerFactory;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.service.AccessService;
import org.apache.kylin.rest.service.BasicService;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.source.SourcePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;

@EnableAspectJAutoProxy(proxyTargetClass=true)
@Component(value="jobService")
public class JobService
extends BasicService
implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(JobService.class);
    private JobLock jobLock;
    @Autowired
    private AccessService accessService;

    public void afterPropertiesSet() throws Exception {
        String timeZone = this.getConfig().getTimeZone();
        TimeZone tzone = TimeZone.getTimeZone(timeZone);
        TimeZone.setDefault(tzone);
        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        final Scheduler scheduler = SchedulerFactory.scheduler((int)kylinConfig.getSchedulerType());
        this.jobLock = (JobLock)ClassUtil.newInstance((String)kylinConfig.getJobControllerLock());
        new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    scheduler.init(new JobEngineConfig(kylinConfig), JobService.this.jobLock);
                    if (!scheduler.hasStarted()) {
                        logger.info("scheduler has not been started");
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    scheduler.shutdown();
                }
                catch (SchedulerException e) {
                    logger.error("error occurred to shutdown scheduler", (Throwable)e);
                }
            }
        }));
    }

    private Set<ExecutableState> convertStatusEnumToStates(List<JobStatusEnum> statusList) {
        AbstractSet states;
        if (statusList == null || statusList.isEmpty()) {
            states = EnumSet.allOf(ExecutableState.class);
        } else {
            states = Sets.newHashSet();
            for (JobStatusEnum status : statusList) {
                states.add((ExecutableState)this.parseToExecutableState(status));
            }
        }
        return states;
    }

    private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) {
        switch (timeFilter) {
            case LAST_ONE_DAY: {
                calendar.add(5, -1);
                return calendar.getTimeInMillis();
            }
            case LAST_ONE_WEEK: {
                calendar.add(4, -1);
                return calendar.getTimeInMillis();
            }
            case LAST_ONE_MONTH: {
                calendar.add(2, -1);
                return calendar.getTimeInMillis();
            }
            case LAST_ONE_YEAR: {
                calendar.add(1, -1);
                return calendar.getTimeInMillis();
            }
            case ALL: {
                return 0L;
            }
        }
        throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
    }

    private ExecutableState parseToExecutableState(JobStatusEnum status) {
        switch (status) {
            case DISCARDED: {
                return ExecutableState.DISCARDED;
            }
            case ERROR: {
                return ExecutableState.ERROR;
            }
            case FINISHED: {
                return ExecutableState.SUCCEED;
            }
            case NEW: {
                return ExecutableState.READY;
            }
            case PENDING: {
                return ExecutableState.READY;
            }
            case RUNNING: {
                return ExecutableState.RUNNING;
            }
            case STOPPED: {
                return ExecutableState.STOPPED;
            }
        }
        throw new RuntimeException("illegal status:" + status);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
    public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException {
        DefaultChainedExecutable job;
        if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
            throw new BadRequestException("Broken cube " + cube.getName() + " can't be built");
        }
        this.checkCubeDescSignature(cube);
        CubeSegment newSeg = null;
        try {
            if (buildType == CubeBuildTypeEnum.BUILD) {
                ISource source = SourceFactory.tableSource((ISourceAware)cube);
                SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
                sourcePartition = source.parsePartitionBeforeBuild((IBuildable)cube, sourcePartition);
                newSeg = this.getCubeManager().appendSegment(cube, sourcePartition);
                job = EngineFactory.createBatchCubingJob((CubeSegment)newSeg, (String)submitter);
            } else if (buildType == CubeBuildTypeEnum.MERGE) {
                newSeg = this.getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
                job = EngineFactory.createBatchMergeJob((CubeSegment)newSeg, (String)submitter);
            } else if (buildType == CubeBuildTypeEnum.REFRESH) {
                newSeg = this.getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset);
                job = EngineFactory.createBatchCubingJob((CubeSegment)newSeg, (String)submitter);
            } else {
                throw new JobException("invalid build type:" + buildType);
            }
            this.getExecutableManager().addJob((AbstractExecutable)job);
        }
        catch (Exception e) {
            if (newSeg != null) {
                logger.error("Job submission might failed for NEW segment {}, will clean the NEW segment from cube", (Object)newSeg.getName());
                try {
                    CubeUpdate cubeBuilder = new CubeUpdate(cube);
                    cubeBuilder.setToRemoveSegs(new CubeSegment[]{newSeg});
                    this.getCubeManager().updateCube(cubeBuilder);
                }
                catch (Exception ee) {
                    logger.error("Clean New segment failed, ignoring it", (Throwable)e);
                }
            }
            throw e;
        }
        JobInstance jobInstance = this.getSingleJobInstance((AbstractExecutable)job);
        this.accessService.init((AclEntity)jobInstance, null);
        this.accessService.inherit((AclEntity)jobInstance, (AclEntity)cube);
        return jobInstance;
    }

    private void checkCubeDescSignature(CubeInstance cube) {
        if (!cube.getDescriptor().checkSignature()) {
            throw new IllegalStateException("Inconsistent cube desc signature for " + cube.getDescriptor() + ", if it's right after a upgrade, please try 'Edit CubeDesc' to delete the 'signature' field. Or use 'bin/metastore.sh refresh-cube-signature' to batch refresh all cubes' signatures, then reload metadata to take effect");
        }
    }

    public JobInstance getJobInstance(String uuid) throws IOException, JobException {
        return this.getSingleJobInstance(this.getExecutableManager().getJob(uuid));
    }

    public Output getOutput(String id) {
        return this.getExecutableManager().getOutput(id);
    }

    private JobInstance getSingleJobInstance(AbstractExecutable job) {
        if (job == null) {
            return null;
        }
        Preconditions.checkState((boolean)(job instanceof CubingJob), (Object)("illegal job type, id:" + job.getId()));
        CubingJob cubeJob = (CubingJob)job;
        JobInstance result = new JobInstance();
        result.setName(job.getName());
        result.setRelatedCube(CubingExecutableUtil.getCubeName((Map)cubeJob.getParams()));
        result.setRelatedSegment(CubingExecutableUtil.getSegmentId((Map)cubeJob.getParams()));
        result.setLastModified(cubeJob.getLastModified());
        result.setSubmitter(cubeJob.getSubmitter());
        result.setUuid(cubeJob.getId());
        result.setType(CubeBuildTypeEnum.BUILD);
        result.setStatus(JobInfoConverter.parseToJobStatus((ExecutableState)job.getStatus()));
        result.setMrWaiting(cubeJob.getMapReduceWaitTime() / 1000L);
        result.setDuration(cubeJob.getDuration() / 1000L);
        for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
            AbstractExecutable task = (AbstractExecutable)cubeJob.getTasks().get(i);
            result.addStep(JobInfoConverter.parseToJobStep((AbstractExecutable)task, (int)i, (Output)this.getExecutableManager().getOutput(task.getId())));
        }
        return result;
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
    public void resumeJob(JobInstance job) throws IOException, JobException {
        this.getExecutableManager().resumeJob(job.getId());
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
    public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException {
        this.getExecutableManager().rollbackJob(job.getId(), stepId);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
    public JobInstance cancelJob(JobInstance job) throws IOException, JobException {
        if (null == job.getRelatedCube() || null == this.getCubeManager().getCube(job.getRelatedCube())) {
            this.getExecutableManager().discardJob(job.getId());
            return job;
        }
        CubeInstance cubeInstance = this.getCubeManager().getCube(job.getRelatedCube());
        String segmentIds = job.getRelatedSegment();
        for (String segmentId : StringUtils.split((String)segmentIds)) {
            CubeSegment segment = cubeInstance.getSegmentById(segmentId);
            if (segment == null || segment.getStatus() != SegmentStatusEnum.NEW && segment.getDateRangeEnd() != 0L) continue;
            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
            cubeBuilder.setToRemoveSegs(new CubeSegment[]{segment});
            this.getCubeManager().updateCube(cubeBuilder);
        }
        this.getExecutableManager().discardJob(job.getId());
        return job;
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
    public JobInstance pauseJob(JobInstance job) throws IOException, JobException {
        this.getExecutableManager().pauseJob(job.getId());
        return job;
    }

    public List<JobInstance> searchJobs(String cubeNameSubstring, String projectName, List<JobStatusEnum> statusList, Integer limitValue, Integer offsetValue, JobTimeFilterEnum timeFilter) throws IOException, JobException {
        Integer limit = null == limitValue ? 30 : limitValue;
        Integer offset = null == offsetValue ? 0 : offsetValue;
        List<JobInstance> jobs = this.searchJobs(cubeNameSubstring, projectName, statusList, timeFilter);
        Collections.sort(jobs);
        if (jobs.size() <= offset) {
            return Collections.emptyList();
        }
        if (jobs.size() - offset < limit) {
            return jobs.subList(offset, jobs.size());
        }
        return jobs.subList(offset, offset + limit);
    }

    private List<JobInstance> searchJobs(String cubeNameSubstring, String projectName, List<JobStatusEnum> statusList, JobTimeFilterEnum timeFilter) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date());
        long timeStartInMillis = this.getTimeStartInMillis(calendar, timeFilter);
        long timeEndInMillis = Long.MAX_VALUE;
        Set<ExecutableState> states = this.convertStatusEnumToStates(statusList);
        final Map allOutputs = this.getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis);
        return Lists.newArrayList((Iterable)FluentIterable.from(this.searchCubingJobs(cubeNameSubstring, projectName, states, timeStartInMillis, timeEndInMillis, allOutputs, false)).transform((Function)new Function<CubingJob, JobInstance>(){

            public JobInstance apply(CubingJob cubingJob) {
                return JobInfoConverter.parseToJobInstance((AbstractExecutable)cubingJob, (Map)allOutputs);
            }
        }));
    }

    public List<CubingJob> searchCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs, final boolean cubeNameExactMatch) {
        ArrayList results = Lists.newArrayList((Iterable)FluentIterable.from((Iterable)this.getExecutableManager().getAllAbstractExecutables(timeStartInMillis, timeEndInMillis, CubingJob.class)).filter((Predicate)new Predicate<AbstractExecutable>(){

            public boolean apply(AbstractExecutable executable) {
                if (executable instanceof CubingJob) {
                    if (StringUtils.isEmpty((CharSequence)cubeName)) {
                        return true;
                    }
                    String executableCubeName = CubingExecutableUtil.getCubeName((Map)executable.getParams());
                    if (executableCubeName == null) {
                        return true;
                    }
                    if (cubeNameExactMatch) {
                        return executableCubeName.equalsIgnoreCase(cubeName);
                    }
                    return executableCubeName.contains(cubeName);
                }
                return false;
            }
        }).transform((Function)new Function<AbstractExecutable, CubingJob>(){

            public CubingJob apply(AbstractExecutable executable) {
                return (CubingJob)executable;
            }
        }).filter(Predicates.and((Predicate)new Predicate<CubingJob>(){

            public boolean apply(CubingJob executable) {
                if (null == projectName || null == JobService.this.getProjectManager().getProject(projectName)) {
                    return true;
                }
                return projectName.equals(executable.getProjectName());
            }
        }, (Predicate)new Predicate<CubingJob>(){

            public boolean apply(CubingJob executable) {
                Output output = (Output)allOutputs.get(executable.getId());
                ExecutableState state = output.getState();
                boolean ret = statusList.contains(state);
                return ret;
            }
        })));
        return results;
    }

    public List<CubingJob> listAllCubingJobs(String cubeName, String projectName, Set<ExecutableState> statusList) {
        return this.searchCubingJobs(cubeName, projectName, statusList, 0L, Long.MAX_VALUE, this.getExecutableManager().getAllOutputs(), true);
    }

    public List<CubingJob> listAllCubingJobs(String cubeName, String projectName) {
        return this.searchCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), 0L, Long.MAX_VALUE, this.getExecutableManager().getAllOutputs(), true);
    }
}

