package org.apache.flink.streaming.runtime.io.recovery;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil;
import org.apache.flink.runtime.checkpoint.RescaleMappings;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializerTest.class */
class DemultiplexingRecordDeserializerTest {
    // Random source; testUpscale uses it to pick one of the virtual channels on each iteration.
    private final ThreadLocalRandom random = ThreadLocalRandom.current();
    // Assigned in setup(); its spilling directories are handed to the spilling deserializers
    // that the tests create.
    private IOManager ioManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializerTest$ModSelector.class */
    public static class ModSelector implements ChannelSelector<SerializationDelegate<StreamRecord<Long>>> {
        private final int numberOfChannels;

        private ModSelector(int i) {
            this.numberOfChannels = i;
        }

        public void setup(int i) {
        }

        public int selectChannel(SerializationDelegate<StreamRecord<Long>> serializationDelegate) {
            return (int) (((Long) ((StreamRecord) serializationDelegate.getInstance()).getValue()).longValue() % this.numberOfChannels);
        }

        public boolean isBroadcast() {
            return false;
        }
    }

    /** Package-private no-argument constructor; it performs no initialization of its own. */
    DemultiplexingRecordDeserializerTest() {}

    /** Gives every test its own asynchronous I/O manager. */
    @BeforeEach
    void setup() {
        ioManager = new IOManagerAsync();
    }

    /**
     * Closes the I/O manager that {@link #setup()} created for the test that just ran.
     *
     * <p>The manager must be closed rather than replaced: allocating a fresh one here would leave
     * both the old and the new instance open after every test.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void cleanup() throws Exception {
        // Guard against setup() having failed before the field was assigned.
        if (ioManager != null) {
            ioManager.close();
        }
    }

    /**
     * Writes three longs through a randomly chosen virtual channel, 100 times over, and checks that
     * exactly those longs are read back and that the backing memory segment has been freed.
     *
     * <p>The descriptor is built from old indexes {@code {0, 1}} and three mappings, of which only
     * the third is non-empty ({@code {2, 3}} and {@code {4, 5}}); the test expects the four virtual
     * channels (0,2), (0,3), (1,2) and (1,3).
     */
    @Test
    void testUpscale() throws IOException {
        // mappings(...) takes an int[][]; build real two-dimensional arrays instead of casting int[].
        RescaleMappings[] channelMappings =
                InflightDataRescalingDescriptorUtil.array(
                        new RescaleMappings[] {
                            InflightDataRescalingDescriptorUtil.mappings(new int[0][]),
                            InflightDataRescalingDescriptorUtil.mappings(new int[0][]),
                            InflightDataRescalingDescriptorUtil.mappings(
                                    new int[][] {
                                        InflightDataRescalingDescriptorUtil.to(new int[] {2, 3}),
                                        InflightDataRescalingDescriptorUtil.to(new int[] {4, 5})
                                    })
                        });

        DemultiplexingRecordDeserializer<Long> deserializer =
                DemultiplexingRecordDeserializer.create(
                        new InputChannelInfo(2, 0),
                        InflightDataRescalingDescriptorUtil.rescalingDescriptor(
                                InflightDataRescalingDescriptorUtil.to(new int[] {0, 1}),
                                channelMappings,
                                Collections.emptySet()),
                        unused ->
                                new SpillingAdaptiveSpanningRecordDeserializer<>(
                                        ioManager.getSpillingDirectoriesPaths()),
                        unused -> RecordFilter.all());

        Assertions.assertThat(deserializer.getVirtualChannelSelectors())
                .containsOnly(
                        new SubtaskConnectionDescriptor(0, 2),
                        new SubtaskConnectionDescriptor(0, 3),
                        new SubtaskConnectionDescriptor(1, 2),
                        new SubtaskConnectionDescriptor(1, 3));

        for (int i = 0; i < 100; i++) {
            SubtaskConnectionDescriptor selector =
                    Iterables.get(deserializer.getVirtualChannelSelectors(), random.nextInt(4));

            // Base value derived from the selected channel's input and output subtask indexes.
            long start = (selector.getInputSubtaskIndex() << 4) | selector.getOutputSubtaskIndex();

            MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(128);
            // try-with-resources closes the builder exactly once, on success and on failure alike.
            try (BufferBuilder bufferBuilder =
                    BufferBuilderTestUtils.createBufferBuilder(memorySegment)) {
                Buffer buffer = writeLongs(bufferBuilder, start + 1, start + 2, start + 3);

                deserializer.select(selector);
                deserializer.setNextBuffer(buffer);
            }

            Assertions.assertThat(readLongs(deserializer))
                    .containsExactly(start + 1, start + 2, start + 3);
            Assertions.assertThat(memorySegment.isFreed()).isTrue();
        }
    }

