/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.window;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.window.InsertionTimeList;
import org.apache.edgent.window.Partition;

public class Policies {
    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictIfEmpty(long time, TimeUnit unit) {
        return (BiConsumer & Serializable)(partition, tuple) -> {
            if (partition.getContents().isEmpty()) {
                ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
                ses.schedule(() -> partition.evict(), time, unit);
            }
        };
    }

    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictOnFirstInsert(final long time, final TimeUnit unit) {
        return new BiConsumer<Partition<T, K, L>, T>(){
            private Set<Partition<T, K, L>> initialized_partitions = Collections.synchronizedSet(new HashSet());

            public void accept(Partition<T, K, L> partition, T tuple) {
                if (!this.initialized_partitions.contains(partition)) {
                    this.initialized_partitions.add(partition);
                    ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
                    ses.schedule(() -> partition.evict(), time, unit);
                }
            }
        };
    }

    public static <T, K> Consumer<Partition<T, K, InsertionTimeList<T>>> evictOlderWithProcess(long time, TimeUnit unit) {
        long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
        return (Consumer & Serializable)partition -> {
            ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
            InsertionTimeList tuples = (InsertionTimeList)partition.getContents();
            long evictTime = System.currentTimeMillis() - timeMs;
            tuples.evictOlderThan(evictTime);
            partition.process();
            if (!tuples.isEmpty()) {
                ses.schedule(() -> partition.evict(), tuples.nextEvictDelay(timeMs), TimeUnit.MILLISECONDS);
            }
        };
    }

    public static <T, K> Consumer<Partition<T, K, List<T>>> evictAllAndScheduleEvictWithProcess(long time, TimeUnit unit) {
        long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
        return (Consumer & Serializable)partition -> {
            ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
            Object tuples = partition.getContents();
            partition.process();
            tuples.clear();
            ses.schedule(() -> partition.evict(), timeMs, TimeUnit.MILLISECONDS);
        };
    }

    public static <T, K, L extends List<T>> BiFunction<Partition<T, K, L>, T, Boolean> alwaysInsert() {
        return (BiFunction & Serializable)(partition, tuple) -> true;
    }

    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> countContentsPolicy(int count) {
        return (BiConsumer & Serializable)(partition, tuple) -> {
            if (partition.getContents().size() >= count) {
                partition.evict();
            }
        };
    }

    public static <T, K, L extends List<T>> Consumer<Partition<T, K, L>> evictAll() {
        return (Consumer & Serializable)partition -> partition.getContents().clear();
    }

    public static <T, K, L extends List<T>> Consumer<Partition<T, K, L>> evictOldest() {
        return (Consumer & Serializable)partition -> partition.getContents().remove(0);
    }

    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processOnInsert() {
        return (BiConsumer & Serializable)(partition, tuple) -> partition.process();
    }

    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processWhenFullAndEvict(int size) {
        return (BiConsumer & Serializable)(partition, tuple) -> {
            if (partition.getContents().size() >= size) {
                partition.process();
                partition.evict();
            }
        };
    }

    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> doNothing() {
        return (BiConsumer & Serializable)(partition, key) -> {};
    }

    public static <T> Supplier<InsertionTimeList<T>> insertionTimeList() {
        return (Supplier & Serializable)() -> new InsertionTimeList();
    }
}

