/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.cache.fs;

import alluxio.client.file.CacheContext;
import alluxio.client.file.FileInStream;
import alluxio.client.file.URIStatus;
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.LocalCacheFileInStream;
import alluxio.conf.AlluxioConfiguration;
import alluxio.hadoop.AlluxioHdfsInputStream;
import alluxio.hadoop.HadoopFileOpener;
import alluxio.hadoop.HadoopUtils;
import alluxio.metrics.MetricsConfig;
import alluxio.metrics.MetricsSystem;
import alluxio.wire.FileInfo;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.cache.fs.AlluxioHdfsFileInputStream;
import org.apache.kylin.cache.fs.CacheFileInputStream;
import org.apache.kylin.cache.fs.ManagerOfCacheFileContent;
import org.apache.kylin.cache.fs.ManagerOfCacheFileStatus;
import org.apache.kylin.cache.utils.Hadoop3CompaUtil;
import org.apache.kylin.cache.utils.ReflectionUtil;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.fileseg.FileSegments;
import org.apache.kylin.fileseg.FileSegmentsDetector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.hash.Hashing;

public abstract class AbstractCacheFileSystem
extends FilterFileSystem {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractCacheFileSystem.class);
    protected URI uri;
    protected String originalScheme;
    protected int bufferSize = 4096;
    protected boolean useLocalCache = false;
    protected boolean useFileStatusCache = false;
    protected boolean useLegacyFileInputStream = false;
    protected boolean useBufferFileInputStream = false;
    protected HadoopFileOpener mHadoopFileOpener;
    protected LocalCacheFileInStream.FileInStreamOpener mAlluxioFileOpener;
    protected AlluxioConfiguration mAlluxioConf;
    protected ManagerOfCacheFileContent mCacheManager;
    protected ManagerOfCacheFileStatus mStatusCache;
    protected Map<Path, Long> lastFetchTimeOfPaths = new ConcurrentHashMap<Path, Long>();
    protected static final Map<String, String> schemeClassMap = Stream.of(new AbstractMap.SimpleImmutableEntry<String, String>("file", "org.apache.hadoop.fs.LocalFileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("viewfs", "org.apache.hadoop.fs.viewfs.ViewFileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("s3a", "org.apache.hadoop.fs.s3a.S3AFileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("s3", "org.apache.hadoop.fs.s3a.S3AFileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("s3n", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("hdfs", "org.apache.hadoop.hdfs.DistributedFileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("wasb", "org.apache.hadoop.fs.azure.NativeAzureFileSystem"), new AbstractMap.SimpleImmutableEntry<String, String>("wasbs", "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure"), new AbstractMap.SimpleImmutableEntry<String, String>("alluxio", "alluxio.hadoop.FileSystem")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    protected static FileSystem createInternalFS(URI uri, Configuration conf) throws IOException {
        FileSystem fs;
        if (!schemeClassMap.containsKey(uri.getScheme())) {
            throw new IOException("No FileSystem for scheme: " + uri.getScheme());
        }
        try {
            Class clazz = conf.getClassByName(schemeClassMap.get(uri.getScheme()));
            fs = (FileSystem)ReflectionUtils.newInstance((Class)clazz, (Configuration)conf);
            fs.initialize(uri, conf);
            log.info("Create filesystem {} for scheme {}", (Object)schemeClassMap.get(uri.getScheme()), (Object)uri.getScheme());
        }
        catch (ClassNotFoundException e) {
            throw new IOException("Can not found FileSystem Clazz for scheme: " + uri.getScheme());
        }
        return fs;
    }

    protected void createLocalCacheManager(Configuration conf) throws IOException {
        this.mHadoopFileOpener = uriStatus -> this.fs.open(new Path(uriStatus.getPath()));
        this.mAlluxioFileOpener = status -> new AlluxioHdfsInputStream(this.mHadoopFileOpener.open(status));
        this.mAlluxioConf = HadoopUtils.toAlluxioConf((Configuration)conf);
        Properties metricsProperties = new Properties();
        for (Map.Entry entry : conf) {
            metricsProperties.setProperty((String)entry.getKey(), (String)entry.getValue());
        }
        MetricsSystem.startSinksFromConfig((MetricsConfig)new MetricsConfig(metricsProperties));
        this.mCacheManager = new ManagerOfCacheFileContent(CacheManager.Factory.get((AlluxioConfiguration)this.mAlluxioConf));
    }

    public synchronized void initialize(URI name, Configuration conf) throws IOException {
        this.originalScheme = name.getScheme();
        this.fs = AbstractCacheFileSystem.createInternalFS(name, conf);
        this.statistics = (FileSystem.Statistics)ReflectionUtil.getFieldValue(this.fs, "statistics");
        if (null == this.statistics) {
            log.info("======= original statistics is null.");
        } else {
            log.info("======= original statistics is {} {}.", (Object)this.statistics.getScheme(), (Object)this.statistics);
        }
        super.initialize(name, conf);
        this.setConf(conf);
        log.info("======= current statistics is {} {}.", (Object)this.statistics.getScheme(), (Object)this.statistics);
        this.bufferSize = conf.getInt("io.file.buffer.size", 65536);
        this.useLocalCache = conf.getBoolean("spark.kylin.local-cache.enabled", false);
        this.useFileStatusCache = conf.getBoolean("spark.kylin.file-status-cache.enabled", false);
        this.useLegacyFileInputStream = conf.getBoolean("spark.kylin.local-cache.use.legacy.file-input-stream", false);
        this.useBufferFileInputStream = conf.getBoolean("spark.kylin.local-cache.use.buffer.file-input-stream", false);
        if (this.useFileStatusCache) {
            long fileStatusTTL = conf.getLong("spark.kylin.local-cache.filestatus-cache-ttl", 36000L);
            long fileStatusMaxSize = conf.getLong("spark.kylin.local-cache.filestatus-cache-max-size", 10000L);
            this.mStatusCache = new ManagerOfCacheFileStatus(fileStatusTTL, fileStatusMaxSize, p -> {
                try {
                    return this.fs.getFileStatus(p);
                }
                catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            }, p -> {
                try {
                    List<LocatedFileStatus> ret = Hadoop3CompaUtil.RemoteIterators.toList(this.fs.listLocatedStatus(p));
                    return ret.toArray(new LocatedFileStatus[0]);
                }
                catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });
        }
        if (this.useLocalCache) {
            this.createLocalCacheManager(conf);
            log.info("Create LocalCacheFileSystem successfully .");
        }
    }

    public String getScheme() {
        return this.originalScheme;
    }

    protected abstract boolean isUseLocalCacheForCurrentExecutor();

    protected abstract long getAcceptCacheTime();

    private FileInfo wrapFileInfo(FileStatus fileStatus) {
        return new FileInfo().setLength(fileStatus.getLen()).setPath(fileStatus.getPath().toString()).setFolder(fileStatus.isDirectory()).setBlockSizeBytes(fileStatus.getBlockSize()).setLastModificationTimeMs(fileStatus.getModificationTime()).setLastAccessTimeMs(fileStatus.getAccessTime()).setOwner(fileStatus.getOwner()).setGroup(fileStatus.getGroup());
    }

    private int checkBufferSize(int size) {
        if (size < this.bufferSize) {
            size = this.bufferSize;
        }
        return size;
    }

    public FSDataInputStream open(Path f) throws IOException {
        return this.open(f, this.bufferSize);
    }

    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
        return this.open(p, bufferSize, this.isUseLocalCacheForCurrentExecutor());
    }

    public FSDataInputStream open(Path p, int bufferSize, boolean useLocalCacheForExec) throws IOException {
        boolean enabled;
        boolean bl = enabled = this.useLocalCache && this.mCacheManager != null && useLocalCacheForExec;
        if (!enabled) {
            log.debug("Use original FileSystem to open file {} .", (Object)p);
            return this.fs.open(p, bufferSize);
        }
        Path f = this.fs.makeQualified(p);
        FileStatus fileStatus = this.getFileStatus(f);
        FileInfo fileInfo = this.wrapFileInfo(fileStatus);
        String fileHashFrom = String.valueOf(fileStatus.getPath()) + fileStatus.getLen() + fileStatus.getModificationTime();
        String fileId = Hashing.sha256().hashString((CharSequence)fileHashFrom, StandardCharsets.UTF_8).toString();
        CacheContext context = CacheContext.defaults().setCacheIdentifier(fileId);
        URIStatus status = new URIStatus(fileInfo, context);
        int cachedCount = this.mCacheManager.countCachedPages(fileId);
        if (this.useLegacyFileInputStream) {
            log.debug("Local cache (Legacy) opens file ({} pages cached) {} .", (Object)cachedCount, (Object)f);
            return new FSDataInputStream((InputStream)new AlluxioHdfsFileInputStream((FileInStream)new LocalCacheFileInStream(status, this.mAlluxioFileOpener, (CacheManager)this.mCacheManager, this.mAlluxioConf), this.statistics));
        }
        if (this.useBufferFileInputStream) {
            log.debug("Local cache (Direct-Buffer) opens file ({} pages cached) {} .", (Object)cachedCount, (Object)f);
            return new FSDataInputStream((InputStream)((Object)new CacheFileInputStream(f, (FileInStream)new LocalCacheFileInStream(status, this.mAlluxioFileOpener, (CacheManager)this.mCacheManager, this.mAlluxioConf), null, this.statistics, this.checkBufferSize(bufferSize))));
        }
        throw new IllegalStateException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FileStatus getFileStatus(Path p) throws IOException {
        if (!this.useFileStatusCache) {
            FileStatus fileStatus;
            StopWatch w = StopWatch.createStarted();
            try {
                fileStatus = this.fs.getFileStatus(p);
            }
            catch (Throwable throwable) {
                log.warn("Slow fs operation took {} ms: {}({})", new Object[]{w.getTime(), "getFileStatus", p});
                throw throwable;
            }
            log.warn("Slow fs operation took {} ms: {}({})", new Object[]{w.getTime(), "getFileStatus", p});
            return fileStatus;
        }
        p = this.fs.makeQualified(p);
        this.statistics.incrementReadOps(1);
        this.mStatusCache.ensureCacheCreateTime(p, this.getAcceptCacheTime());
        return this.mStatusCache.getFileStatus(p);
    }

    public FileStatus[] listStatus(Path p) throws IOException {
        return this._listLocatedStatus(p);
    }

    public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path p) throws IOException {
        return Hadoop3CompaUtil.RemoteIterators.remoteIteratorFromIterator(this.filteredStatusStream(p).iterator());
    }

    public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException {
        return Hadoop3CompaUtil.RemoteIterators.remoteIteratorFromIterator(this.filteredStatusStream(p).map(FileStatus.class::cast).iterator());
    }

    private Stream<LocatedFileStatus> filteredStatusStream(Path p) throws IOException {
        LocatedFileStatus[] ret = this._listLocatedStatus(p);
        Optional<FileSegments.SourceFilePredicate> filter = FileSegments.getFileSegFilterLocally(p.toString());
        if (filter.isPresent()) {
            return Arrays.stream(ret).filter(f -> ((FileSegments.SourceFilePredicate)filter.get()).checkFile((LocatedFileStatus)f));
        }
        return Arrays.stream(ret);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private LocatedFileStatus[] _listLocatedStatus(Path p) throws IOException {
        LocatedFileStatus[] result;
        if (this.useFileStatusCache) {
            p = this.fs.makeQualified(p);
            this.statistics.incrementReadOps(1);
            long requiredCacheTime = this.getAcceptCacheTime();
            this.updateDataFetchTimeInContext(p, requiredCacheTime);
            this.mStatusCache.ensureCacheCreateTime(p, requiredCacheTime);
            result = this.mStatusCache.listChildren(p);
        } else {
            StopWatch w = StopWatch.createStarted();
            try {
                List<LocatedFileStatus> list = Hadoop3CompaUtil.RemoteIterators.toList(this.fs.listLocatedStatus(p));
                result = list.toArray(new LocatedFileStatus[0]);
            }
            catch (Throwable throwable) {
                log.warn("Slow fs operation took {} ms: {}({})", new Object[]{w.getTime(), "listLocatedStatus", p});
                throw throwable;
            }
            log.warn("Slow fs operation took {} ms: {}({})", new Object[]{w.getTime(), "listLocatedStatus", p});
        }
        FileSegmentsDetector.onObserveFileSystemListFileStatus(p, result);
        return result;
    }

    private void updateDataFetchTimeInContext(Path p, long requiredCacheTime) {
        QueryContext context = QueryContext.current();
        if (context == null) {
            return;
        }
        long lastFetchTime = this.lastFetchTimeOfPaths.getOrDefault(p, 0L);
        if ((lastFetchTime = Long.max(lastFetchTime, requiredCacheTime)) == 0L) {
            lastFetchTime = System.currentTimeMillis();
        }
        context.getMetrics().addDataFetchTime(lastFetchTime);
        this.lastFetchTimeOfPaths.put(p, lastFetchTime);
    }

    public long countCachedFilePages() {
        return this.mCacheManager.countTotalCachedPages();
    }

    public long countCachedFileStatus() {
        return this.mStatusCache.size();
    }

    public long countCachedFileStatusEvictions() {
        return this.mStatusCache.countEvictions();
    }
}

