/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sort;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A {@link StreamTaskInput} that replays the elements of a fixed, in-memory collection.
 *
 * <p>Each call to {@link #emitNext} forwards at most one element to the given output. Once the
 * underlying iterator is exhausted, the input reports {@link DataInputStatus#END_OF_DATA} exactly
 * once and {@link DataInputStatus#END_OF_INPUT} on every call after that.
 *
 * @param <E> type of the record values emitted to the output
 */
final class CollectionDataInput<E> implements StreamTaskInput<E> {
    /** Cursor over the elements that have not been emitted yet. */
    private final Iterator<StreamElement> elementsIterator;

    /** Index reported by {@link #getInputIndex()}. */
    private final int inputIdx;

    /** Set once {@code END_OF_DATA} has been returned, so later calls report {@code END_OF_INPUT}. */
    private boolean endOfInput = false;

    /**
     * Creates an input over {@code elements} with input index {@code 0}.
     *
     * @param elements elements to emit, in the collection's iteration order
     */
    CollectionDataInput(Collection<StreamElement> elements) {
        this(elements, 0);
    }

    /**
     * Creates an input over {@code elements} with the given input index.
     *
     * @param elements elements to emit, in the collection's iteration order
     * @param inputIdx value returned by {@link #getInputIndex()}
     */
    CollectionDataInput(Collection<StreamElement> elements, int inputIdx) {
        this.elementsIterator = elements.iterator();
        this.inputIdx = inputIdx;
    }

    /**
     * Emits the next element, if any, to {@code output} and reports what remains.
     *
     * @param output receiver of the emitted record, watermark, record attributes or watermark
     *     status
     * @return {@code MORE_AVAILABLE} while elements remain after this call; {@code END_OF_DATA}
     *     on the first call that leaves the iterator exhausted; {@code END_OF_INPUT} afterwards
     * @throws IllegalStateException if the next element is of none of the supported kinds
     * @throws Exception if the output fails while handling the element
     */
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<E> output) throws Exception {
        if (this.elementsIterator.hasNext()) {
            StreamElement streamElement = this.elementsIterator.next();
            if (streamElement instanceof StreamRecord) {
                output.emitRecord(streamElement.asRecord());
            } else if (streamElement instanceof Watermark) {
                output.emitWatermark(streamElement.asWatermark());
            } else if (streamElement.isRecordAttributes()) {
                output.emitRecordAttributes(streamElement.asRecordAttributes());
            } else if (streamElement.isWatermarkStatus()) {
                output.emitWatermarkStatus(streamElement.asWatermarkStatus());
            } else {
                throw new IllegalStateException("Unsupported element type: " + streamElement);
            }
        }

        // The status is derived after the emission above, so the call that emits the last
        // element already reports END_OF_DATA.
        if (this.elementsIterator.hasNext()) {
            return DataInputStatus.MORE_AVAILABLE;
        }
        if (this.endOfInput) {
            return DataInputStatus.END_OF_INPUT;
        }
        this.endOfInput = true;
        return DataInputStatus.END_OF_DATA;
    }

    /**
     * Returns an already-completed future; this input never signals unavailability.
     *
     * @return a completed future
     */
    public CompletableFuture<?> getAvailableFuture() {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns the input index supplied at construction time.
     *
     * @return the input index
     */
    public int getInputIndex() {
        return this.inputIdx;
    }

    /**
     * Does nothing, because this input keeps no state that is written through the channel state
     * writer.
     *
     * @param channelStateWriter unused
     * @param checkpointId unused
     * @return an already-completed future (never {@code null}), so callers can chain on or
     *     aggregate the result without a null check
     */
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
        return CompletableFuture.completedFuture(null);
    }

    /** No-op: this input holds no resources to release. */
    public void close() throws IOException {
    }
}

