/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import net.jcip.annotations.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubpartitionRemoteCacheManager {
    static final int EMPTY_SEGMENT_ID = -1;
    private static final Logger LOG = LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
    private final TieredStoragePartitionId partitionId;
    private final int subpartitionId;
    private final PartitionFileWriter partitionFileWriter;
    @GuardedBy(value="allBuffers")
    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new LinkedList<Tuple2<Buffer, Integer>>();
    private CompletableFuture<Void> flushCompletableFuture = FutureUtils.completedVoidFuture();
    @GuardedBy(value="allBuffers")
    private int segmentId = -1;
    private int bufferIndex;

    private static void checkSegmentIdNotEmpty(int segmentId) {
        Preconditions.checkArgument(segmentId >= 0, "Segment id must be non-negative.");
    }

    public SubpartitionRemoteCacheManager(TieredStoragePartitionId partitionId, int subpartitionId, TieredStorageMemoryManager storageMemoryManager, PartitionFileWriter partitionFileWriter) {
        this.partitionId = partitionId;
        this.subpartitionId = subpartitionId;
        this.partitionFileWriter = partitionFileWriter;
        storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean hasAnySegmentStarted() {
        Deque<Tuple2<Buffer, Integer>> deque = this.allBuffers;
        synchronized (deque) {
            return this.segmentId != -1;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void startSegment(int segmentId) {
        Deque<Tuple2<Buffer, Integer>> deque = this.allBuffers;
        synchronized (deque) {
            SubpartitionRemoteCacheManager.checkSegmentIdNotEmpty(segmentId);
            Preconditions.checkState(this.allBuffers.isEmpty(), "There are un-flushed buffers.");
            this.segmentId = segmentId;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addBuffer(Buffer buffer) {
        Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<Buffer, Integer>(buffer, this.bufferIndex++);
        Deque<Tuple2<Buffer, Integer>> deque = this.allBuffers;
        synchronized (deque) {
            Preconditions.checkArgument(this.segmentId >= 0, "No segment has been started.");
            this.allBuffers.add(toAddBuffer);
        }
    }

    void finishSegment(int segmentId) {
        SubpartitionRemoteCacheManager.checkSegmentIdNotEmpty(segmentId);
        Preconditions.checkState(this.segmentId == segmentId, "Wrong segment id.");
        this.flushBuffers();
        PartitionFileWriter.SubpartitionBufferContext bufferContext = new PartitionFileWriter.SubpartitionBufferContext(this.subpartitionId, Collections.singletonList(new PartitionFileWriter.SegmentBufferContext(segmentId, Collections.emptyList(), true)));
        this.flushCompletableFuture = this.partitionFileWriter.write(this.partitionId, Collections.singletonList(bufferContext));
        Preconditions.checkState(this.allBuffers.isEmpty(), "Leaking buffers.");
    }

    void close() {
        try {
            this.flushCompletableFuture.get();
        }
        catch (Exception e) {
            LOG.error("Failed to flush the buffers.", (Throwable)e);
            ExceptionUtils.rethrow(e);
        }
        this.flushBuffers();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void release() {
        Deque<Tuple2<Buffer, Integer>> deque = this.allBuffers;
        synchronized (deque) {
            Preconditions.checkState(this.allBuffers.isEmpty(), "Leaking buffers.");
        }
        this.partitionFileWriter.release();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushBuffers() {
        Deque<Tuple2<Buffer, Integer>> deque = this.allBuffers;
        synchronized (deque) {
            ArrayList<Tuple2<Buffer, Integer>> allBuffersToFlush = new ArrayList<Tuple2<Buffer, Integer>>(this.allBuffers);
            this.allBuffers.clear();
            if (allBuffersToFlush.isEmpty()) {
                return;
            }
            SubpartitionRemoteCacheManager.checkSegmentIdNotEmpty(this.segmentId);
            PartitionFileWriter.SubpartitionBufferContext subpartitionBufferContext = new PartitionFileWriter.SubpartitionBufferContext(this.subpartitionId, Collections.singletonList(new PartitionFileWriter.SegmentBufferContext(this.segmentId, allBuffersToFlush, false)));
            this.flushCompletableFuture = this.partitionFileWriter.write(this.partitionId, Collections.singletonList(subpartitionBufferContext));
        }
    }
}

