/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.app;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.app.StreamingApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for the streaming merge job. It parses the merge-specific command-line
 * arguments and keeps a registry of segment files that are candidates for deletion.
 *
 * <p>The registry is a plain {@link HashMap} with no synchronization of its own.
 */
public abstract class StreamingMergeApplication extends StreamingApplication {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingMergeApplication.class);

    /** Number of positional arguments read by {@link #parseParams(String[])}. */
    private static final int REQUIRED_ARG_COUNT = 5;

    /**
     * Segment id mapped to a pair whose key is used as the Hadoop path to delete and whose
     * value is an epoch-millisecond timestamp compared against the current time.
     */
    private final Map<String, Pair<String, Long>> removeSegIds = new HashMap<>();

    /** Size threshold parsed from the third argument via {@code StreamingUtils.parseSize}. */
    protected long thresholdOfSegSize;

    /** Segment count parsed from the fourth argument. */
    protected Integer numberOfSeg;

    /** Creates the application and marks its job type as {@code STREAMING_MERGE}. */
    protected StreamingMergeApplication() {
        this.jobType = JobTypeEnum.STREAMING_MERGE;
    }

    /**
     * Reads the positional arguments: project, dataflow id, segment size threshold,
     * number of segments and the distributed metadata URL (in that order), then derives
     * the job id from the dataflow id and the job type name.
     *
     * @param args positional arguments; at least five entries are required
     * @throws IllegalArgumentException if fewer than five arguments are supplied or the
     *         metadata URL is empty
     * @throws NumberFormatException if the fourth argument is not an integer
     */
    @Override
    public void parseParams(String[] args) {
        // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException
        // raised halfway through the field assignments.
        Preconditions.checkArgument(args != null && args.length >= REQUIRED_ARG_COUNT,
                (Object) ("Expected at least " + REQUIRED_ARG_COUNT + " arguments but got "
                        + (args == null ? 0 : args.length)));
        this.project = args[0];
        this.dataflowId = args[1];
        this.thresholdOfSegSize = StreamingUtils.parseSize(args[2]);
        this.numberOfSeg = Integer.parseInt(args[3]);
        this.distMetaUrl = args[4];
        this.jobId = StreamingUtils.getJobId(this.dataflowId, this.jobType.name());
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.distMetaUrl),
                (Object) "distMetaUrl should not be empty!");
    }

    /**
     * Registers a segment whose file may later be deleted by
     * {@link #clearHdfsFiles(NDataflow, AtomicLong)}.
     *
     * @param segId segment id used as the registry key; an existing entry is replaced
     * @param item pair of (path to delete, epoch-millisecond timestamp)
     */
    public void putHdfsFile(String segId, Pair<String, Long> item) {
        this.removeSegIds.put(segId, item);
    }

    /**
     * Scans the registered segments and deletes the files of those that are old enough,
     * provided more than one clean interval has passed since {@code startTime}.
     *
     * <p>The clean interval is {@code getStreamingSegmentCleanInterval()} converted from
     * hours to milliseconds. For every registered segment that the dataflow no longer
     * contains:
     * <ul>
     *   <li>older than ten intervals: the entry is dropped from the registry without
     *       attempting a delete;</li>
     *   <li>older than one interval: the path is deleted and, on success, the entry is
     *       dropped; on failure the entry is kept so a later scan can retry;</li>
     *   <li>otherwise the entry is left untouched.</li>
     * </ul>
     * Segments still present in the dataflow are skipped. When a scan runs,
     * {@code startTime} is updated to the current time.
     *
     * @param dataflow dataflow used to check whether a segment still exists
     * @param startTime time of the previous scan in epoch milliseconds; updated in place
     */
    public void clearHdfsFiles(NDataflow dataflow, AtomicLong startTime) {
        long now = System.currentTimeMillis();
        long intervals = KylinConfig.getInstanceFromEnv().getStreamingSegmentCleanInterval()
                * 60L * 60L * 1000L;
        if (now - startTime.get() <= intervals) {
            return;
        }

        Iterator<Map.Entry<String, Pair<String, Long>>> iter = this.removeSegIds.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Pair<String, Long>> entry = iter.next();
            String segId = entry.getKey();
            if (dataflow.getSegment(segId) != null) {
                continue;
            }

            Pair<String, Long> item = entry.getValue();
            long age = now - item.getValue();
            if (age > intervals * 10L) {
                // Too old: forget the entry without touching the file system.
                iter.remove();
                continue;
            }
            if (age <= intervals) {
                continue;
            }

            String path = item.getKey();
            try {
                HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path));
                iter.remove();
            } catch (IOException e) {
                // Best effort: keep the entry and log the full cause for diagnosis.
                log.warn("Failed to delete path {} of segment {}", path, segId, e);
            }
        }
        startTime.set(now);
    }

    /** Returns the segment size threshold. */
    @Generated
    public long getThresholdOfSegSize() {
        return this.thresholdOfSegSize;
    }

    /** Sets the segment size threshold. */
    @Generated
    public void setThresholdOfSegSize(long thresholdOfSegSize) {
        this.thresholdOfSegSize = thresholdOfSegSize;
    }

    /** Returns the configured number of segments. */
    @Generated
    public Integer getNumberOfSeg() {
        return this.numberOfSeg;
    }

    /** Sets the configured number of segments. */
    @Generated
    public void setNumberOfSeg(Integer numberOfSeg) {
        this.numberOfSeg = numberOfSeg;
    }
}

