/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.netty.ClientOutboundMessage;
import org.apache.flink.runtime.io.network.netty.ConnectionErrorMessage;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CreditBasedPartitionRequestClientHandler
extends ChannelInboundHandlerAdapter
implements NetworkClientHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedPartitionRequestClientHandler.class);
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();
    private final ArrayDeque<ClientOutboundMessage> clientOutboundMessages = new ArrayDeque();
    private final AtomicReference<Throwable> channelError = new AtomicReference();
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
    private volatile ChannelHandlerContext ctx;
    private ConnectionID connectionID;

    CreditBasedPartitionRequestClientHandler() {
    }

    @Override
    public void addInputChannel(RemoteInputChannel listener) throws IOException {
        this.checkError();
        this.inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
    }

    @Override
    public void removeInputChannel(RemoteInputChannel listener) {
        this.inputChannels.remove(listener.getInputChannelId());
    }

    @Override
    public RemoteInputChannel getInputChannel(InputChannelID inputChannelId) {
        return (RemoteInputChannel)this.inputChannels.get(inputChannelId);
    }

    @Override
    public void cancelRequestFor(InputChannelID inputChannelId) {
        if (inputChannelId == null || this.ctx == null) {
            return;
        }
        this.ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SocketAddress remoteAddr = ctx.channel().remoteAddress();
        this.notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Connection unexpectedly closed by remote task manager '" + remoteAddr + " [ " + this.connectionID.getResourceID().getStringWithMetadata() + " ] '. This might indicate that the remote task manager was lost.", remoteAddr));
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof TransportException) {
            this.notifyAllChannelsOfErrorAndClose(cause);
        } else {
            TransportException tex;
            SocketAddress remoteAddr = ctx.channel().remoteAddress();
            if (cause.getMessage() != null && cause.getMessage().contains("Connection reset by peer")) {
                tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + " [ " + this.connectionID.getResourceID().getStringWithMetadata() + " ] '. This indicates that the remote task manager was lost.", remoteAddr, cause);
            } else {
                SocketAddress localAddr = ctx.channel().localAddress();
                tex = new LocalTransportException(String.format("%s (connection to '%s [%s]')", cause.getMessage(), remoteAddr, this.connectionID.getResourceID().getStringWithMetadata()), localAddr, cause);
            }
            this.notifyAllChannelsOfErrorAndClose(tex);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            this.decodeMsg(msg);
        }
        catch (Throwable t) {
            this.notifyAllChannelsOfErrorAndClose(t);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ClientOutboundMessage) {
            boolean triggerWrite = this.clientOutboundMessages.isEmpty();
            this.clientOutboundMessages.add((ClientOutboundMessage)msg);
            if (triggerWrite) {
                this.writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else if (msg instanceof ConnectionErrorMessage) {
            this.notifyAllChannelsOfErrorAndClose(((ConnectionErrorMessage)msg).getCause());
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    @Override
    public boolean hasChannelError() {
        return this.channelError.get() != null;
    }

    @Override
    public void setConnectionId(ConnectionID connectionId) {
        this.connectionID = Preconditions.checkNotNull(connectionId);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        this.writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void notifyAllChannelsOfErrorAndClose(Throwable cause) {
        if (this.channelError.compareAndSet(null, cause)) {
            try {
                for (RemoteInputChannel inputChannel : this.inputChannels.values()) {
                    inputChannel.onError(cause);
                }
            }
            catch (Throwable t) {
                LOG.warn("An Exception was thrown during error notification of a remote input channel.", t);
            }
            finally {
                this.inputChannels.clear();
                this.clientOutboundMessages.clear();
                if (this.ctx != null) {
                    this.ctx.close();
                }
            }
        }
    }

    @VisibleForTesting
    void checkError() throws IOException {
        Throwable t = this.channelError.get();
        if (t != null) {
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException("There has been an error in the channel.", t);
        }
    }

    private void decodeMsg(Object msg) {
        Class<?> msgClazz = msg.getClass();
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse)msg;
            RemoteInputChannel inputChannel = (RemoteInputChannel)this.inputChannels.get(bufferOrEvent.receiverId);
            if (inputChannel == null || inputChannel.isReleased()) {
                bufferOrEvent.releaseBuffer();
                this.cancelRequestFor(bufferOrEvent.receiverId);
                return;
            }
            try {
                this.decodeBufferOrEvent(inputChannel, bufferOrEvent);
            }
            catch (Throwable t) {
                inputChannel.onError(t);
            }
        } else if (msgClazz == NettyMessage.ErrorResponse.class) {
            NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse)msg;
            SocketAddress remoteAddr = this.ctx.channel().remoteAddress();
            if (error.isFatalError()) {
                this.notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Fatal error at remote task manager '" + remoteAddr + " [ " + this.connectionID.getResourceID().getStringWithMetadata() + " ] '.", remoteAddr, error.cause));
            } else {
                RemoteInputChannel inputChannel = (RemoteInputChannel)this.inputChannels.get(error.receiverId);
                if (inputChannel != null) {
                    if (error.cause.getClass() == PartitionNotFoundException.class) {
                        inputChannel.onFailedPartitionRequest();
                    } else {
                        inputChannel.onError(new RemoteTransportException("Error at remote task manager '" + remoteAddr + " [ " + this.connectionID.getResourceID().getStringWithMetadata() + " ] '.", remoteAddr, error.cause));
                    }
                }
            }
        } else if (msgClazz == NettyMessage.BacklogAnnouncement.class) {
            NettyMessage.BacklogAnnouncement announcement = (NettyMessage.BacklogAnnouncement)msg;
            RemoteInputChannel inputChannel = (RemoteInputChannel)this.inputChannels.get(announcement.receiverId);
            if (inputChannel == null || inputChannel.isReleased()) {
                this.cancelRequestFor(announcement.receiverId);
                return;
            }
            try {
                inputChannel.onSenderBacklog(announcement.backlog);
            }
            catch (Throwable throwable) {
                inputChannel.onError(throwable);
            }
        } else {
            throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
        }
    }

    private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
        if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
            inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
        } else if (bufferOrEvent.getBuffer() != null) {
            if (bufferOrEvent.numOfPartialBuffers > 0) {
                int offset = 0;
                int seq = bufferOrEvent.sequenceNumber;
                AtomicInteger waitToBeReleased = new AtomicInteger(bufferOrEvent.numOfPartialBuffers);
                AtomicInteger processedPartialBuffers = new AtomicInteger(0);
                try {
                    for (int i = 0; i < bufferOrEvent.numOfPartialBuffers; ++i) {
                        int size = bufferOrEvent.getPartialBufferSizes().get(i);
                        processedPartialBuffers.incrementAndGet();
                        inputChannel.onBuffer(CreditBasedPartitionRequestClientHandler.sliceBuffer(bufferOrEvent, memorySegment -> {
                            if (waitToBeReleased.decrementAndGet() == 0) {
                                bufferOrEvent.getBuffer().recycleBuffer();
                            }
                        }, offset, size), seq++, i == bufferOrEvent.numOfPartialBuffers - 1 ? bufferOrEvent.backlog : -1, -1);
                        offset += size;
                    }
                }
                catch (Throwable throwable) {
                    LOG.error("Failed to process partial buffers.", throwable);
                    if (processedPartialBuffers.get() != bufferOrEvent.numOfPartialBuffers) {
                        bufferOrEvent.getBuffer().recycleBuffer();
                    }
                    throw throwable;
                }
            } else {
                inputChannel.onBuffer(bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog, bufferOrEvent.subpartitionId);
            }
        } else {
            throw new IllegalStateException("The read buffer is null in credit-based input channel.");
        }
    }

    private static NetworkBuffer sliceBuffer(NettyMessage.BufferResponse bufferOrEvent, BufferRecycler recycler, int offset, int size) {
        MemorySegment segment;
        ByteBuffer nioBuffer = bufferOrEvent.getBuffer().getNioBuffer(offset, size);
        if (nioBuffer.isDirect()) {
            segment = MemorySegmentFactory.wrapOffHeapMemory(nioBuffer);
        } else {
            byte[] bytes = nioBuffer.array();
            segment = MemorySegmentFactory.wrap(bytes);
        }
        return new NetworkBuffer(segment, recycler, bufferOrEvent.dataType, bufferOrEvent.isCompressed, size);
    }

    private void writeAndFlushNextMessageIfPossible(Channel channel) {
        Object msg;
        ClientOutboundMessage outboundMessage;
        if (this.channelError.get() != null || !channel.isWritable()) {
            return;
        }
        do {
            if ((outboundMessage = this.clientOutboundMessages.poll()) != null) continue;
            return;
        } while (outboundMessage.inputChannel.isReleased() || (msg = outboundMessage.buildMessage()) == null);
        channel.writeAndFlush(msg).addListener(this.writeListener);
    }

    private class WriteAndFlushNextMessageIfPossibleListener
    implements ChannelFutureListener {
        private WriteAndFlushNextMessageIfPossibleListener() {
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            try {
                if (future.isSuccess()) {
                    CreditBasedPartitionRequestClientHandler.this.writeAndFlushNextMessageIfPossible(future.channel());
                } else if (future.cause() != null) {
                    CreditBasedPartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(future.cause());
                } else {
                    CreditBasedPartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(new IllegalStateException("Sending cancelled by user."));
                }
            }
            catch (Throwable t) {
                CreditBasedPartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(t);
            }
        }
    }
}