    /**
     * Checks record filtering when one old subtask index (42) is passed as the ambiguous set.
     *
     * <p>Each iteration writes the pair {@code i, i + 1} (one even, one odd value) through a
     * virtual channel that switches every ten iterations. For input subtask 41 both values must be
     * read back; for any other channel only the odd value is expected, the filter being a
     * {@link RecordFilter} built from {@code ModSelector(2)} and subtask index 1.
     */
    @Test
    void testAmbiguousChannels() throws IOException {
        // mappings(...) takes an int[][]; build real two-dimensional arrays instead of casting int[].
        RescaleMappings[] channelMappings =
                InflightDataRescalingDescriptorUtil.array(
                        new RescaleMappings[] {
                            InflightDataRescalingDescriptorUtil.mappings(new int[0][]),
                            InflightDataRescalingDescriptorUtil.mappings(
                                    new int[][] {
                                        InflightDataRescalingDescriptorUtil.to(new int[] {2, 3}),
                                        InflightDataRescalingDescriptorUtil.to(new int[] {4, 5})
                                    })
                        });

        DemultiplexingRecordDeserializer<Long> deserializer =
                DemultiplexingRecordDeserializer.create(
                        new InputChannelInfo(1, 0),
                        InflightDataRescalingDescriptorUtil.rescalingDescriptor(
                                InflightDataRescalingDescriptorUtil.to(new int[] {41, 42}),
                                channelMappings,
                                InflightDataRescalingDescriptorUtil.set(new Integer[] {42})),
                        unused ->
                                new SpillingAdaptiveSpanningRecordDeserializer<>(
                                        ioManager.getSpillingDirectoriesPaths()),
                        unused -> new RecordFilter<>(new ModSelector(2), LongSerializer.INSTANCE, 1));

        Assertions.assertThat(deserializer.getVirtualChannelSelectors())
                .containsOnly(
                        new SubtaskConnectionDescriptor(41, 2),
                        new SubtaskConnectionDescriptor(41, 3),
                        new SubtaskConnectionDescriptor(42, 2),
                        new SubtaskConnectionDescriptor(42, 3));

        for (int i = 0; i < 100; i++) {
            MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(128);
            // try-with-resources closes the builder exactly once, on success and on failure alike.
            try (BufferBuilder bufferBuilder =
                    BufferBuilderTestUtils.createBufferBuilder(memorySegment)) {
                // One even and one odd number per buffer.
                Buffer buffer = writeLongs(bufferBuilder, i, i + 1);

                SubtaskConnectionDescriptor selector =
                        Iterables.get(deserializer.getVirtualChannelSelectors(), (i / 10) % 2);
                deserializer.select(selector);
                deserializer.setNextBuffer(buffer);

                if (selector.getInputSubtaskIndex() == 41) {
                    Assertions.assertThat(readLongs(deserializer))
                            .containsExactly((long) i, i + 1L);
                } else {
                    // Only the odd member of the pair is expected to pass the filter.
                    Assertions.assertThat(readLongs(deserializer))
                            .containsExactly((i / 2) * 2 + 1L);
                }
            }

            Assertions.assertThat(memorySegment.isFreed()).isTrue();
        }
    }

    /**
     * Feeds one watermark into every virtual channel and checks when a watermark is read back.
     *
     * <p>The watermark written to a channel has timestamp {@code 42 + inputSubtaskIndex +
     * outputSubtaskIndex}. Nothing may be read while channels are still outstanding; after the
     * last channel has been fed, exactly one {@code Watermark(42)} is expected.
     */
    @Test
    void testWatermarks() throws IOException {
        // mappings(...) takes an int[][]; build a real two-dimensional array instead of casting int[].
        RescaleMappings[] channelMappings =
                InflightDataRescalingDescriptorUtil.array(
                        new RescaleMappings[] {
                            InflightDataRescalingDescriptorUtil.mappings(
                                    new int[][] {
                                        InflightDataRescalingDescriptorUtil.to(new int[] {0, 1}),
                                        InflightDataRescalingDescriptorUtil.to(new int[] {4, 5})
                                    })
                        });

        DemultiplexingRecordDeserializer<Long> deserializer =
                DemultiplexingRecordDeserializer.create(
                        new InputChannelInfo(0, 0),
                        InflightDataRescalingDescriptorUtil.rescalingDescriptor(
                                InflightDataRescalingDescriptorUtil.to(new int[] {0, 1}),
                                channelMappings,
                                Collections.emptySet()),
                        unused ->
                                new SpillingAdaptiveSpanningRecordDeserializer<>(
                                        ioManager.getSpillingDirectoriesPaths()),
                        unused -> RecordFilter.all());

        Assertions.assertThat(deserializer.getVirtualChannelSelectors()).hasSize(4);

        // An explicit iterator is needed because the last element is asserted differently.
        Iterator<SubtaskConnectionDescriptor> selectors =
                deserializer.getVirtualChannelSelectors().iterator();
        while (selectors.hasNext()) {
            SubtaskConnectionDescriptor selector = selectors.next();
            MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(128);
            // try-with-resources closes the builder exactly once, on success and on failure alike.
            try (BufferBuilder bufferBuilder =
                    BufferBuilderTestUtils.createBufferBuilder(memorySegment)) {
                long timestamp =
                        42 + selector.getInputSubtaskIndex() + selector.getOutputSubtaskIndex();
                Buffer buffer = write(bufferBuilder, new Watermark(timestamp));

                deserializer.select(selector);
                deserializer.setNextBuffer(buffer);
            }

            if (selectors.hasNext()) {
                Assertions.assertThat(read(deserializer)).isEmpty();
            } else {
                // NOTE(review): 42 is the smallest timestamp written (both indexes 0); this
                // presumably reflects a minimum taken across channels - confirm in the deserializer.
                Assertions.assertThat(read(deserializer)).containsExactly(new Watermark(42L));
            }

            Assertions.assertThat(memorySegment.isFreed()).isTrue();
        }
    }

