/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.impl.cache.ReadEntryUtils;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RangeEntryCacheImpl
implements EntryCache {
    public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64L;
    public static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10240;
    private static final boolean DEFAULT_CACHE_INDIVIDUAL_READ_ENTRY = false;
    private final RangeEntryCacheManagerImpl manager;
    final ManagedLedgerImpl ml;
    private ManagedLedgerInterceptor interceptor;
    private final RangeCache<Position, EntryImpl> entries;
    private final boolean copyEntries;
    private final PendingReadsManager pendingReadsManager;
    private static final double MB = 1048576.0;
    private final LongAdder totalAddedEntriesSize = new LongAdder();
    private final LongAdder totalAddedEntriesCount = new LongAdder();
    public static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true, 0, PooledByteBufAllocator.defaultNumDirectArena(), PooledByteBufAllocator.defaultPageSize(), PooledByteBufAllocator.defaultMaxOrder(), PooledByteBufAllocator.defaultSmallCacheSize(), PooledByteBufAllocator.defaultNormalCacheSize(), true);
    private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);

    public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
        this.manager = manager;
        this.ml = ml;
        this.pendingReadsManager = new PendingReadsManager(this);
        this.interceptor = ml.getManagedLedgerInterceptor();
        this.entries = new RangeCache(EntryImpl::getLength, EntryImpl::getTimestamp);
        this.copyEntries = copyEntries;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Initialized managed-ledger entry cache", (Object)ml.getName());
        }
    }

    @VisibleForTesting
    ManagedLedgerImpl getManagedLedger() {
        return this.ml;
    }

    @VisibleForTesting
    ManagedLedgerConfig getManagedLedgerConfig() {
        return this.ml.getConfig();
    }

    @Override
    public String getName() {
        return this.ml.getName();
    }

    @VisibleForTesting
    public InflightReadsLimiter getPendingReadsLimiter() {
        return this.manager.getInflightReadsLimiter();
    }

    @Override
    public boolean insert(EntryImpl entry) {
        ByteBuf cachedData;
        Position position;
        int entryLength = entry.getLength();
        if (!this.manager.hasSpaceInCache()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping cache while doing eviction: {} - size: {}", new Object[]{this.ml.getName(), entry.getPosition(), entryLength});
            }
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Adding entry to cache: {} - size: {}", new Object[]{this.ml.getName(), entry.getPosition(), entryLength});
        }
        if (this.entries.exists(position = entry.getPosition())) {
            return false;
        }
        if (this.copyEntries) {
            cachedData = this.copyEntry(entry);
            if (cachedData == null) {
                return false;
            }
        } else {
            cachedData = entry.getDataBuffer().retain();
        }
        EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
        cachedData.release();
        if (this.entries.put(position, cacheEntry)) {
            this.totalAddedEntriesSize.add(entryLength);
            this.totalAddedEntriesCount.increment();
            this.manager.entryAdded(entryLength);
            return true;
        }
        cacheEntry.release();
        return false;
    }

    private ByteBuf copyEntry(EntryImpl entry) {
        int size = entry.getLength();
        ByteBuf cachedData = null;
        try {
            cachedData = ALLOCATOR.directBuffer(size, size);
        }
        catch (Throwable t) {
            log.warn("[{}] Failed to allocate buffer for entry cache: {}", (Object)this.ml.getName(), (Object)t.getMessage());
            return null;
        }
        if (size > 0) {
            ByteBuf entryBuf = entry.getDataBuffer();
            int readerIdx = entryBuf.readerIndex();
            cachedData.writeBytes(entryBuf);
            entryBuf.readerIndex(readerIdx);
        }
        return cachedData;
    }

    @Override
    public void invalidateEntries(Position lastPosition) {
        Position firstPosition = PositionFactory.create(-1L, 0L);
        if (firstPosition.compareTo(lastPosition) > 0) {
            if (log.isDebugEnabled()) {
                log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}", (Object)firstPosition, (Object)lastPosition);
            }
            return;
        }
        Pair<Integer, Long> removed = this.entries.removeRange(firstPosition, lastPosition, false);
        int entriesRemoved = (Integer)removed.getLeft();
        long sizeRemoved = (Long)removed.getRight();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Invalidated entries up to {} - Entries removed: {} - Size removed: {}", new Object[]{this.ml.getName(), lastPosition, entriesRemoved, sizeRemoved});
        }
        this.manager.entriesRemoved(sizeRemoved, entriesRemoved);
    }

    @Override
    public void invalidateAllEntries(long ledgerId) {
        Position firstPosition = PositionFactory.create(ledgerId, 0L);
        Position lastPosition = PositionFactory.create(ledgerId + 1L, 0L);
        Pair<Integer, Long> removed = this.entries.removeRange(firstPosition, lastPosition, false);
        int entriesRemoved = (Integer)removed.getLeft();
        long sizeRemoved = (Long)removed.getRight();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Invalidated all entries on ledger {} - Entries removed: {} - Size removed: {}", new Object[]{this.ml.getName(), ledgerId, entriesRemoved, sizeRemoved});
        }
        this.manager.entriesRemoved(sizeRemoved, entriesRemoved);
        this.pendingReadsManager.invalidateLedger(ledgerId);
    }

    @Override
    public void asyncReadEntry(ReadHandle lh, Position position, final AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
        try {
            this.asyncReadEntriesByPosition(lh, position, position, 1, false, new AsyncCallbacks.ReadEntriesCallback(){

                @Override
                public void readEntriesComplete(List<Entry> entries, Object ctx) {
                    if (entries.isEmpty()) {
                        callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx);
                    } else {
                        callback.readEntryComplete(entries.get(0), ctx);
                    }
                }

                @Override
                public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                    callback.readEntryFailed(exception, ctx);
                }
            }, ctx, true);
        }
        catch (Throwable t) {
            log.warn("failed to read entries for {}-{}", new Object[]{lh.getId(), position, t});
            this.invalidateAllEntries(lh.getId());
            callback.readEntryFailed(ManagedLedgerImpl.createManagedLedgerException(t), ctx);
        }
    }

    @Override
    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        try {
            this.asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, true);
        }
        catch (Throwable t) {
            log.warn("failed to read entries for {}--{}-{}", new Object[]{lh.getId(), firstEntry, lastEntry, t});
            this.invalidateAllEntries(lh.getId());
            callback.readEntriesFailed(ManagedLedgerImpl.createManagedLedgerException(t), ctx);
        }
    }

    void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, AsyncCallbacks.ReadEntriesCallback callback, Object ctx, boolean acquirePermits) {
        long ledgerId = lh.getId();
        int numberOfEntries = (int)(lastEntry - firstEntry) + 1;
        Position firstPosition = PositionFactory.create(ledgerId, firstEntry);
        Position lastPosition = PositionFactory.create(ledgerId, lastEntry);
        this.asyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, callback, ctx, acquirePermits);
    }

    void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, boolean shouldCacheEntry, AsyncCallbacks.ReadEntriesCallback originalCallback, Object ctx, boolean acquirePermits) {
        Preconditions.checkArgument((firstPosition.getLedgerId() == lastPosition.getLedgerId() ? 1 : 0) != 0, (String)"Invalid range. Entries %s and %s should be in the same ledger.", (Object)firstPosition, (Object)lastPosition);
        Preconditions.checkArgument((firstPosition.getLedgerId() == lh.getId() ? 1 : 0) != 0, (String)"Invalid ReadHandle. The ledger %s of the range positions should match the handle's ledger %s.", (long)firstPosition.getLedgerId(), (long)lh.getId());
        if (log.isDebugEnabled()) {
            log.debug("[{}] Reading {} entries in range {} to {}", new Object[]{this.ml.getName(), numberOfEntries, firstPosition, lastPosition});
        }
        InflightReadsLimiter pendingReadsLimiter = this.getPendingReadsLimiter();
        if (!acquirePermits || pendingReadsLimiter.isDisabled()) {
            this.doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, originalCallback, ctx);
        } else {
            Optional<InflightReadsLimiter.Handle> optionalHandle;
            long estimatedEntrySize = this.getEstimatedEntrySize(lh);
            long estimatedReadSize = (long)numberOfEntries * estimatedEntrySize;
            if (log.isDebugEnabled()) {
                log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size", new Object[]{estimatedReadSize, numberOfEntries, estimatedEntrySize});
            }
            if ((optionalHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle -> this.ml.getExecutor().execute(() -> this.doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, originalCallback, ctx, (InflightReadsLimiter.Handle)handle, estimatedReadSize)))).isPresent()) {
                this.doAsyncReadEntriesWithAcquiredPermits(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, originalCallback, ctx, optionalHandle.get(), estimatedReadSize);
            }
        }
    }

    void doAsyncReadEntriesWithAcquiredPermits(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, boolean shouldCacheEntry, final AsyncCallbacks.ReadEntriesCallback originalCallback, Object ctx, final InflightReadsLimiter.Handle handle, long estimatedReadSize) {
        if (!handle.success()) {
            String message = String.format("Couldn't acquire enough permits on the max reads in flight limiter to read from ledger %d, %s, estimated read size %d bytes for %d entries (check managedLedgerMaxReadsInFlightSizeInMB, managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis and managedLedgerMaxReadsInFlightPermitsAcquireQueueSize)", lh.getId(), this.getName(), estimatedReadSize, numberOfEntries);
            log.error(message);
            originalCallback.readEntriesFailed(new ManagedLedgerException.TooManyRequestsException(message), ctx);
            return;
        }
        final InflightReadsLimiter pendingReadsLimiter = this.getPendingReadsLimiter();
        AsyncCallbacks.ReadEntriesCallback wrappedCallback = new AsyncCallbacks.ReadEntriesCallback(){

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx2) {
                if (!entries.isEmpty()) {
                    AtomicInteger remainingCount = new AtomicInteger(entries.size());
                    for (Entry entry : entries) {
                        ((EntryImpl)entry).onDeallocate(() -> {
                            if (remainingCount.decrementAndGet() <= 0) {
                                pendingReadsLimiter.release(handle);
                            }
                        });
                    }
                } else {
                    pendingReadsLimiter.release(handle);
                }
                originalCallback.readEntriesComplete(entries, ctx2);
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx2) {
                pendingReadsLimiter.release(handle);
                originalCallback.readEntriesFailed(exception, ctx2);
            }
        };
        this.doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, wrappedCallback, ctx);
    }

    void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position lastPosition, int numberOfEntries, boolean shouldCacheEntry, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        EntryImpl cachedEntry;
        Collection<Object> cachedEntries = firstPosition.compareTo(lastPosition) == 0 ? ((cachedEntry = this.entries.get(firstPosition)) == null ? Collections.emptyList() : Collections.singleton(cachedEntry)) : this.entries.getRange(firstPosition, lastPosition);
        if (cachedEntries.size() == numberOfEntries) {
            long totalCachedSize = 0L;
            ArrayList<Entry> entriesToReturn = new ArrayList<Entry>(numberOfEntries);
            for (EntryImpl entry2 : cachedEntries) {
                entriesToReturn.add(EntryImpl.create(entry2));
                totalCachedSize += (long)entry2.getLength();
                entry2.release();
            }
            this.manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), totalCachedSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cache hit for {} entries in range {} to {}", new Object[]{this.ml.getName(), numberOfEntries, firstPosition, lastPosition});
            }
            callback.readEntriesComplete(entriesToReturn, ctx);
        } else {
            if (!cachedEntries.isEmpty()) {
                cachedEntries.forEach(entry -> entry.release());
            }
            this.pendingReadsManager.readEntries(lh, firstPosition.getEntryId(), lastPosition.getEntryId(), shouldCacheEntry, callback, ctx);
        }
    }

    @VisibleForTesting
    public long getEstimatedEntrySize(ReadHandle lh) {
        if (lh.getLength() == 0L || lh.getLastAddConfirmed() < 0L) {
            return Math.max(this.getAvgEntrySize(), 10240L) + 64L;
        }
        return Math.max(1L, lh.getLength() / (lh.getLastAddConfirmed() + 1L)) + 64L;
    }

    private long getAvgEntrySize() {
        long totalAddedEntriesCount = this.totalAddedEntriesCount.sum();
        long totalAddedEntriesSize = this.totalAddedEntriesSize.sum();
        return totalAddedEntriesCount != 0L ? totalAddedEntriesSize / totalAddedEntriesCount : 0L;
    }

    CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) {
        int entriesToRead = (int)(lastEntry - firstEntry) + 1;
        CompletionStage readResult = ReadEntryUtils.readAsync(this.ml, lh, firstEntry, lastEntry).thenApply(ledgerEntries -> {
            Objects.requireNonNull(this.ml.getName());
            Objects.requireNonNull(this.ml.getExecutor());
            try {
                long totalSize = 0L;
                ArrayList<EntryImpl> entriesToReturn = new ArrayList<EntryImpl>(entriesToRead);
                for (LedgerEntry e : ledgerEntries) {
                    EntryImpl entry = RangeEntryCacheManagerImpl.create(e, this.interceptor);
                    entriesToReturn.add(entry);
                    totalSize += (long)entry.getLength();
                    if (!shouldCacheEntry) continue;
                    EntryImpl cacheEntry = EntryImpl.create(entry);
                    this.insert(cacheEntry);
                    cacheEntry.release();
                }
                this.ml.getMbean().recordReadEntriesOpsCacheMisses(entriesToReturn.size(), totalSize);
                this.manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
                this.ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
                ArrayList<EntryImpl> arrayList = entriesToReturn;
                return arrayList;
            }
            finally {
                ledgerEntries.close();
            }
        });
        ((CompletableFuture)readResult).exceptionally(exception -> {
            if (!(exception instanceof BKException) || ((BKException)exception).getCode() != -105) {
                this.ml.invalidateLedgerHandle(lh);
                this.pendingReadsManager.invalidateLedger(lh.getId());
            }
            return null;
        });
        return readResult;
    }

    @Override
    public void clear() {
        Pair<Integer, Long> removedPair = this.entries.clear();
        this.manager.entriesRemoved((Long)removedPair.getRight(), (Integer)removedPair.getLeft());
        this.pendingReadsManager.clear();
    }

    @Override
    public long getSize() {
        return this.entries.getSize();
    }

    @Override
    public int compareTo(EntryCache other) {
        return Long.compare(this.getSize(), other.getSize());
    }

    @Override
    public Pair<Integer, Long> evictEntries(long sizeToFree) {
        Preconditions.checkArgument((sizeToFree > 0L ? 1 : 0) != 0);
        Pair<Integer, Long> evicted = this.entries.evictLeastAccessedEntries(sizeToFree);
        int evictedEntries = (Integer)evicted.getLeft();
        long evictedSize = (Long)evicted.getRight();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Doing cache eviction of at least {} Mb -- Deleted {} entries - Total size deleted: {} Mb  -- Current Size: {} Mb", new Object[]{this.ml.getName(), (double)sizeToFree / 1048576.0, evictedEntries, (double)evictedSize / 1048576.0, (double)this.entries.getSize() / 1048576.0});
        }
        this.manager.entriesRemoved(evictedSize, evictedEntries);
        return evicted;
    }

    @Override
    public void invalidateEntriesBeforeTimestamp(long timestamp) {
        Pair<Integer, Long> evictedPair = this.entries.evictLEntriesBeforeTimestamp(timestamp);
        this.manager.entriesRemoved((Long)evictedPair.getRight(), (Integer)evictedPair.getLeft());
    }
}

