/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.recovery;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.shaded.guava32.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.recovery.RecordFilter;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

class DemultiplexingRecordDeserializer<T>
implements RecordDeserializer<DeserializationDelegate<StreamElement>> {
    public static final DemultiplexingRecordDeserializer UNMAPPED = new DemultiplexingRecordDeserializer(Collections.emptyMap());
    private final Map<SubtaskConnectionDescriptor, VirtualChannel<T>> channels;
    private VirtualChannel<T> currentVirtualChannel;

    public DemultiplexingRecordDeserializer(Map<SubtaskConnectionDescriptor, VirtualChannel<T>> channels) {
        this.channels = Preconditions.checkNotNull(channels);
    }

    public void select(SubtaskConnectionDescriptor descriptor) {
        this.currentVirtualChannel = this.channels.get(descriptor);
        if (this.currentVirtualChannel == null) {
            throw new IllegalStateException("Cannot select " + descriptor + "; known channels are " + this.channels.keySet());
        }
    }

    public boolean hasMappings() {
        return !this.channels.isEmpty();
    }

    @VisibleForTesting
    Collection<SubtaskConnectionDescriptor> getVirtualChannelSelectors() {
        return this.channels.keySet();
    }

    @Override
    public void setNextBuffer(Buffer buffer) throws IOException {
        this.currentVirtualChannel.setNextBuffer(buffer);
    }

    @Override
    public CloseableIterator<Buffer> getUnconsumedBuffer() throws IOException {
        throw new IllegalStateException("Cannot checkpoint while recovering");
    }

    public boolean hasPartialData() {
        return this.channels.values().stream().anyMatch(VirtualChannel::hasPartialData);
    }

    @Override
    public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate<StreamElement> delegate) throws IOException {
        RecordDeserializer.DeserializationResult result;
        do {
            if (!(result = this.currentVirtualChannel.getNextRecord(delegate)).isFullRecord()) continue;
            StreamElement element = delegate.getInstance();
            if (element.isRecord() || element.isLatencyMarker()) {
                return result;
            }
            if (element.isWatermark()) {
                Watermark minWatermark = this.channels.values().stream().map(virtualChannel -> virtualChannel.lastWatermark).min(Comparator.comparing(Watermark::getTimestamp)).orElseThrow(() -> new IllegalStateException("Should always have a watermark"));
                if (minWatermark.equals(Watermark.UNINITIALIZED)) continue;
                delegate.setInstance(minWatermark);
                return result;
            }
            if (!element.isWatermarkStatus()) continue;
            if (this.channels.values().stream().anyMatch(d -> d.watermarkStatus.isActive())) {
                delegate.setInstance(WatermarkStatus.ACTIVE);
            }
            return result;
        } while (!result.isBufferConsumed());
        return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
    }

    @Override
    public void clear() {
        this.channels.values().forEach(d -> d.clear());
    }

    static <T> DemultiplexingRecordDeserializer<T> create(InputChannelInfo channelInfo, InflightDataRescalingDescriptor rescalingDescriptor, Function<Integer, RecordDeserializer<DeserializationDelegate<StreamElement>>> deserializerFactory, Function<InputChannelInfo, Predicate<StreamRecord<T>>> recordFilterFactory) {
        int[] oldSubtaskIndexes = rescalingDescriptor.getOldSubtaskIndexes(channelInfo.getGateIdx());
        if (oldSubtaskIndexes.length == 0) {
            return UNMAPPED;
        }
        int[] oldChannelIndexes = rescalingDescriptor.getChannelMapping(channelInfo.getGateIdx()).getMappedIndexes(channelInfo.getInputChannelIdx());
        if (oldChannelIndexes.length == 0) {
            return UNMAPPED;
        }
        int totalChannels = oldSubtaskIndexes.length * oldChannelIndexes.length;
        HashMap<SubtaskConnectionDescriptor, VirtualChannel<T>> virtualChannels = Maps.newHashMapWithExpectedSize(totalChannels);
        for (int subtask : oldSubtaskIndexes) {
            for (int channel : oldChannelIndexes) {
                SubtaskConnectionDescriptor descriptor = new SubtaskConnectionDescriptor(subtask, channel);
                virtualChannels.put(descriptor, new VirtualChannel(deserializerFactory.apply(totalChannels), rescalingDescriptor.isAmbiguous(channelInfo.getGateIdx(), subtask) ? recordFilterFactory.apply(channelInfo) : RecordFilter.all()));
            }
        }
        return new DemultiplexingRecordDeserializer<T>(virtualChannels);
    }

    public String toString() {
        return "DemultiplexingRecordDeserializer{channels=" + this.channels.keySet() + "}";
    }

    static class VirtualChannel<T> {
        private final RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer;
        private final Predicate<StreamRecord<T>> recordFilter;
        Watermark lastWatermark = Watermark.UNINITIALIZED;
        WatermarkStatus watermarkStatus = WatermarkStatus.ACTIVE;
        private RecordDeserializer.DeserializationResult lastResult;

        VirtualChannel(RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer, Predicate<StreamRecord<T>> recordFilter) {
            this.deserializer = deserializer;
            this.recordFilter = recordFilter;
        }

        public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate<StreamElement> delegate) throws IOException {
            do {
                this.lastResult = this.deserializer.getNextRecord(delegate);
                if (!this.lastResult.isFullRecord()) continue;
                StreamElement element = delegate.getInstance();
                if (element.isRecord() && this.recordFilter.test(element.asRecord())) {
                    return this.lastResult;
                }
                if (element.isWatermark()) {
                    this.lastWatermark = element.asWatermark();
                    return this.lastResult;
                }
                if (!element.isWatermarkStatus()) continue;
                this.watermarkStatus = element.asWatermarkStatus();
                return this.lastResult;
            } while (!this.lastResult.isBufferConsumed());
            return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
        }

        public void setNextBuffer(Buffer buffer) throws IOException {
            this.deserializer.setNextBuffer(buffer);
        }

        public void clear() {
            this.deserializer.clear();
        }

        public boolean hasPartialData() {
            return this.lastResult != null && !this.lastResult.isBufferConsumed();
        }
    }
}

