/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.druid.collections.CircularList;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorDutyUtils;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadableInterval;
import org.joda.time.ReadablePeriod;
import org.joda.time.base.AbstractInterval;

public class KillUnusedSegments
implements CoordinatorDuty {
    public static final String KILL_TASK_TYPE = "kill";
    public static final String TASK_ID_PREFIX = "coordinator-issued";
    private static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK = status -> null != status && KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX);
    private static final Logger log = new Logger(KillUnusedSegments.class);
    private final Duration period;
    private final Duration durationToRetain;
    private final boolean ignoreDurationToRetain;
    private final int maxSegmentsToKill;
    private final Period maxIntervalToKill;
    private final Duration bufferPeriod;
    private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
    private DateTime lastKillTime;
    private final SegmentsMetadataManager segmentsMetadataManager;
    private final OverlordClient overlordClient;
    private String prevDatasourceKilled;
    private CircularList<String> datasourceCircularKillList;

    public KillUnusedSegments(SegmentsMetadataManager segmentsMetadataManager, OverlordClient overlordClient, KillUnusedSegmentsConfig killConfig) {
        this.period = killConfig.getCleanupPeriod();
        this.maxSegmentsToKill = killConfig.getMaxSegments();
        this.maxIntervalToKill = killConfig.getMaxInterval();
        this.ignoreDurationToRetain = killConfig.isIgnoreDurationToRetain();
        this.durationToRetain = killConfig.getDurationToRetain();
        if (this.ignoreDurationToRetain) {
            log.info("druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill because druid.coordinator.kill.ignoreDurationToRetain is set to true.", new Object[]{this.durationToRetain});
        }
        this.bufferPeriod = killConfig.getBufferPeriod();
        log.info("Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s], maxIntervalToKill[%s]", new Object[]{this.period, this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain, this.bufferPeriod, this.maxSegmentsToKill, this.maxIntervalToKill});
        this.segmentsMetadataManager = segmentsMetadataManager;
        this.overlordClient = overlordClient;
        this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap<String, DateTime>();
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        if (this.canDutyRun()) {
            return this.runInternal(params);
        }
        log.debug("Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].", new Object[]{this.period, this.lastKillTime});
        return params;
    }

    private DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params) {
        CoordinatorRunStats stats;
        CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
        int availableKillTaskSlots = this.getAvailableKillTaskSlots(dynamicConfig, stats = params.getCoordinatorStats());
        if (availableKillTaskSlots <= 0) {
            log.debug("Skipping KillUnusedSegments because there are no available kill task slots.", new Object[0]);
            return params;
        }
        Set<String> dataSourcesToKill = CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()) ? this.segmentsMetadataManager.retrieveAllDataSourceNames() : dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
        if (this.datasourceCircularKillList == null || !this.datasourceCircularKillList.equalsSet(dataSourcesToKill)) {
            this.datasourceCircularKillList = new CircularList(dataSourcesToKill, Comparator.naturalOrder());
        }
        this.lastKillTime = DateTimes.nowUtc();
        this.killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
        this.datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
        return params;
    }

    private void killUnusedSegments(Set<String> dataSourcesToKill, int availableKillTaskSlots, CoordinatorRunStats stats) {
        if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
            log.debug("Skipping KillUnusedSegments because there are no datasources to kill.", new Object[0]);
            stats.add(Stats.Kill.SUBMITTED_TASKS, 0L);
            return;
        }
        HashSet<String> remainingDatasourcesToKill = new HashSet<String>(dataSourcesToKill);
        int submittedTasks = 0;
        for (String dataSource : this.datasourceCircularKillList) {
            if (dataSource.equals(this.prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) continue;
            this.prevDatasourceKilled = dataSource;
            remainingDatasourcesToKill.remove(dataSource);
            DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus((ReadableDuration)this.bufferPeriod);
            Interval intervalToKill = this.findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
            if (intervalToKill == null) {
                this.datasourceToLastKillIntervalEnd.remove(dataSource);
                if (!remainingDatasourcesToKill.isEmpty()) continue;
                break;
            }
            try {
                FutureUtils.getUnchecked(this.overlordClient.runKillTask(TASK_ID_PREFIX, dataSource, intervalToKill, null, this.maxSegmentsToKill, maxUsedStatusLastUpdatedTime), (boolean)true);
                ++submittedTasks;
                this.datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
                if (remainingDatasourcesToKill.isEmpty()) break;
                if (submittedTasks < availableKillTaskSlots) continue;
            }
            catch (Exception ex) {
                log.error((Throwable)ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", new Object[]{dataSource, intervalToKill});
                if (!Thread.currentThread().isInterrupted()) continue;
                log.warn("Skipping kill task scheduling because thread is interrupted.", new Object[0]);
            }
            break;
        }
        log.info("Submitted [%d] kill tasks for [%d] datasources: [%s]. Remaining [%d] datasources to kill: [%s].", new Object[]{submittedTasks, dataSourcesToKill.size() - remainingDatasourcesToKill.size(), Sets.difference(dataSourcesToKill, remainingDatasourcesToKill), remainingDatasourcesToKill.size(), remainingDatasourcesToKill});
        stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
    }

    @Nullable
    private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLastUpdatedTime, CoordinatorRunStats stats) {
        DateTime minStartTime = this.datasourceToLastKillIntervalEnd.get(dataSource);
        DateTime maxEndTime = this.ignoreDurationToRetain ? DateTimes.COMPARE_DATE_AS_STRING_MAX : (minStartTime == null ? DateTimes.nowUtc().minus((ReadableDuration)this.durationToRetain) : DateTimes.min((DateTime)DateTimes.nowUtc().minus((ReadableDuration)this.durationToRetain), (DateTime)minStartTime.plus((ReadablePeriod)this.maxIntervalToKill)));
        List<Interval> unusedSegmentIntervals = KillUnusedSegments.limitToPeriod(this.segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, minStartTime, maxEndTime, this.maxSegmentsToKill, maxUsedStatusLastUpdatedTime), this.maxIntervalToKill);
        RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
        stats.add(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, datasourceKey, unusedSegmentIntervals.size());
        if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
            return null;
        }
        if (unusedSegmentIntervals.size() == 1) {
            return unusedSegmentIntervals.get(0);
        }
        return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
    }

    private boolean canDutyRun() {
        return this.lastKillTime == null || !DateTimes.nowUtc().isBefore((ReadableInstant)this.lastKillTime.plus((ReadableDuration)this.period));
    }

    private int getAvailableKillTaskSlots(CoordinatorDynamicConfig config, CoordinatorRunStats stats) {
        int killTaskCapacity = Math.min((int)((double)CoordinatorDutyUtils.getTotalWorkerCapacity(this.overlordClient) * Math.min(config.getKillTaskSlotRatio(), 1.0)), config.getMaxKillTaskSlots());
        int availableKillTaskSlots = Math.max(0, killTaskCapacity - CoordinatorDutyUtils.getStatusOfActiveTasks(this.overlordClient, IS_AUTO_KILL_TASK).size());
        stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
        stats.add(Stats.Kill.MAX_SLOTS, killTaskCapacity);
        return availableKillTaskSlots;
    }

    static List<Interval> limitToPeriod(List<Interval> intervals, Period maxPeriod) {
        if (DateTimes.EPOCH.plus((ReadablePeriod)maxPeriod).equals((Object)DateTimes.EPOCH) || intervals.size() <= 1) {
            return intervals;
        }
        DateTime earliestStart = Collections.min(intervals, Comparator.comparing(AbstractInterval::getStart)).getStart();
        Interval retainInterval = new Interval((ReadableInstant)earliestStart, (ReadablePeriod)maxPeriod);
        ArrayList<Interval> retVal = new ArrayList<Interval>();
        for (Interval interval : intervals) {
            if (!interval.getStart().equals((Object)earliestStart) && !retainInterval.contains((ReadableInterval)interval)) continue;
            retVal.add(interval);
        }
        return retVal;
    }
}

