package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.DeduplicatedQueue;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.class */
/**
 * {@link TierConsumerAgent} that reads buffers through a {@link PartitionFileReader} and learns
 * which subpartitions have become readable from a {@link RemoteStorageScanner}.
 *
 * <p>The agent registers itself with the scanner as an {@link AvailabilityNotifier}. Each
 * availability notification is recorded in a per-partition {@link DeduplicatedQueue} and, when
 * the subpartition was not already queued, forwarded to the notifier registered via {@link
 * #registerAvailabilityNotifier(AvailabilityNotifier)}.
 *
 * <p>After construction, {@code availableSubpartitionsQueues} is only touched while holding its
 * own monitor. {@code currentBufferIndexAndSegmentIds} is accessed without synchronization and
 * only from {@link #getNextBuffer}.
 */
public class RemoteTierConsumerAgent implements TierConsumerAgent, AvailabilityNotifier {
    private final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs;
    private final RemoteStorageScanner remoteStorageScanner;
    private final PartitionFileReader partitionFileReader;

    /** Size in bytes of the unpooled memory segment allocated for every read attempt. */
    private final int bufferSizeBytes;

    /** Downstream notifier; stays {@code null} until {@link #registerAvailabilityNotifier} runs. */
    private AvailabilityNotifier notifier;

    /** Per partition, the subpartitions that have been reported as available. */
    private final Map<TieredStoragePartitionId, DeduplicatedQueue<TieredStorageSubpartitionId>> availableSubpartitionsQueues = new HashMap<>();

    /**
     * Per partition and subpartition, a tuple of (index of the next buffer to read, segment id
     * used by the last successful read).
     */
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Tuple2<Integer, Integer>>> currentBufferIndexAndSegmentIds = new HashMap<>();

    /**
     * Creates the agent and registers it with the scanner as availability notifier.
     *
     * @param list consumer specs; one availability queue is created per distinct partition id
     * @param remoteStorageScanner scanner that is started by {@link #start()} and closed by
     *     {@link #close()}
     * @param partitionFileReader reader used by {@link #getNextBuffer} to fetch buffers
     * @param i size in bytes of the memory segment allocated for each read
     */
    public RemoteTierConsumerAgent(List<TieredStorageConsumerSpec> list, RemoteStorageScanner remoteStorageScanner, PartitionFileReader partitionFileReader, int i) {
        this.tieredStorageConsumerSpecs = list;
        this.remoteStorageScanner = remoteStorageScanner;
        this.partitionFileReader = partitionFileReader;
        this.bufferSizeBytes = i;
        for (TieredStorageConsumerSpec spec : list) {
            this.availableSubpartitionsQueues.putIfAbsent(spec.getPartitionId(), new DeduplicatedQueue<>());
        }
        // Hand out `this` only after the queues exist, so that a notification arriving through
        // the scanner can never observe a partition without its queue.
        this.remoteStorageScanner.registerAvailabilityAndPriorityNotifier(this);
    }

    /**
     * Starts the scanner and asks it to watch segment {@code 0} of every subpartition listed in
     * the consumer specs.
     */
    @Override
    public void start() {
        remoteStorageScanner.start();
        for (TieredStorageConsumerSpec spec : tieredStorageConsumerSpecs) {
            for (int subpartitionIndex : spec.getSubpartitionIds().values()) {
                remoteStorageScanner.watchSegment(spec.getPartitionId(), new TieredStorageSubpartitionId(subpartitionIndex), 0);
            }
        }
    }

    /**
     * Looks up, without removing it, the first queued subpartition of the partition that is
     * contained in the given index set.
     *
     * @param tieredStoragePartitionId partition whose availability queue is inspected; it must be
     *     one of the partitions passed to the constructor, otherwise the lookup fails with a
     *     {@link NullPointerException}
     * @param resultSubpartitionIndexSet subpartition indexes the caller is interested in
     * @return the matching subpartition index, or {@code -1} if no queued subpartition matches
     */
    @Override
    public int peekNextBufferSubpartitionId(TieredStoragePartitionId tieredStoragePartitionId, ResultSubpartitionIndexSet resultSubpartitionIndexSet) throws IOException {
        synchronized (availableSubpartitionsQueues) {
            for (TieredStorageSubpartitionId candidate : availableSubpartitionsQueues.get(tieredStoragePartitionId).values()) {
                int subpartitionIndex = candidate.getSubpartitionId();
                if (resultSubpartitionIndexSet.contains(subpartitionIndex)) {
                    return subpartitionIndex;
                }
            }
            return -1;
        }
    }

