/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.util.Preconditions;

/**
 * Adapter that exposes a {@link SourceOperator} through the {@link StreamTaskInput} and
 * {@link CheckpointableInput} interfaces.
 *
 * <p>Records are pulled from the wrapped operator in {@link #emitNext}. An internal
 * {@link AvailabilityProvider.AvailabilityHelper} is toggled by {@link #blockConsumption} and
 * {@link #resumeConsumption}; while that helper does not report itself as approximately
 * available, {@link #emitNext} does not call the operator at all.
 *
 * <p>The input advertises a single {@link InputChannelInfo}, created from the configured gate
 * index and the constant {@code 0}.
 *
 * @param <T> type of the elements emitted by the wrapped source operator
 */
@Internal
public class StreamTaskSourceInput<T> implements StreamTaskInput<T>, CheckpointableInput {

    /** Source operator that all emission and availability queries are delegated to. */
    private final SourceOperator<T, ?> operator;

    /** Gate index reported by {@link #getInputGateIndex()}. */
    private final int inputGateIndex;

    /** Input index reported by {@link #getInputIndex()}. */
    private final int inputIndex;

    /** Single-element list returned by {@link #getChannelInfos()}. */
    private final List<InputChannelInfo> inputChannelInfos;

    /** Helper switched by the block/resume methods and consulted before every emission. */
    private final AvailabilityProvider.AvailabilityHelper isBlockedAvailability =
            new AvailabilityProvider.AvailabilityHelper();

    /**
     * Creates the input for the given operator.
     *
     * @param operator source operator to wrap; must not be {@code null}
     * @param inputGateIndex gate index reported by this input and used for its channel info
     * @param inputIndex index reported by {@link #getInputIndex()}
     */
    public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex, int inputIndex) {
        this.operator = Preconditions.checkNotNull(operator);
        this.inputGateIndex = inputGateIndex;
        this.inputIndex = inputIndex;
        inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
        // The helper is explicitly reset to "available" before the input is handed out.
        isBlockedAvailability.resetAvailable();
    }

    /**
     * Forwards to {@link SourceOperator#emitNext} unless this input is currently blocked.
     *
     * @param output sink handed through to the operator
     * @return the operator's status, or {@link DataInputStatus#NOTHING_AVAILABLE} when the
     *     blocking helper does not report itself as approximately available
     * @throws Exception whatever the wrapped operator throws
     */
    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        // Guard: a caller polling while the input is blocked gets nothing and the operator
        // is left untouched.
        if (!isBlockedAvailability.isApproximatelyAvailable()) {
            return DataInputStatus.NOTHING_AVAILABLE;
        }
        return operator.emitNext(output);
    }

    /**
     * @return the result of combining the blocking helper with the wrapped operator via
     *     {@code AvailabilityHelper#and}
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return isBlockedAvailability.and(operator);
    }

    /**
     * Resets the blocking helper to unavailable.
     *
     * @param channelInfo not read by this implementation
     */
    @Override
    public void blockConsumption(InputChannelInfo channelInfo) {
        isBlockedAvailability.resetUnavailable();
    }

    /**
     * Obtains the future returned by {@code getUnavailableToResetAvailable()} on the blocking
     * helper and completes it with {@code null}.
     *
     * @param channelInfo not read by this implementation
     */
    @Override
    public void resumeConsumption(InputChannelInfo channelInfo) {
        CompletableFuture<?> blockedFuture = isBlockedAvailability.getUnavailableToResetAvailable();
        blockedFuture.complete(null);
    }

    /** @return the single-element channel info list built in the constructor */
    @Override
    public List<InputChannelInfo> getChannelInfos() {
        return inputChannelInfos;
    }

    /** @return the size of the list returned by {@link #getChannelInfos()} */
    @Override
    public int getNumberOfInputChannels() {
        return inputChannelInfos.size();
    }

    /**
     * Blocks this input by calling {@link #blockConsumption} with a {@code null} channel.
     *
     * @param barrier not inspected by this implementation
     */
    @Override
    public void checkpointStarted(CheckpointBarrier barrier) {
        blockConsumption(null);
    }

    /**
     * Unblocks this input by calling {@link #resumeConsumption} with a {@code null} channel.
     *
     * @param cancelledCheckpointId not inspected by this implementation
     */
    @Override
    public void checkpointStopped(long cancelledCheckpointId) {
        resumeConsumption(null);
    }

    /** @return the gate index supplied at construction */
    @Override
    public int getInputGateIndex() {
        return inputGateIndex;
    }

    /**
     * Intentionally empty: this implementation performs no work for priority events.
     *
     * @param channelIndex unused
     * @param sequenceNumber unused
     * @throws IOException declared by the interface; never thrown here
     */
    @Override
    public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException {
    }

    /** @return the input index supplied at construction */
    @Override
    public int getInputIndex() {
        return inputIndex;
    }

    /** Intentionally empty: this class does nothing on close. */
    @Override
    public void close() {
    }

    /**
     * Returns an already-completed future; neither argument is used.
     *
     * @param channelStateWriter unused
     * @param checkpointId unused
     * @return a future completed with {@code null}
     * @throws CheckpointException declared by the interface; never thrown here
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
        return CompletableFuture.completedFuture(null);
    }

    /** @return the {@link OperatorID} reported by the wrapped operator */
    public OperatorID getOperatorID() {
        return operator.getOperatorID();
    }

    /** @return the wrapped source operator */
    public SourceOperator<T, ?> getOperator() {
        return operator;
    }
}