    /**
     * Wraps every given long in a {@link StreamRecord} and writes the records through {@code
     * write}.
     *
     * @param bufferBuilder builder the serialized records are appended to
     * @param jArr values to write, in order
     * @return the buffer returned by {@code write}
     * @throws IOException if {@code write} fails
     */
    private Buffer writeLongs(BufferBuilder bufferBuilder, long... jArr) throws IOException {
        StreamElement[] records =
                Arrays.stream(jArr).mapToObj(StreamRecord::new).toArray(StreamElement[]::new);
        return write(bufferBuilder, records);
    }

    /**
     * Serializes the given stream elements into {@code bufferBuilder} and returns the buffer built
     * from a consumer of that builder.
     *
     * @param bufferBuilder builder each serialized element is appended and committed to
     * @param streamElementArr elements to serialize, in order
     * @return the buffer built by the consumer after all elements were appended
     * @throws IOException if serializing an element fails
     */
    private Buffer write(BufferBuilder bufferBuilder, StreamElement... streamElementArr) throws IOException {
        // try-with-resources closes the consumer exactly once, on success and on failure alike.
        try (BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer()) {
            DataOutputSerializer output = new DataOutputSerializer(128);
            SerializationDelegate<StreamElement> delegate =
                    new SerializationDelegate<>(
                            new StreamElementSerializer<>(LongSerializer.INSTANCE));
            for (StreamElement element : streamElementArr) {
                // The same delegate and output are reused for every element.
                delegate.setInstance(element);
                bufferBuilder.appendAndCommit(RecordWriter.serializeRecord(output, delegate));
            }
            return bufferConsumer.build();
        }
    }

    /**
     * Pulls records from the deserializer until it reports its buffer as consumed, collecting every
     * full record along the way.
     *
     * @param demultiplexingRecordDeserializer deserializer to read from
     * @return the full records, in the order they were produced
     * @throws IOException if fetching the next record fails
     */
    private List<StreamElement> read(DemultiplexingRecordDeserializer<Long> demultiplexingRecordDeserializer) throws IOException {
        NonReusingDeserializationDelegate<StreamElement> delegate =
                new NonReusingDeserializationDelegate<>(
                        new StreamElementSerializer<>(LongSerializer.INSTANCE));
        List<StreamElement> elements = new ArrayList<>();
        while (true) {
            RecordDeserializer.DeserializationResult result =
                    demultiplexingRecordDeserializer.getNextRecord(delegate);
            if (result.isFullRecord()) {
                elements.add(delegate.getInstance());
            }
            // At least one fetch always happens; stop once the buffer is reported as consumed.
            if (result.isBufferConsumed()) {
                return elements;
            }
        }
    }

    /**
     * Reads all currently available elements via {@code read} and extracts each one's value as a
     * {@code Long}, treating every element as a record.
     *
     * @param demultiplexingRecordDeserializer deserializer to read from
     * @return the record values, in read order
     * @throws IOException if reading fails
     */
    private List<Long> readLongs(DemultiplexingRecordDeserializer<Long> demultiplexingRecordDeserializer) throws IOException {
        List<Long> values = new ArrayList<>();
        for (StreamElement element : read(demultiplexingRecordDeserializer)) {
            values.add((Long) element.asRecord().getValue());
        }
        return values;
    }
}
