package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.util.concurrent.FutureUtils;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.class */
class DiskCacheManager {
    private final TieredStoragePartitionId partitionId;
    private final int numSubpartitions;
    private final int maxCachedBytesBeforeFlush;
    private final PartitionFileWriter partitionFileWriter;
    private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
    private CompletableFuture<Void> hasFlushCompleted = FutureUtils.completedVoidFuture();
    private int numCachedBytesCounter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DiskCacheManager(TieredStoragePartitionId tieredStoragePartitionId, int i, int i2, TieredStorageMemoryManager tieredStorageMemoryManager, PartitionFileWriter partitionFileWriter) {
        this.partitionId = tieredStoragePartitionId;
        this.numSubpartitions = i;
        this.maxCachedBytesBeforeFlush = i2;
        this.partitionFileWriter = partitionFileWriter;
        this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[i];
        for (int i3 = 0; i3 < i; i3++) {
            this.subpartitionCacheManagers[i3] = new SubpartitionDiskCacheManager();
        }
        tieredStorageMemoryManager.listenBufferReclaimRequest(this::notifyFlushCachedBuffers);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startSegment(int i, int i2) {
        this.subpartitionCacheManagers[i].startSegment(i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void append(Buffer buffer, int i, boolean z) {
        this.subpartitionCacheManagers[i].append(buffer);
        increaseNumCachedBytesAndCheckFlush(buffer.readableBytes(), z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void appendEndOfSegmentEvent(ByteBuffer byteBuffer, int i) {
        this.subpartitionCacheManagers[i].appendEndOfSegmentEvent(byteBuffer);
        increaseNumCachedBytesAndCheckFlush(byteBuffer.remaining(), true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getBufferIndex(int i) {
        return this.subpartitionCacheManagers[i].getBufferIndex();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        forceFlushCachedBuffers();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void release() {
        Arrays.stream(this.subpartitionCacheManagers).forEach((v0) -> {
            v0.release();
        });
        this.partitionFileWriter.release();
    }

    private void increaseNumCachedBytesAndCheckFlush(int i, boolean z) {
        this.numCachedBytesCounter += i;
        if (!z || this.numCachedBytesCounter <= this.maxCachedBytesBeforeFlush) {
            return;
        }
        forceFlushCachedBuffers();
    }

    private void notifyFlushCachedBuffers() {
        flushBuffers(false);
    }

    private void forceFlushCachedBuffers() {
        flushBuffers(true);
    }

    private synchronized void flushBuffers(boolean z) {
        if (z || this.hasFlushCompleted.isDone()) {
            ArrayList arrayList = new ArrayList();
            if (getSubpartitionToFlushBuffers(arrayList) > 0) {
                CompletableFuture<Void> write = this.partitionFileWriter.write(this.partitionId, arrayList);
                if (!z) {
                    this.hasFlushCompleted = write;
                }
            }
            this.numCachedBytesCounter = 0;
        }
    }

    private int getSubpartitionToFlushBuffers(List<PartitionFileWriter.SubpartitionBufferContext> list) {
        int i = 0;
        for (int i2 = 0; i2 < this.numSubpartitions; i2++) {
            List<Tuple2<Buffer, Integer>> removeAllBuffers = this.subpartitionCacheManagers[i2].removeAllBuffers();
            list.add(new PartitionFileWriter.SubpartitionBufferContext(i2, Collections.singletonList(new PartitionFileWriter.SegmentBufferContext(this.subpartitionCacheManagers[i2].getSegmentId(), removeAllBuffers, false))));
            i += removeAllBuffers.size();
        }
        return i;
    }
}
