/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.clean.BatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.DelegateBatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResult;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResultSet;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.fs.FileObjectCandidatesFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemMonitor;
import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileOrder;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectKey;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.time.Duration;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultFileSystemMonitor
implements FileSystemMonitor {
    /** Task-configuration timeout in milliseconds (same value as the 15000 ms wait bound applied when listing files to schedule). */
    private static final long TASK_CONFIGURATION_DEFAULT_TIMEOUT = 15000L;
    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSystemMonitor.class);
    /** Timeout used when reading the state log to its end from the constructor. */
    private static final Duration ON_START_READ_END_LOG_TIMEOUT = Duration.ofSeconds(30L);
    /** Timeout used for every other read of the state log to its end. */
    private static final Duration DEFAULT_READ_END_LOG_TIMEOUT = Duration.ofSeconds(5L);
    /** Maximum number of attempts to prepare a schedule (same value as the attempt bound applied when listing files to schedule). */
    private static final int MAX_SCHEDULE_ATTEMPTS = 3;
    /** Component used to list the object files and to obtain the storage handed to the cleaner. */
    private final FileSystemListing<?> fsListing;
    /** Backing store holding the state of each object file; also the source of state-update callbacks. */
    private final StateBackingStore<FileObject> store;
    /** Object files currently handed out for processing; entries are removed on CLEANED/INVALID state updates. */
    private final Map<FileObjectKey, FileObjectMeta> scheduled = new ConcurrentHashMap<FileObjectKey, FileObjectMeta>();
    /** Object files found by the last listing and not yet scheduled. */
    private final Map<FileObjectKey, FileObjectMeta> scanned = new ConcurrentHashMap<FileObjectKey, FileObjectMeta>();
    /** Object files whose status satisfies {@link #cleanablePredicate} and that are waiting for cleanup. */
    private final LinkedBlockingQueue<FileObject> cleanable = new LinkedBlockingQueue<>();
    /** Last snapshot taken from {@link #store}; stays {@code null} until a read-to-end succeeds. */
    private StateSnapshot<FileObject> fileState;
    private final SourceOffsetPolicy offsetPolicy;
    /** Cleanup policy applied, as a batch, to the content of {@link #cleanable}. */
    private final BatchFileCleanupPolicy cleaner;
    /** Delay in milliseconds before a task reconfiguration is allowed while files are still scheduled. */
    private final Long allowTasksReconfigurationAfterTimeoutMs;
    /** Timestamp (ms) after which reconfiguration is allowed; {@code -1} means no deadline is currently armed. */
    private Long nextAllowedTasksReconfiguration = -1L;
    /** Set when a task reconfiguration was requested; reset once files have been listed for scheduling. */
    private final AtomicBoolean taskReconfigurationRequested = new AtomicBoolean(false);
    /** Set by the first call to {@code listFilesToSchedule}, cleared by {@code close}. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Raised by the state listener when a scanned file became cleanable while a schedule was being prepared. */
    private final AtomicBoolean changed = new AtomicBoolean(false);
    private final AtomicBoolean fileSystemListingEnabled = new AtomicBoolean(true);
    /** Decides whether an object file in a given status is eligible for cleanup. */
    private final Predicate<FileObjectStatus> cleanablePredicate;
    /** Ordering applied to the files returned for scheduling. */
    private final TaskFileOrder taskFileOrder;

    /**
     * Creates a new monitor, registers a state listener on the given store, starts the
     * store when it is not started yet, reads the state log to its end and finally
     * recovers and cleans up the object files completed during a previous execution.
     *
     * @param allowTasksReconfigurationAfterTimeoutMs delay (ms) before a task reconfiguration is allowed
     *                                                while files are still scheduled.
     * @param fsListening        the component used to list object files; must not be {@code null}.
     * @param cleanPolicy        a {@link FileCleanupPolicy} or a {@link BatchFileCleanupPolicy}; must not be {@code null}.
     * @param cleanablePredicate tells whether a file in a given status can be cleaned up; must not be {@code null}.
     * @param offsetPolicy       the source offset policy; must not be {@code null}.
     * @param store              the state backing store; must not be {@code null}.
     * @param taskFileOrder      the ordering applied to scheduled files; must not be {@code null}.
     * @throws NullPointerException     if one of the mandatory arguments is {@code null}.
     * @throws IllegalArgumentException if {@code cleanPolicy} is of neither supported type.
     */
    public DefaultFileSystemMonitor(Long allowTasksReconfigurationAfterTimeoutMs,
                                    FileSystemListing<?> fsListening,
                                    GenericFileCleanupPolicy cleanPolicy,
                                    final Predicate<FileObjectStatus> cleanablePredicate,
                                    SourceOffsetPolicy offsetPolicy,
                                    StateBackingStore<FileObject> store,
                                    TaskFileOrder taskFileOrder) {
        // The null-checks run in a fixed order, before any other validation.
        this.fsListing = Objects.requireNonNull(fsListening, "'fsListening' should not be null");
        Objects.requireNonNull(cleanPolicy, "'cleanPolicy' should not be null");
        this.offsetPolicy = Objects.requireNonNull(offsetPolicy, "'offsetPolicy' should not be null");
        this.store = Objects.requireNonNull(store, "'store' should not null");
        this.cleanablePredicate = Objects.requireNonNull(cleanablePredicate, "'cleanablePredicate' should not null");
        this.taskFileOrder = Objects.requireNonNull(taskFileOrder, "'taskFileOrder' should not null");
        this.allowTasksReconfigurationAfterTimeoutMs = allowTasksReconfigurationAfterTimeoutMs;

        // A single-file policy is adapted to the batch contract; a batch policy is used as-is.
        if (cleanPolicy instanceof FileCleanupPolicy) {
            this.cleaner = new DelegateBatchFileCleanupPolicy((FileCleanupPolicy) cleanPolicy);
        } else if (cleanPolicy instanceof BatchFileCleanupPolicy) {
            this.cleaner = (BatchFileCleanupPolicy) cleanPolicy;
        } else {
            throw new IllegalArgumentException("Cleaner must be one of 'FileCleanupPolicy', 'BatchFileCleanupPolicy' not " + cleanPolicy.getClass().getName());
        }
        this.cleaner.setStorage(fsListening.storage());

        LOG.info("Initializing FileSystemMonitor");
        this.store.setUpdateListener(new StateBackingStore.UpdateListener<FileObject>() {

            @Override
            public void onStateRemove(String key) {
                // Removals are ignored.
            }

            @Override
            public void onStateUpdate(String key, FileObject object) {
                final FileObjectKey objectKey = FileObjectKey.of(key);
                final FileObjectStatus newStatus = object.status();
                LOG.debug("Received status '{} 'for: {}", newStatus, object);

                if (cleanablePredicate.test(newStatus)) {
                    cleanable.add(object.withKey(objectKey));
                    // Flag the change when the file was part of the last scan.
                    final boolean wasScanned = scanned.remove(objectKey) != null;
                    if (wasScanned) {
                        changed.set(true);
                    }
                    return;
                }

                if (!newStatus.isOneOf(new FileObjectStatus[]{FileObjectStatus.CLEANED, FileObjectStatus.INVALID})) {
                    return;
                }
                // The file is done (cleaned or invalid): it is no longer scheduled.
                final boolean wasScheduled = scheduled.remove(objectKey) != null;
                if (!wasScheduled && newStatus.isOneOf(new FileObjectStatus[]{FileObjectStatus.CLEANED})) {
                    LOG.debug("Received cleaned status but no object-file currently scheduled for: '{}'. This warn should only occurred during recovering step", key);
                }
            }
        });

        if (this.store.isStarted()) {
            LOG.warn("The StateBackingStore used to synchronize this connector with tasks processing files is already started. You can ignore that warning if the connector  is recovering from a crash or resuming after being paused.");
        } else {
            this.store.start();
        }

        this.readStatesToEnd(ON_START_READ_END_LOG_TIMEOUT);
        this.recoverPreviouslyCompletedSources();
        this.cleanUpCompletedFiles();
        LOG.info("Initialized FileSystemMonitor");
    }

    /**
     * Queues for cleanup every object file of the current state snapshot whose status
     * satisfies the cleanable predicate.
     *
     * <p>The snapshot is only assigned when a read-to-end of the state log succeeds. When no
     * snapshot is available (e.g. the initial read timed out) the recovery is skipped instead
     * of failing with a {@link NullPointerException}.
     */
    private void recoverPreviouslyCompletedSources() {
        LOG.info("Recovering completed files from a previous execution");
        final StateSnapshot<FileObject> snapshot = this.fileState;
        if (snapshot == null) {
            LOG.warn("No states snapshot available. Skip recovering of previously completed files");
            return;
        }
        snapshot.states()
                .entrySet()
                .stream()
                // Re-attach the key the state was stored under to each file object.
                .map(entry -> entry.getValue().withKey(FileObjectKey.of(entry.getKey())))
                .filter(fileObject -> this.cleanablePredicate.test(fileObject.status()))
                .forEach(this.cleanable::add);
        LOG.info("Finished recovering previously completed files : {}", this.cleanable);
    }

    /**
     * Refreshes the backing store within the given timeout and, on success, replaces the
     * current state snapshot with a fresh one.
     *
     * @param timeout the maximum duration to wait for the refresh.
     * @return {@code true} if the refresh completed and the snapshot was updated;
     *         {@code false} if the refresh timed out (the previous snapshot is kept).
     */
    private boolean readStatesToEnd(Duration timeout) {
        boolean reachedEnd;
        try {
            final long timeoutMs = timeout.toMillis();
            this.store.refresh(timeoutMs, TimeUnit.MILLISECONDS);
            this.fileState = this.store.snapshot();
            LOG.debug("Finished reading to end of log and updated states snapshot, new states log position: {}", this.fileState.offset());
            reachedEnd = true;
        } catch (TimeoutException timeoutError) {
            LOG.warn("Failed to reach end of states log quickly enough", timeoutError);
            reachedEnd = false;
        }
        return reachedEnd;
    }

    /**
     * Cleans up the completed object files, then lists the file system and requests a task
     * reconfiguration when new files can be scheduled.
     *
     * <p>The listing is skipped when the monitor is not running, when the listing is disabled,
     * or when a task reconfiguration has already been requested.
     *
     * @param context the connector context used to request the task reconfiguration.
     */
    public void invoke(ConnectorContext context) {
        this.cleanUpCompletedFiles();

        final boolean listingPossible = this.running.get() && this.fileSystemListingEnabled.get();
        if (!listingPossible) {
            // Only report the "not started" situation when the listing is enabled.
            if (this.fileSystemListingEnabled.get()) {
                LOG.info("The connector is not completely started or is being shut down. Skip filesystem listing.");
            }
            return;
        }

        if (this.taskReconfigurationRequested.get()) {
            LOG.info("Task reconfiguration requested. Skip filesystem listing.");
            return;
        }

        final boolean newFilesAvailable = this.updateFiles();
        if (newFilesAvailable) {
            LOG.info("Requesting task reconfiguration");
            this.taskReconfigurationRequested.set(true);
            context.requestTaskReconfiguration();
        }
    }

    /**
     * Enables or disables the file system listing performed by {@code invoke}.
     *
     * @param enabled {@code true} to allow the listing, {@code false} to skip it.
     */
    public void setFileSystemListingEnabled(final boolean enabled) {
        fileSystemListingEnabled.set(enabled);
    }

    /**
     * Drains the queue of cleanable object files and applies the cleanup policy to them.
     *
     * <p>For each file successfully cleaned, a {@code CLEANED} status is written asynchronously
     * to the state store under the file's original key. Every other file is put back into the
     * queue so that its cleanup is attempted again on a later invocation.
     */
    private void cleanUpCompletedFiles() {
        if (this.cleanable.isEmpty()) {
            LOG.debug("Skipped cleanup. No object file completed.");
            return;
        }
        LOG.info("Cleaning up completed object files '{}'", this.cleanable.size());

        // Work on a private batch so that files completed meanwhile stay queued for the next run.
        final List<FileObject> batch = new ArrayList<>(this.cleanable.size());
        this.cleanable.drainTo(batch);

        final FileCleanupPolicyResultSet cleaned = this.cleaner.apply(batch);
        cleaned.forEach((fileObject, result) -> {
            if (result.equals(FileCleanupPolicyResult.SUCCEED)) {
                final String key = fileObject.key().get().original();
                this.store.putAsync(key, fileObject.withStatus(FileObjectStatus.CLEANED));
            } else {
                LOG.warn("Postpone clean up for object file: '{}'", fileObject.metadata().stringURI());
                this.cleanable.add(fileObject);
            }
        });
        LOG.info("Finished cleaning all completed object files");
    }

    /**
     * Lists the object files and records those that can be scheduled for processing.
     *
     * <p>The listing is skipped when:
     * <ul>
     *   <li>files are still scheduled and the reconfiguration timeout equals {@link Long#MAX_VALUE};</li>
     *   <li>no file is scheduled and the state log could not be read to its end in time.</li>
     * </ul>
     * While files are still scheduled, newly detected files are only accepted once the
     * reconfiguration delay, armed on first detection, has elapsed.
     *
     * <p>Threads waiting on this monitor are notified once the scan result is published.
     *
     * @return {@code true} if at least one file is waiting to be scheduled and the monitor is running.
     */
    private synchronized boolean updateFiles() {
        final boolean noScheduledFiles = this.scheduled.isEmpty();
        if (!noScheduledFiles && this.allowTasksReconfigurationAfterTimeoutMs == Long.MAX_VALUE) {
            LOG.info("Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion", this.scheduled.size());
            return false;
        }

        final boolean toEnd = this.readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
        if (noScheduledFiles && !toEnd) {
            LOG.warn("Failed to read state changelog. Skip filesystem listing due to timeout");
            return false;
        }

        LOG.info("Starting to list object files using: {}", this.fsListing.getClass().getSimpleName());
        final long started = Time.SYSTEM.milliseconds();
        final Collection<FileObjectMeta> objects = this.fsListing.listObjects();
        final long took = Time.SYSTEM.milliseconds() - started;
        LOG.info("Completed object files listing. '{}' object files found in {}ms", objects.size(), took);

        final StateSnapshot<FileObject> snapshot = this.store.snapshot();
        final Map<FileObjectKey, FileObjectMeta> toScheduled = FileObjectCandidatesFilter.filter(this.offsetPolicy, fileObjectKey -> {
            final FileObject fileObject = snapshot.getForKey(fileObjectKey.original());
            if (fileObject == null) {
                // Never seen before: always a candidate.
                return true;
            }
            final FileObjectStatus status = fileObject.status();
            return !this.cleanablePredicate.test(status) && !status.isDone();
        }, objects);

        if (!noScheduledFiles) {
            if (this.scheduled.keySet().containsAll(toScheduled.keySet())) {
                LOG.info("Scheduled files still being processed ({}) and no new files found. Skip task reconfiguration", this.scheduled.size());
                return false;
            }
            if (this.nextAllowedTasksReconfiguration == -1L) {
                // Arm the deadline; saturate instead of overflowing so that a very large
                // delay cannot wrap to a negative deadline and allow an immediate reconfiguration.
                final long now = Time.SYSTEM.milliseconds();
                final long delayMs = this.allowTasksReconfigurationAfterTimeoutMs;
                this.nextAllowedTasksReconfiguration = delayMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + delayMs;
            }
            final long timeout = Math.max(0L, this.nextAllowedTasksReconfiguration - Time.SYSTEM.milliseconds());
            if (timeout > 0L) {
                LOG.info("Scheduled files still being processed ({}) but new files detected. Waiting for {} ms before allowing task reconfiguration", this.scheduled.size(), timeout);
                return false;
            }
        }

        this.nextAllowedTasksReconfiguration = -1L;
        this.scanned.putAll(toScheduled);
        // Wake up any thread waiting for the scan result.
        this.notifyAll();
        LOG.info("Finished lookup for new object files: '{}' files can be scheduled for processing", this.scanned.size());
        return !this.scanned.isEmpty() && this.running.get();
    }

    /**
     * Returns the object files to be scheduled for processing.
     *
     * <p>The first invocation only marks the monitor as running and returns an empty list.
     * Later invocations wait, up to {@link #TASK_CONFIGURATION_DEFAULT_TIMEOUT} ms, for a scan
     * to provide files, then move the scanned files into the scheduled set, bounded by
     * {@code maxFilesToSchedule} when more files were scanned than allowed. The preparation is
     * retried, up to {@link #MAX_SCHEDULE_ATTEMPTS} times, whenever a state update removed a
     * scanned file meanwhile; when the last attempt is still disturbed, the scheduled set is
     * cleared and an empty list is returned.
     *
     * <p>On exit of a non-initial invocation the scanned files are cleared and the pending task
     * reconfiguration request is reset.
     *
     * @param maxFilesToSchedule the maximum number of files to schedule.
     * @return the scheduled files, sorted with the configured task file order; possibly empty.
     */
    public List<FileObjectMeta> listFilesToSchedule(int maxFilesToSchedule) {
        if (this.running.compareAndSet(false, true)) {
            LOG.info("Started FileSystemMonitor");
            return Collections.emptyList();
        }
        try {
            this.awaitScannedFiles();

            List<FileObjectMeta> partitions = new LinkedList<>();
            if (!this.scanned.isEmpty()) {
                int attempts = 0;
                do {
                    this.changed.set(false);
                    LOG.info("Preparing next scheduling using the object files found during last iteration (attempt={}/{}).", attempts + 1, MAX_SCHEDULE_ATTEMPTS);
                    // Try to be up-to-date with the state log before scheduling.
                    final boolean toEnd = this.readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
                    if (!toEnd) {
                        LOG.warn("Failed to read state changelog while scheduling object files. Timeout.");
                    }
                    if (this.scanned.size() <= maxFilesToSchedule) {
                        this.scheduled.putAll(this.scanned);
                    } else {
                        final Iterator<Map.Entry<FileObjectKey, FileObjectMeta>> it = this.scanned.entrySet().iterator();
                        while (this.scheduled.size() < maxFilesToSchedule && it.hasNext()) {
                            final Map.Entry<FileObjectKey, FileObjectMeta> next = it.next();
                            this.scheduled.put(next.getKey(), next.getValue());
                        }
                    }
                    partitions = new ArrayList<>(this.scheduled.values());
                    ++attempts;
                    if (this.changed.get()) {
                        if (attempts == MAX_SCHEDULE_ATTEMPTS) {
                            LOG.warn("Failed to prepare the object files after attempts: {}.", MAX_SCHEDULE_ATTEMPTS);
                            // Nothing is handed out: make sure no file stays marked as scheduled.
                            this.scheduled.clear();
                            return Collections.emptyList();
                        }
                        LOG.warn("State updates was received while preparing the object files to be scheduled");
                    }
                } while (this.changed.get() && attempts < MAX_SCHEDULE_ATTEMPTS);
            }
            if (partitions.isEmpty()) {
                LOG.warn("Filesystem could not be scanned quickly enough, or no object file was detected after starting the connector.");
            }
            return this.taskFileOrder.sort(partitions);
        } finally {
            this.scanned.clear();
            this.taskReconfigurationRequested.set(false);
        }
    }

    /**
     * Waits until a scan has provided files or {@link #TASK_CONFIGURATION_DEFAULT_TIMEOUT} ms
     * have elapsed.
     *
     * <p>The emptiness check and the wait both happen while holding this monitor, the same one
     * held by the scan when it publishes files and notifies, so a notification cannot be lost
     * between the check and the wait. If the thread is interrupted, the interrupt flag is
     * restored and the wait is abandoned.
     */
    private void awaitScannedFiles() {
        final long started = Time.SYSTEM.milliseconds();
        synchronized (this) {
            long remaining = TASK_CONFIGURATION_DEFAULT_TIMEOUT;
            // 'remaining' is strictly positive here: wait(0) would block without any time limit.
            while (this.scanned.isEmpty() && remaining > 0L) {
                LOG.info("No file to be scheduled, waiting for next filesystem scan execution");
                try {
                    this.wait(remaining);
                } catch (InterruptedException e) {
                    // Preserve the interruption for the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while waiting for next filesystem scan execution");
                    return;
                }
                remaining = TASK_CONFIGURATION_DEFAULT_TIMEOUT - (Time.SYSTEM.milliseconds() - started);
            }
        }
    }

    /**
     * Stops the monitor if it is running: reads the state log to its end one last time and
     * cleans up the completed object files. Any exception raised while doing so is logged
     * and not propagated. Calling this method on a monitor that is not running does nothing.
     */
    public void close() {
        final boolean wasRunning = this.running.compareAndSet(true, false);
        if (!wasRunning) {
            return;
        }
        try {
            LOG.info("Closing FileSystemMonitor resources");
            this.readStatesToEnd(DEFAULT_READ_END_LOG_TIMEOUT);
            this.cleanUpCompletedFiles();
            LOG.info("Closed FileSystemMonitor resources");
        } catch (Exception closeError) {
            LOG.warn("Unexpected error while closing FileSystemMonitor.", closeError);
        }
    }
}

