/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.plugins.blob;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.cache.Weigher;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheStatsMBean;
import org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils;
import org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUtils;
import org.apache.jackrabbit.oak.plugins.blob.FileCache;
import org.apache.jackrabbit.oak.plugins.blob.StagingCacheStats;
import org.apache.jackrabbit.oak.plugins.blob.StagingUploader;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UploadStagingCache
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
    protected static final String UPLOAD_STAGING_DIR = "upload";
    private final Weigher<String, File> memWeigher = new Weigher<String, File>(){

        public int weigh(String key, File value) {
            return StringUtils.estimateMemoryUsage((String)key) + StringUtils.estimateMemoryUsage((String)value.getAbsolutePath()) + 48;
        }
    };
    private long size;
    private AtomicLong currentSize;
    private ListeningExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;
    private ConcurrentMap<String, File> map;
    private ConcurrentMap<String, File> attic;
    private File uploadCacheSpace;
    private StagingUploader uploader;
    private StagingCacheStats cacheStats;
    @Nullable
    private FileCache downloadCache;
    private ScheduledExecutorService statsExecutor;
    private LinkedBlockingQueue<String> retryQueue;

    private UploadStagingCache(File dir, File home, int uploadThreads, long size, StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor, @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval, int retryInterval) {
        this.currentSize = new AtomicLong();
        this.size = size;
        this.executor = executor;
        if (executor == null) {
            this.executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(uploadThreads, (ThreadFactory)new NamedThreadFactory("oak-ds-async-upload-thread")));
        }
        this.scheduledExecutor = scheduledExecutor;
        if (scheduledExecutor == null) {
            this.scheduledExecutor = Executors.newScheduledThreadPool(2, (ThreadFactory)new NamedThreadFactory("oak-ds-cache-scheduled-thread"));
        }
        this.map = Maps.newConcurrentMap();
        this.attic = Maps.newConcurrentMap();
        this.retryQueue = new LinkedBlockingQueue();
        this.uploadCacheSpace = new File(dir, UPLOAD_STAGING_DIR);
        this.uploader = uploader;
        if (statisticsProvider == null) {
            this.statsExecutor = Executors.newSingleThreadScheduledExecutor();
            statisticsProvider = new DefaultStatisticsProvider(this.statsExecutor);
        }
        this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
        this.downloadCache = cache;
        this.build(home, dir);
        this.scheduledExecutor.scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
        this.scheduledExecutor.scheduleAtFixedRate(new RetryJob(), retryInterval, retryInterval, TimeUnit.SECONDS);
    }

    private UploadStagingCache() {
    }

    public static UploadStagingCache build(File dir, File home, int uploadThreads, long size, StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor, @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval, int retryInterval) {
        if (size > 0L) {
            return new UploadStagingCache(dir, home, uploadThreads, size, uploader, cache, statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
        }
        return new UploadStagingCache(){

            @Override
            public Optional<SettableFuture<Integer>> put(String id, File input) {
                return Optional.absent();
            }

            @Override
            protected void invalidate(String key) {
            }

            @Override
            protected Iterator<String> getAllIdentifiers() {
                return Iterators.emptyIterator();
            }

            @Override
            @Nullable
            public File getIfPresent(String key) {
                return null;
            }

            @Override
            public DataStoreCacheStatsMBean getStats() {
                return new StagingCacheStats(this, StatisticsProvider.NOOP, 0L);
            }

            @Override
            public void close() {
            }
        };
    }

    private void build(File home, File rootPath) {
        LOG.info("Scheduling pending uploads");
        DataStoreCacheUpgradeUtils.movePendingUploadsToStaging(home, rootPath, true);
        Iterator iter = Files.fileTreeTraverser().postOrderTraversal((Object)this.uploadCacheSpace).filter((Predicate)new Predicate<File>(){

            public boolean apply(File input) {
                return input.isFile();
            }
        }).iterator();
        int count = 0;
        while (iter.hasNext()) {
            File toBeSyncedFile = (File)iter.next();
            Optional<SettableFuture<Integer>> scheduled = this.putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true);
            if (scheduled.isPresent()) {
                ++count;
                continue;
            }
            LOG.info("File [{}] not setup for upload", (Object)toBeSyncedFile.getName());
        }
        LOG.info("Scheduled [{}] pending uploads", (Object)count);
    }

    public Optional<SettableFuture<Integer>> put(String id, File input) {
        return this.putOptionalDisregardingSize(id, input, false);
    }

    private Optional<SettableFuture<Integer>> putOptionalDisregardingSize(String id, File input, boolean ignoreSize) {
        this.cacheStats.markRequest();
        long length = input.length();
        File uploadFile = DataStoreCacheUtils.getFile(id, this.uploadCacheSpace);
        if ((ignoreSize && this.currentSize.addAndGet(length) >= 0L || this.currentSize.addAndGet(length) <= this.size) && !this.attic.containsKey(id) && this.map.putIfAbsent(id, uploadFile) == null) {
            try {
                if (!uploadFile.exists()) {
                    FileUtils.moveFile((File)input, (File)uploadFile);
                    LOG.trace("File [{}] moved to staging cache [{}]", (Object)input, (Object)uploadFile);
                }
                this.cacheStats.markHit();
                this.cacheStats.incrementCount();
                this.cacheStats.incrementSize(length);
                this.cacheStats.incrementMemSize(this.memWeigher.weigh((Object)id, (Object)uploadFile));
                return Optional.of(this.stage(id, uploadFile));
            }
            catch (Exception e) {
                LOG.info("Error moving file to staging", (Throwable)e);
                this.currentSize.addAndGet(-length);
                this.map.remove(id, uploadFile);
            }
        } else {
            this.currentSize.addAndGet(-length);
            if (this.map.containsKey(id) || this.attic.containsKey(id)) {
                SettableFuture result = SettableFuture.create();
                result.set((Object)0);
                return Optional.of((Object)result);
            }
        }
        return Optional.absent();
    }

    private SettableFuture<Integer> stage(final String id, final File upload) {
        final SettableFuture result = SettableFuture.create();
        try {
            ListenableFuture future = this.executor.submit((Callable)new Callable<Integer>(){

                @Override
                public Integer call() throws Exception {
                    try {
                        TimerStats.Context uploadContext = UploadStagingCache.this.cacheStats.startUpLoaderTimer();
                        UploadStagingCache.this.uploader.write(id, upload);
                        LOG.debug("File added to backend [{}]", (Object)upload);
                        uploadContext.stop();
                        return 1;
                    }
                    catch (Exception e) {
                        LOG.error("Error adding file to backend", (Throwable)e);
                        throw e;
                    }
                }
            });
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Integer>(){

                public void onSuccess(@Nullable Integer r) {
                    LOG.info("Successfully added [{}], [{}]", (Object)id, (Object)upload);
                    try {
                        UploadStagingCache.this.attic.put(id, upload);
                        if (UploadStagingCache.this.downloadCache != null) {
                            Files.touch((File)upload);
                            UploadStagingCache.this.downloadCache.put(id, upload);
                            LOG.debug("[{}] added to cache", (Object)id);
                        }
                        UploadStagingCache.this.map.remove(id);
                    }
                    catch (IOException e) {
                        LOG.warn("Error in cleaning up [{}] from staging", (Object)upload);
                    }
                    result.set((Object)r);
                }

                public void onFailure(Throwable t) {
                    LOG.error("Error adding [{}] with file [{}] to backend", new Object[]{id, upload, t});
                    result.setException(t);
                    UploadStagingCache.this.retryQueue.add(id);
                }
            });
            LOG.debug("File [{}] scheduled for upload [{}]", (Object)upload, (Object)result);
        }
        catch (Exception e) {
            LOG.error("Error staging file for upload [{}]", (Object)upload, (Object)e);
        }
        return result;
    }

    protected void invalidate(String key) {
        if (!this.attic.containsKey(key) && this.map.containsKey(key)) {
            try {
                LOG.debug("Invalidating [{}]", (Object)key);
                File toBeDeleted = (File)this.map.get(key);
                this.deleteInternal(key, toBeDeleted);
                this.map.remove(key, toBeDeleted);
            }
            catch (IOException e) {
                LOG.warn("Could not delete file from staging", (Throwable)e);
            }
        }
    }

    protected Iterator<String> getAllIdentifiers() {
        return this.map.keySet().iterator();
    }

    private void remove() {
        LOG.info("Starting purge of uploaded files");
        Iterator iterator = this.attic.keySet().iterator();
        int count = 0;
        while (iterator.hasNext()) {
            String key = (String)iterator.next();
            try {
                if (this.map.containsKey(key)) continue;
                LOG.trace("upload map contains id [{}]", (Object)key);
                File toBeDeleted = (File)this.attic.get(key);
                this.deleteInternal(key, toBeDeleted);
                iterator.remove();
                LOG.debug("Cache [{}] file deleted for id [{}]", (Object)toBeDeleted, (Object)key);
                ++count;
            }
            catch (IOException e) {
                LOG.error("Error in removing entry for id [{}]", (Object)key);
            }
        }
        LOG.info("Finished removal of [{}] files", (Object)count);
    }

    private void deleteInternal(String key, File toBeDeleted) throws IOException {
        LOG.debug("Trying to delete file [{}]", (Object)toBeDeleted);
        long length = toBeDeleted.length();
        DataStoreCacheUtils.recursiveDelete(toBeDeleted, this.uploadCacheSpace);
        LOG.debug("deleted file [{}]", (Object)toBeDeleted);
        this.currentSize.addAndGet(-length);
        this.cacheStats.decrementSize(length);
        this.cacheStats.decrementMemSize(this.memWeigher.weigh((Object)key, (Object)toBeDeleted));
        this.cacheStats.decrementCount();
    }

    @Nullable
    public File getIfPresent(String key) {
        this.cacheStats.markLoad();
        if (this.map.containsKey(key)) {
            this.cacheStats.markLoadSuccess();
            return (File)this.map.get(key);
        }
        return null;
    }

    public DataStoreCacheStatsMBean getStats() {
        return this.cacheStats;
    }

    @Override
    public void close() {
        LOG.info("Uploads in progress on close [{}]", (Object)this.map.size());
        LOG.info("Uploads completed but not cleared from cache [{}]", (Object)this.attic.size());
        LOG.info("Staging cache stats on close [{}]", (Object)this.cacheStats.cacheInfoAsString());
        new ExecutorCloser((ExecutorService)this.executor).close();
        new ExecutorCloser((ExecutorService)this.scheduledExecutor).close();
        new ExecutorCloser((ExecutorService)this.statsExecutor).close();
    }

    protected void setDownloadCache(@Nullable FileCache downloadCache) {
        this.downloadCache = downloadCache;
    }

    class RetryJob
    implements Runnable {
        RetryJob() {
        }

        @Override
        public void run() {
            LOG.debug("Retry job started");
            int count = 0;
            ArrayList entries = Lists.newArrayList();
            UploadStagingCache.this.retryQueue.drainTo(entries);
            for (String key : entries) {
                File file = (File)UploadStagingCache.this.map.get(key);
                LOG.info("Retrying upload of id [{}] with file [{}] ", (Object)key, (Object)file);
                UploadStagingCache.this.stage(key, file);
                ++count;
                LOG.info("Scheduled retry for upload of id [{}] with file [{}]", (Object)key, (Object)file);
            }
            LOG.debug("Retry job finished with staging [{}] jobs", (Object)count);
        }
    }

    class RemoveJob
    implements Runnable {
        RemoveJob() {
        }

        @Override
        public void run() {
            UploadStagingCache.this.remove();
        }
    }
}

