/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory;

import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierSubpartitionProducerAgent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class MemoryTierProducerAgent
implements TierProducerAgent,
NettyServiceProducer {
    private final int numBuffersPerSegment;
    private final int subpartitionMaxQueuedBuffers;
    private final TieredStorageMemoryManager memoryManager;
    private final int[] currentSubpartitionWriteBuffers;
    private final boolean[] nettyConnectionEstablished;
    private final MemoryTierSubpartitionProducerAgent[] subpartitionProducerAgents;
    private final BufferCompressor bufferCompressor;

    public MemoryTierProducerAgent(TieredStoragePartitionId partitionId, int numSubpartitions, int bufferSizeBytes, int segmentSizeBytes, int subpartitionMaxQueuedBuffers, boolean isBroadcastOnly, TieredStorageMemoryManager memoryManager, TieredStorageNettyService nettyService, TieredStorageResourceRegistry resourceRegistry, BufferCompressor bufferCompressor) {
        Preconditions.checkArgument((segmentSizeBytes >= bufferSizeBytes ? 1 : 0) != 0, (Object)"One segment should contain at least one buffer.");
        Preconditions.checkArgument((!isBroadcastOnly ? 1 : 0) != 0, (Object)"Broadcast only partition is not allowed to use the memory tier.");
        this.numBuffersPerSegment = segmentSizeBytes / bufferSizeBytes;
        this.subpartitionMaxQueuedBuffers = subpartitionMaxQueuedBuffers;
        this.memoryManager = memoryManager;
        this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
        this.nettyConnectionEstablished = new boolean[numSubpartitions];
        this.subpartitionProducerAgents = new MemoryTierSubpartitionProducerAgent[numSubpartitions];
        this.bufferCompressor = bufferCompressor;
        Arrays.fill(this.currentSubpartitionWriteBuffers, 0);
        nettyService.registerProducer(partitionId, this);
        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
            this.subpartitionProducerAgents[subpartitionId] = new MemoryTierSubpartitionProducerAgent(subpartitionId);
        }
        resourceRegistry.registerResource(partitionId, this::releaseResources);
    }

    @Override
    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId, int minNumBuffers) {
        boolean canStartNewSegment;
        boolean bl = canStartNewSegment = this.nettyConnectionEstablished[subpartitionId.getSubpartitionId()] && this.subpartitionProducerAgents[subpartitionId.getSubpartitionId()].numQueuedBuffers() < this.subpartitionMaxQueuedBuffers && this.memoryManager.getMaxNonReclaimableBuffers(TieredStorageUtils.getMemoryTierName()) - this.memoryManager.numOwnerRequestedBuffer(TieredStorageUtils.getMemoryTierName()) > Math.max(this.numBuffersPerSegment, minNumBuffers) && this.memoryManager.ensureCapacity(Math.max(this.numBuffersPerSegment, minNumBuffers));
        if (canStartNewSegment) {
            this.subpartitionProducerAgents[subpartitionId.getSubpartitionId()].updateSegmentId(segmentId);
        }
        return canStartNewSegment;
    }

    @Override
    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer, Object bufferOwner, int numRemainingConsecutiveBuffers) {
        int subpartitionIndex = subpartitionId.getSubpartitionId();
        if (this.currentSubpartitionWriteBuffers[subpartitionIndex] != 0 && this.currentSubpartitionWriteBuffers[subpartitionIndex] + 1 + numRemainingConsecutiveBuffers > this.numBuffersPerSegment) {
            this.appendEndOfSegmentEvent(subpartitionIndex);
            this.currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
            return false;
        }
        Buffer compressedBuffer = TieredStorageUtils.compressBufferIfPossible(finishedBuffer, this.bufferCompressor);
        if (compressedBuffer.isBuffer()) {
            this.memoryManager.transferBufferOwnership(bufferOwner, TieredStorageUtils.getMemoryTierName(), compressedBuffer);
        }
        int n = subpartitionIndex;
        this.currentSubpartitionWriteBuffers[n] = this.currentSubpartitionWriteBuffers[n] + 1;
        this.addFinishedBuffer(compressedBuffer, subpartitionIndex);
        return true;
    }

    @Override
    public void connectionEstablished(TieredStorageSubpartitionId subpartitionId, NettyConnectionWriter nettyConnectionWriter) {
        this.subpartitionProducerAgents[subpartitionId.getSubpartitionId()].connectionEstablished(nettyConnectionWriter);
        this.nettyConnectionEstablished[subpartitionId.getSubpartitionId()] = true;
    }

    @Override
    public void connectionBroken(NettyConnectionId connectionId) {
    }

    @Override
    public void close() {
    }

    private void releaseResources() {
        Arrays.stream(this.subpartitionProducerAgents).forEach(MemoryTierSubpartitionProducerAgent::release);
    }

    private void appendEndOfSegmentEvent(int subpartitionId) {
        try {
            MemorySegment memorySegment = MemorySegmentFactory.wrap((byte[])EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE).array());
            this.addFinishedBuffer(new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.END_OF_SEGMENT, memorySegment.size()), subpartitionId);
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e, (String)"Failed to append end of segment event,");
        }
    }

    private void addFinishedBuffer(Buffer finishedBuffer, int subpartitionId) {
        this.subpartitionProducerAgents[subpartitionId].addFinishedBuffer(finishedBuffer);
    }
}

