/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.FullyFilledBuffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that tracks the {@link NetworkSequenceViewReader}s created for a channel
 * and writes their buffers to that channel as {@link NettyMessage.BufferResponse}s.
 *
 * <p>Only one buffer write is issued at a time: the next buffer is written from the completion
 * listener of the previous write (or when the channel's writability changes). Readers that have
 * data to send are kept in a FIFO queue; all known readers are additionally indexed by their
 * receiver id.
 */
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);

    /** Listener attached to every buffer write; continues with the next buffer on completion. */
    private final ChannelFutureListener writeListener =
            new WriteAndFlushNextMessageIfPossibleListener();

    /** Readers currently enqueued as having data available for transfer. */
    private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>();

    /** All readers announced through {@link #notifyReaderCreated}, keyed by receiver id. */
    private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders =
            new ConcurrentHashMap<>();

    /** Set once {@link #handleException} ran; afterwards no further data is written. */
    private boolean fatalError;

    /** Context captured on the first {@link #channelRegistered} call; {@code null} before. */
    private ChannelHandlerContext ctx;

    /** Remembers the first handler context seen and forwards the event to the superclass. */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelRegistered(ctx);
    }

    /**
     * Schedules a task on the channel's executor that fires {@code reader} as a user event through
     * the pipeline; the event is picked up by {@link #userEventTriggered}.
     *
     * @param reader the reader to fire as a user event
     */
    void notifyReaderNonEmpty(NetworkSequenceViewReader reader) {
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
    }

    /**
     * Enqueues {@code reader} if it is not enqueued yet and reports itself as available.
     *
     * <p>If the reader is not available but has a positive backlog that it wants announced, a
     * backlog announcement is sent instead. If the queue was empty before the reader was added,
     * a write is triggered right away; otherwise the pending write's completion listener will
     * eventually get to it.
     */
    private void enqueueAvailableReader(NetworkSequenceViewReader reader) throws Exception {
        if (reader.isRegisteredAsAvailable()) {
            return;
        }

        ResultSubpartitionView.AvailabilityWithBacklog availabilityWithBacklog =
                reader.getAvailabilityAndBacklog();
        if (!availabilityWithBacklog.isAvailable()) {
            int backlog = availabilityWithBacklog.getBacklog();
            if (backlog > 0 && reader.needAnnounceBacklog()) {
                announceBacklog(reader, backlog);
            }
            return;
        }

        // Must be evaluated before registering, otherwise the queue is never empty here.
        boolean triggerWrite = availableReaders.isEmpty();
        registerAvailableReader(reader);
        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible(ctx.channel());
        }
    }

    /** Exposes the internal (live, mutable) queue of available readers for tests. */
    @VisibleForTesting
    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
        return availableReaders;
    }

    /** Registers {@code reader} under its receiver id, replacing any previous mapping. */
    public void notifyReaderCreated(NetworkSequenceViewReader reader) {
        allReaders.put(reader.getReceiverId(), reader);
    }

    /**
     * Fires {@code receiverId} as a user event through the pipeline; {@link #userEventTriggered}
     * then removes and releases the corresponding reader.
     */
    public void cancel(InputChannelID receiverId) {
        ctx.pipeline().fireUserEventTriggered(receiverId);
    }

    /**
     * Closes the channel (if a context has been captured) and releases all readers.
     *
     * @throws IOException if releasing a reader fails
     */
    public void close() throws IOException {
        if (ctx != null) {
            ctx.channel().close();
        }
        releaseAllResources();
    }

    /**
     * Applies {@code operation} to the reader of {@code receiverId} and then tries to enqueue that
     * reader. Does nothing after a fatal error.
     *
     * @param receiverId id of the receiver whose reader is targeted
     * @param operation action to run against the reader before it is (re-)enqueued
     * @throws IllegalStateException if no reader is registered for {@code receiverId}
     */
    void addCreditOrResumeConsumption(
            InputChannelID receiverId, Consumer<NetworkSequenceViewReader> operation)
            throws Exception {
        if (fatalError) {
            return;
        }
        NetworkSequenceViewReader reader = obtainReader(receiverId);
        operation.accept(reader);
        enqueueAvailableReader(reader);
    }

    /**
     * Forwards the acknowledgement to the reader of {@code receiverId}. Does nothing after a fatal
     * error.
     *
     * @throws IllegalStateException if no reader is registered for {@code receiverId}
     */
    void acknowledgeAllRecordsProcessed(InputChannelID receiverId) {
        if (fatalError) {
            return;
        }
        obtainReader(receiverId).acknowledgeAllRecordsProcessed();
    }

    /**
     * Forwards the new buffer size to the reader of {@code receiverId}. Unknown receivers and
     * calls after a fatal error are silently ignored.
     */
    void notifyNewBufferSize(InputChannelID receiverId, int newBufferSize) {
        if (fatalError) {
            return;
        }
        NetworkSequenceViewReader reader = allReaders.get(receiverId);
        if (reader != null) {
            reader.notifyNewBufferSize(newBufferSize);
        }
    }

    /**
     * Forwards the required segment id to the reader of {@code receiverId}. Unknown receivers and
     * calls after a fatal error are silently ignored.
     */
    void notifyRequiredSegmentId(InputChannelID receiverId, int subpartitionId, int segmentId) {
        if (fatalError) {
            return;
        }
        NetworkSequenceViewReader reader = allReaders.get(receiverId);
        if (reader != null) {
            reader.notifyRequiredSegmentId(subpartitionId, segmentId);
        }
    }

    /**
     * Looks up the reader registered for {@code receiverId}.
     *
     * @return the registered reader, never {@code null}
     * @throws IllegalStateException if no reader is registered for {@code receiverId}
     */
    NetworkSequenceViewReader obtainReader(InputChannelID receiverId) {
        NetworkSequenceViewReader reader = allReaders.get(receiverId);
        if (reader == null) {
            throw new IllegalStateException(
                    "No reader for receiverId = " + receiverId + " exists.");
        }
        return reader;
    }

    /**
     * Writes a {@link NettyMessage.BacklogAnnouncement} for {@code reader} to the channel. A
     * failed write is routed to {@link #onChannelFutureFailure}.
     *
     * @throws IllegalArgumentException if {@code backlog} is not positive
     */
    private void announceBacklog(NetworkSequenceViewReader reader, int backlog) {
        Preconditions.checkArgument(backlog > 0, "Backlog must be positive.");

        NettyMessage.BacklogAnnouncement announcement =
                new NettyMessage.BacklogAnnouncement(backlog, reader.getReceiverId());
        ctx.channel()
                .writeAndFlush(announcement)
                .addListener(
                        (ChannelFutureListener)
                                future -> {
                                    if (!future.isSuccess()) {
                                        onChannelFutureFailure(future);
                                    }
                                });
    }

    /**
     * Dispatches the user events this handler fires at itself.
     *
     * <ul>
     *   <li>{@link NetworkSequenceViewReader}: try to enqueue the reader.
     *   <li>{@link InputChannelID} (exact class): cancel, i.e. remove and release that reader.
     *   <li>{@link PartitionRequestListener}: forget the listener's reader and answer with a
     *       {@link PartitionNotFoundException} error response.
     *   <li>anything else: pass on to the next handler.
     * </ul>
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof NetworkSequenceViewReader) {
            enqueueAvailableReader((NetworkSequenceViewReader) msg);
        } else if (msg.getClass() == InputChannelID.class) {
            InputChannelID toCancel = (InputChannelID) msg;

            // Drop the reader from the available queue first, then from the index, and release it.
            availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
            NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
            if (toRelease != null) {
                releaseViewReader(toRelease);
            }
        } else if (msg instanceof PartitionRequestListener) {
            PartitionRequestListener partitionRequestListener = (PartitionRequestListener) msg;
            ResultPartitionID resultPartitionId = partitionRequestListener.getResultPartitionId();
            InputChannelID inputChannelId = partitionRequestListener.getReceiverId();

            availableReaders.remove(partitionRequestListener.getViewReader());
            allReaders.remove(inputChannelId);
            try {
                ctx.writeAndFlush(
                        new NettyMessage.ErrorResponse(
                                new PartitionNotFoundException(resultPartitionId),
                                inputChannelId));
            } catch (Exception e) {
                // Best effort only: a failure to report the missing partition is just logged.
                LOG.warn(
                        "Write partition not found exception to {} for result partition {} fail",
                        inputChannelId,
                        resultPartitionId,
                        e);
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    /** Tries to continue writing whenever the channel's writability changes. */
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    /**
     * Polls available readers until one yields a buffer and writes that buffer to {@code channel}.
     *
     * <p>Returns without writing if a fatal error occurred, the channel is not writable, or no
     * reader is enqueued. Readers that yield no buffer are skipped; if such a reader is released
     * and carries a failure cause, an error response for its receiver is sent. A reader whose
     * buffer reports more data available is put back at the end of the queue.
     *
     * @throws IOException wrapping any throwable raised while reading or writing; a buffer that
     *     was already obtained is recycled first
     */
    private void writeAndFlushNextMessageIfPossible(Channel channel) throws IOException {
        if (fatalError || !channel.isWritable()) {
            return;
        }

        InputChannel.BufferAndAvailability next = null;
        int nextSubpartitionId = -1;
        try {
            while (true) {
                NetworkSequenceViewReader reader = pollAvailableReader();
                if (reader == null) {
                    // Nothing queued; a later notification or write callback will resume.
                    return;
                }

                nextSubpartitionId = reader.peekNextBufferSubpartitionId();
                next = reader.getNextBuffer();

                if (next == null) {
                    if (reader.isReleased()) {
                        Throwable cause = reader.getFailureCause();
                        if (cause != null) {
                            NettyMessage.ErrorResponse errorResponse =
                                    new NettyMessage.ErrorResponse(cause, reader.getReceiverId());
                            this.ctx.writeAndFlush(errorResponse);
                        }
                    }
                    // Either way this reader has nothing to send; try the next one.
                    continue;
                }

                // The reader was removed from the queue by the poll; re-add it if it has more.
                if (next.moreAvailable()) {
                    registerAvailableReader(reader);
                }

                int numPartialBuffers =
                        next.buffer() instanceof FullyFilledBuffer
                                ? ((FullyFilledBuffer) next.buffer()).getPartialBuffers().size()
                                : 0;
                NettyMessage.BufferResponse bufferResponse =
                        new NettyMessage.BufferResponse(
                                next.buffer(),
                                next.getSequenceNumber(),
                                reader.getReceiverId(),
                                nextSubpartitionId,
                                numPartialBuffers,
                                next.buffersInBacklog());

                // Write one buffer and wait for the listener before continuing with the next.
                channel.writeAndFlush(bufferResponse).addListener(writeListener);
                return;
            }
        } catch (Throwable t) {
            if (next != null) {
                next.buffer().recycleBuffer();
            }
            throw new IOException(t.getMessage(), t);
        }
    }

    /** Appends {@code reader} to the available queue and flags it as registered. */
    private void registerAvailableReader(NetworkSequenceViewReader reader) {
        availableReaders.add(reader);
        reader.setRegisteredAsAvailable(true);
    }

    /**
     * Removes the head of the available queue and clears its registered flag.
     *
     * @return the polled reader, or {@code null} if the queue is empty
     */
    @Nullable
    private NetworkSequenceViewReader pollAvailableReader() {
        NetworkSequenceViewReader reader = availableReaders.poll();
        if (reader != null) {
            reader.setRegisteredAsAvailable(false);
        }
        return reader;
    }

    /** Releases all readers and then forwards the inactive event to the next handler. */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        releaseAllResources();
        ctx.fireChannelInactive();
    }

    /** Treats every exception reaching this handler as fatal for the channel. */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        handleException(ctx.channel(), cause);
    }

    /**
     * Logs {@code cause}, marks this queue as failed, releases all readers and, if the channel is
     * still active, sends an error response and closes the channel once that write completes.
     *
     * @throws IOException if releasing a reader fails; in that case no error response is sent
     */
    private void handleException(Channel channel, Throwable cause) throws IOException {
        LOG.error(
                "Encountered error while consuming partitions (connection to {})",
                channel.remoteAddress(),
                cause);

        fatalError = true;
        releaseAllResources();

        if (channel.isActive()) {
            channel.writeAndFlush(new NettyMessage.ErrorResponse(cause))
                    .addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Releases every known reader and empties both reader collections.
     *
     * <p>A failing reader does not stop the remaining readers from being released: every reader
     * is attempted, the collections are cleared, and only then is the first {@link IOException}
     * rethrown with any later ones attached as suppressed exceptions.
     *
     * @throws IOException the first failure raised while releasing a reader
     */
    private void releaseAllResources() throws IOException {
        IOException firstFailure = null;
        for (NetworkSequenceViewReader reader : allReaders.values()) {
            try {
                releaseViewReader(reader);
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        availableReaders.clear();
        allReaders.clear();

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Clears the reader's registered flag and releases the resources it holds. */
    private void releaseViewReader(NetworkSequenceViewReader reader) throws IOException {
        reader.setRegisteredAsAvailable(false);
        reader.releaseAllResources();
    }

    /**
     * Handles a write future that did not succeed, using the future's cause if there is one and
     * an {@link IllegalStateException} otherwise.
     */
    private void onChannelFutureFailure(ChannelFuture future) throws Exception {
        if (future.cause() != null) {
            handleException(future.channel(), future.cause());
        } else {
            handleException(
                    future.channel(), new IllegalStateException("Sending cancelled by user."));
        }
    }

    /**
     * Fires {@code partitionRequestListener} as a user event through the pipeline; {@link
     * #userEventTriggered} then answers the receiver with a partition-not-found error.
     */
    public void notifyPartitionRequestTimeout(PartitionRequestListener partitionRequestListener) {
        ctx.pipeline().fireUserEventTriggered(partitionRequestListener);
    }

    /**
     * Completion listener for buffer writes: on success it writes the next buffer, on failure it
     * triggers the fatal error handling. Anything thrown along the way is handled as fatal too.
     */
    private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {

        public void operationComplete(ChannelFuture future) throws Exception {
            try {
                if (future.isSuccess()) {
                    writeAndFlushNextMessageIfPossible(future.channel());
                } else {
                    onChannelFutureFailure(future);
                }
            } catch (Throwable t) {
                handleException(future.channel(), t);
            }
        }
    }
}