    /**
     * Attempts to read the next buffer of the given subpartition from the given segment.
     *
     * <p>If the requested segment differs from the one recorded by the last successful read (or
     * from {@code 0} when nothing was read yet), the scanner is asked to watch the requested
     * segment before the read is attempted. A successful read advances the recorded buffer index
     * by one and records the segment id. When nothing could be read, the subpartition is removed
     * from the availability queue.
     *
     * @param tieredStoragePartitionId partition to read from
     * @param tieredStorageSubpartitionId subpartition to read from
     * @param i id of the segment to read from
     * @return the buffer that was read, or {@link Optional#empty()} if the reader returned no
     *     buffer
     */
    @Override
    public Optional<Buffer> getNextBuffer(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i) {
        final int segmentId = i;
        Map<TieredStorageSubpartitionId, Tuple2<Integer, Integer>> progressBySubpartition =
                currentBufferIndexAndSegmentIds.computeIfAbsent(tieredStoragePartitionId, ignored -> new HashMap<>());
        Tuple2<Integer, Integer> progress = progressBySubpartition.getOrDefault(tieredStorageSubpartitionId, Tuple2.of(0, 0));
        final int bufferIndex = progress.f0;
        final int lastReadSegmentId = progress.f1;
        if (segmentId != lastReadSegmentId) {
            remoteStorageScanner.watchSegment(tieredStoragePartitionId, tieredStorageSubpartitionId, segmentId);
        }

        MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(bufferSizeBytes);
        PartitionFileReader.ReadBufferResult readBufferResult = null;
        try {
            readBufferResult = partitionFileReader.readBuffer(tieredStoragePartitionId, tieredStorageSubpartitionId, segmentId, bufferIndex, memorySegment, FreeingBufferRecycler.INSTANCE, null, null);
        } catch (IOException e) {
            // The segment never made it into a buffer, so it has to be released here.
            memorySegment.free();
            ExceptionUtils.rethrow(e, "Failed to read buffer from partition file.");
        }

        if (readBufferResult == null || readBufferResult.getReadBuffers().isEmpty()) {
            // Nothing was read: release the unused segment and stop advertising the subpartition.
            memorySegment.free();
            synchronized (availableSubpartitionsQueues) {
                availableSubpartitionsQueues.get(tieredStoragePartitionId).remove(tieredStorageSubpartitionId);
            }
            return Optional.empty();
        }

        List<Buffer> readBuffers = readBufferResult.getReadBuffers();
        Preconditions.checkState(readBuffers.size() == 1);
        // NOTE(review): the segment is presumably owned by the returned buffer from here on and
        // released through FreeingBufferRecycler - confirm against PartitionFileReader.
        progressBySubpartition.put(tieredStorageSubpartitionId, Tuple2.of(bufferIndex + 1, segmentId));
        return Optional.of(readBuffers.get(0));
    }

    /**
     * Registers the notifier that receives forwarded availability notifications.
     *
     * @param availabilityNotifier notifier to forward to
     * @throws IllegalStateException if a notifier has already been registered
     */
    @Override
    public void registerAvailabilityNotifier(AvailabilityNotifier availabilityNotifier) {
        Preconditions.checkState(notifier == null);
        notifier = availabilityNotifier;
    }

    /** Closes the remote storage scanner. */
    @Override
    public void close() throws IOException {
        remoteStorageScanner.close();
    }

    /**
     * Queues the subpartition as available and forwards the notification to the registered
     * notifier, but only if the queue reports the subpartition as newly added.
     *
     * @param tieredStoragePartitionId partition that owns the subpartition; must be one of the
     *     partitions passed to the constructor
     * @param tieredStorageSubpartitionId subpartition that became available
     */
    @Override
    public void notifyAvailable(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId) {
        synchronized (availableSubpartitionsQueues) {
            boolean newlyQueued = availableSubpartitionsQueues.get(tieredStoragePartitionId).add(tieredStorageSubpartitionId);
            if (newlyQueued) {
                notifier.notifyAvailable(tieredStoragePartitionId, tieredStorageSubpartitionId);
            }
        }
    }

    /**
     * Not supported by this agent.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void notifyAvailable(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageInputChannelId tieredStorageInputChannelId) {
        throw new UnsupportedOperationException("This method should not be invoked.");
    }
}
