package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import net.jcip.annotations.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.class */
/**
 * Caches the buffers of one subpartition and hands them to a {@link PartitionFileWriter}.
 *
 * <p>Buffers are queued by {@link #addBuffer(Buffer)} together with a running buffer index. The
 * queue is written out when the current segment is finished, when the manager is closed, and
 * whenever the {@link TieredStorageMemoryManager} passed to the constructor issues a buffer
 * reclaim request (the flush routine is registered there as the listener).
 *
 * <p>The buffer queue, the current segment id and the future of the most recent write are all
 * guarded by the monitor of {@code allBuffers}.
 */
class SubpartitionRemoteCacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
    private final TieredStoragePartitionId partitionId;
    private final int subpartitionId;
    private final PartitionFileWriter partitionFileWriter;

    /** Buffers that were added but not yet written, each paired with its buffer index. */
    @GuardedBy("allBuffers")
    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new LinkedList<>();

    /** Future returned by the most recent call to the partition file writer. */
    @GuardedBy("allBuffers")
    private CompletableFuture<Void> flushCompletableFuture = FutureUtils.completedVoidFuture();

    /** Id of the segment currently being written; {@code -1} until a segment is started. */
    @GuardedBy("allBuffers")
    private int segmentId = -1;

    // Incremented in addBuffer without holding the lock.
    // NOTE(review): presumably only a single thread adds buffers - confirm against callers.
    private int bufferIndex;

    /**
     * Creates the manager and registers its flush routine as the buffer reclaim listener of the
     * given memory manager.
     *
     * @param partitionId id of the partition, passed to the writer on every write
     * @param subpartitionId id of the subpartition whose buffers are cached here
     * @param storageMemoryManager memory manager on which the reclaim listener is registered
     * @param partitionFileWriter writer that receives the cached buffers
     */
    public SubpartitionRemoteCacheManager(
            TieredStoragePartitionId partitionId,
            int subpartitionId,
            TieredStorageMemoryManager storageMemoryManager,
            PartitionFileWriter partitionFileWriter) {
        this.partitionId = partitionId;
        this.subpartitionId = subpartitionId;
        this.partitionFileWriter = partitionFileWriter;
        storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
    }

    /**
     * Marks the given segment as the current one.
     *
     * @param segmentId id of the segment that starts
     * @throws IllegalStateException if buffers of a previous segment are still queued
     */
    public void startSegment(int segmentId) {
        synchronized (this.allBuffers) {
            Preconditions.checkState(this.allBuffers.isEmpty(), "There are un-flushed buffers.");
            this.segmentId = segmentId;
        }
    }

    /**
     * Queues a buffer and assigns it the next buffer index.
     *
     * @param buffer the buffer to cache
     */
    public void addBuffer(Buffer buffer) {
        Tuple2<Buffer, Integer> indexedBuffer = new Tuple2<>(buffer, this.bufferIndex++);
        synchronized (this.allBuffers) {
            this.allBuffers.add(indexedBuffer);
        }
    }

    /**
     * Flushes all queued buffers and then writes an empty, finished segment context for the
     * given segment.
     *
     * @param segmentId id of the segment to finish; must equal the current segment id
     * @throws IllegalStateException if the id does not match the current segment, or if buffers
     *     are still queued after the flush
     */
    public void finishSegment(int segmentId) {
        // All guarded state is touched under the lock, so a concurrent reclaim-triggered flush
        // cannot interleave between the data write and the end-of-segment write.
        synchronized (this.allBuffers) {
            Preconditions.checkState(this.segmentId == segmentId, "Wrong segment id.");
            flushBuffers();
            this.flushCompletableFuture =
                    writeSegmentContext(
                            new PartitionFileWriter.SegmentBufferContext(
                                    segmentId, Collections.emptyList(), true));
            Preconditions.checkState(this.allBuffers.isEmpty(), "Leaking buffers.");
        }
    }

    /**
     * Waits for the most recent write to complete and then flushes any buffers still queued.
     *
     * <p>A failure of the awaited write is logged and rethrown through
     * {@link ExceptionUtils#rethrow(Throwable)}. If the wait is interrupted, the interrupt flag
     * of the calling thread is restored before rethrowing.
     */
    public void close() {
        CompletableFuture<Void> pendingFlush;
        synchronized (this.allBuffers) {
            pendingFlush = this.flushCompletableFuture;
        }
        // Wait outside the lock so that a concurrent flush is not blocked meanwhile.
        try {
            pendingFlush.get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error("Failed to flush the buffers.", e);
            ExceptionUtils.rethrow(e);
        }
        // NOTE(review): the write started by this final flush is not awaited here - confirm
        // that the partition file writer completes outstanding writes on its own.
        flushBuffers();
    }

    /**
     * Releases the partition file writer.
     *
     * @throws IllegalStateException if buffers are still queued
     */
    public void release() {
        synchronized (this.allBuffers) {
            Preconditions.checkState(this.allBuffers.isEmpty(), "Leaking buffers.");
        }
        this.partitionFileWriter.release();
    }

    /**
     * Drains the queue and writes the drained buffers as an unfinished segment context of the
     * current segment. Does nothing when the queue is empty.
     */
    private void flushBuffers() {
        synchronized (this.allBuffers) {
            if (this.allBuffers.isEmpty()) {
                return;
            }
            ArrayList<Tuple2<Buffer, Integer>> drained = new ArrayList<>(this.allBuffers);
            this.allBuffers.clear();
            this.flushCompletableFuture =
                    writeSegmentContext(
                            new PartitionFileWriter.SegmentBufferContext(
                                    this.segmentId, drained, false));
        }
    }

    /** Wraps one segment context for this subpartition and passes it to the writer. */
    private CompletableFuture<Void> writeSegmentContext(
            PartitionFileWriter.SegmentBufferContext segmentContext) {
        PartitionFileWriter.SubpartitionBufferContext subpartitionContext =
                new PartitionFileWriter.SubpartitionBufferContext(
                        this.subpartitionId, Collections.singletonList(segmentContext));
        return this.partitionFileWriter.write(
                this.partitionId, Collections.singletonList(subpartitionContext));
    }
}
