/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
@Internal
public class ContinuousFileMonitoringFunction<OUT>
extends RichSourceFunction<TimestampedFileInputSplit>
implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
    public static final long MIN_MONITORING_INTERVAL = 1L;
    private final String path;
    private final int readerParallelism;
    private final FileInputFormat<OUT> format;
    private final long interval;
    private final FileProcessingMode watchType;
    private volatile long globalModificationTime;
    private transient Object checkpointLock;
    private volatile boolean isRunning = true;
    private transient ListState<Long> checkpointedState;

    public ContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode watchType, int readerParallelism, long interval) {
        Preconditions.checkArgument((watchType == FileProcessingMode.PROCESS_ONCE || interval >= 1L ? 1 : 0) != 0, (Object)("The specified monitoring interval (" + interval + " ms) is smaller than the minimum allowed one (" + 1L + " ms)."));
        Preconditions.checkArgument((format.getFilePaths().length == 1 ? 1 : 0) != 0, (Object)"FileInputFormats with multiple paths are not supported yet.");
        this.format = (FileInputFormat)Preconditions.checkNotNull(format, (String)"Unspecified File Input Format.");
        this.path = (String)Preconditions.checkNotNull((Object)format.getFilePaths()[0].toString(), (String)"Unspecified Path.");
        this.interval = interval;
        this.watchType = watchType;
        this.readerParallelism = Math.max(readerParallelism, 1);
        this.globalModificationTime = Long.MIN_VALUE;
    }

    @VisibleForTesting
    public long getGlobalModificationTime() {
        return this.globalModificationTime;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState == null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " has already been initialized."));
        this.checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor("file-monitoring-state", (TypeSerializer)LongSerializer.INSTANCE));
        if (context.isRestored()) {
            LOG.info("Restoring state for the {}.", (Object)this.getClass().getSimpleName());
            ArrayList<Long> retrievedStates = new ArrayList<Long>();
            for (Long entry : (Iterable)this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }
            Preconditions.checkArgument((retrievedStates.size() <= 1 ? 1 : 0) != 0, (Object)(this.getClass().getSimpleName() + " retrieved invalid state."));
            if (retrievedStates.size() == 1 && this.globalModificationTime != Long.MIN_VALUE) {
                throw new IllegalArgumentException("The " + this.getClass().getSimpleName() + " has already restored from a previous Flink version.");
            }
            if (retrievedStates.size() == 1) {
                this.globalModificationTime = (Long)retrievedStates.get(0);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} retrieved a global mod time of {}.", (Object)this.getClass().getSimpleName(), (Object)this.globalModificationTime);
                }
            }
        } else {
            LOG.info("No state to restore for the {}.", (Object)this.getClass().getSimpleName());
        }
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.format.configure(new Configuration());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opened {} (taskIdx= {}) for path: {}", new Object[]{this.getClass().getSimpleName(), this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.path});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
        Path p = new Path(this.path);
        FileSystem fileSystem = FileSystem.get((URI)p.toUri());
        if (!fileSystem.exists(p)) {
            throw new FileNotFoundException("The provided file path " + this.path + " does not exist.");
        }
        this.checkpointLock = context.getCheckpointLock();
        switch (this.watchType) {
            case PROCESS_CONTINUOUSLY: {
                while (this.isRunning) {
                    Object object = this.checkpointLock;
                    synchronized (object) {
                        this.monitorDirAndForwardSplits(fileSystem, context);
                    }
                    Thread.sleep(this.interval);
                }
                break;
            }
            case PROCESS_ONCE: {
                Object object = this.checkpointLock;
                synchronized (object) {
                    if (this.globalModificationTime == Long.MIN_VALUE) {
                        this.monitorDirAndForwardSplits(fileSystem, context);
                        this.globalModificationTime = Long.MAX_VALUE;
                    }
                    this.isRunning = false;
                    break;
                }
            }
            default: {
                this.isRunning = false;
                throw new RuntimeException("Unknown WatchType" + (Object)((Object)this.watchType));
            }
        }
    }

    private void monitorDirAndForwardSplits(FileSystem fs, SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws IOException {
        assert (Thread.holdsLock(this.checkpointLock));
        Map<Path, FileStatus> eligibleFiles = this.listEligibleFiles(fs, new Path(this.path));
        Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = this.getInputSplitsSortedByModTime(eligibleFiles);
        for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits : splitsSortedByModTime.entrySet()) {
            long modificationTime = splits.getKey();
            for (TimestampedFileInputSplit split : splits.getValue()) {
                LOG.info("Forwarding split: " + split);
                context.collect(split);
            }
            this.globalModificationTime = Math.max(this.globalModificationTime, modificationTime);
        }
    }

    private Map<Long, List<TimestampedFileInputSplit>> getInputSplitsSortedByModTime(Map<Path, FileStatus> eligibleFiles) throws IOException {
        TreeMap<Long, List<TimestampedFileInputSplit>> splitsByModTime = new TreeMap<Long, List<TimestampedFileInputSplit>>();
        if (eligibleFiles.isEmpty()) {
            return splitsByModTime;
        }
        for (FileInputSplit split : this.format.createInputSplits(this.readerParallelism)) {
            FileStatus fileStatus = eligibleFiles.get(split.getPath());
            if (fileStatus == null) continue;
            Long modTime = fileStatus.getModificationTime();
            ArrayList<TimestampedFileInputSplit> splitsToForward = (ArrayList<TimestampedFileInputSplit>)splitsByModTime.get(modTime);
            if (splitsToForward == null) {
                splitsToForward = new ArrayList<TimestampedFileInputSplit>();
                splitsByModTime.put(modTime, splitsToForward);
            }
            splitsToForward.add(new TimestampedFileInputSplit(modTime, split.getSplitNumber(), split.getPath(), split.getStart(), split.getLength(), split.getHostnames()));
        }
        return splitsByModTime;
    }

    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) {
        FileStatus[] statuses;
        try {
            statuses = fileSystem.listStatus(path);
        }
        catch (IOException e) {
            return Collections.emptyMap();
        }
        if (statuses == null) {
            LOG.warn("Path does not exist: {}", (Object)path);
            return Collections.emptyMap();
        }
        HashMap<Path, FileStatus> files = new HashMap<Path, FileStatus>();
        for (FileStatus status : statuses) {
            if (!status.isDir()) {
                long modificationTime;
                Path filePath = status.getPath();
                if (this.shouldIgnore(filePath, modificationTime = status.getModificationTime())) continue;
                files.put(filePath, status);
                continue;
            }
            if (!this.format.getNestedFileEnumeration() || !this.format.acceptFile(status)) continue;
            files.putAll(this.listEligibleFiles(fileSystem, status.getPath()));
        }
        return files;
    }

    private boolean shouldIgnore(Path filePath, long modificationTime) {
        boolean shouldIgnore;
        assert (Thread.holdsLock(this.checkpointLock));
        boolean bl = shouldIgnore = modificationTime <= this.globalModificationTime;
        if (shouldIgnore && LOG.isDebugEnabled()) {
            LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + this.globalModificationTime);
        }
        return shouldIgnore;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws Exception {
        super.close();
        if (this.checkpointLock != null) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.globalModificationTime = Long.MAX_VALUE;
                this.isRunning = false;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closed File Monitoring Source for path: " + this.path + ".");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancel() {
        if (this.checkpointLock != null) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.globalModificationTime = Long.MAX_VALUE;
                this.isRunning = false;
            }
        } else {
            this.globalModificationTime = Long.MAX_VALUE;
            this.isRunning = false;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState != null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " state has not been properly initialized."));
        this.checkpointedState.update(Collections.singletonList(this.globalModificationTime));
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpointed {}.", (Object)this.getClass().getSimpleName(), (Object)this.globalModificationTime);
        }
    }
}

