/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;

/**
 * Default {@link PullMessageResultHandler}: turns the outcome of a message-store lookup
 * (already recorded as the code of {@code response}) into what is sent back for a pull request.
 *
 * <p>Depending on the response code the handler either returns the response for the caller to
 * send, or returns {@code null} after it has written to the channel itself, parked the request
 * for long polling, or dropped the reply because the channel is not writable.
 */
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
    protected static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqBroker");

    // Offsets into a stored message buffer, used only to read its store timestamp.
    // NOTE(review): values presumably mirror the commit-log message layout — confirm against MessageDecoder.
    private static final int SYS_FLAG_POSITION = 36;
    // NOTE(review): presumably the "born host is IPv6" bit of the message sys flag — confirm against MessageSysFlag.
    private static final int BORN_HOST_LONG_FLAG = 0x10;
    private static final int BORN_HOST_SHORT_LENGTH = 8;
    private static final int BORN_HOST_LONG_LENGTH = 20;
    private static final int STORE_TIME_BASE_POSITION = 48;

    protected final BrokerController brokerController;

    /**
     * @param brokerController broker facade used to reach configuration, statistics, the message
     *                         store and the pull-request hold service
     */
    public DefaultPullMessageResultHandler(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Dispatches on {@code response.getCode()} and finishes the pull request accordingly.
     *
     * @param getMessageResult        result of the store lookup; released by this handler on the code-0 paths
     * @param request                 original pull request, kept when the request is suspended
     * @param requestHeader           decoded pull request header
     * @param channel                 channel the request arrived on
     * @param subscriptionData        subscription handed to a suspended {@link PullRequest}
     * @param subscriptionGroupConfig group config; its broker id is suggested when offsets are not corrected
     * @param brokerAllowSuspend      whether the broker may park this request
     * @param messageFilter           filter handed to a suspended {@link PullRequest}
     * @param response                response whose code has already been set
     * @return {@code response} when the caller should send it, or {@code null} when nothing more
     *         must be sent by the caller
     */
    @Override
    public RemotingCommand handle(final GetMessageResult getMessageResult, RemotingCommand request, PullMessageRequestHeader requestHeader, final Channel channel, SubscriptionData subscriptionData, SubscriptionGroupConfig subscriptionGroupConfig, boolean brokerAllowSuspend, MessageFilter messageFilter, RemotingCommand response) {
        PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        switch (response.getCode()) {
            // NOTE(review): 0 is presumably ResponseCode.SUCCESS — confirm.
            case 0: {
                return this.replyWithMessages(getMessageResult, requestHeader, channel, response);
            }
            // NOTE(review): 19 is presumably ResponseCode.PULL_NOT_FOUND — confirm.
            case 19: {
                if (this.suspendIfRequested(request, requestHeader, channel, subscriptionData, brokerAllowSuspend, messageFilter)) {
                    return null;
                }
                // Not suspended: the response is returned unchanged, exactly like code 20.
                break;
            }
            // NOTE(review): 20 is presumably ResponseCode.PULL_RETRY_IMMEDIATELY — confirm.
            case 20: {
                break;
            }
            // 21 is reported as PULL_OFFSET_MOVED by the log messages of this branch.
            case 21: {
                this.handleOffsetMoved(getMessageResult, requestHeader, subscriptionGroupConfig, response, responseHeader);
                break;
            }
            default: {
                log.warn("[BUG] impossible result code of get message: {}", response.getCode());
                assert false;
                break;
            }
        }
        return response;
    }

    /**
     * Handles response code 0: updates the pull statistics, then delivers the messages either
     * copied into the response body or written directly to the channel.
     *
     * @return {@code response} carrying the messages as body when heap transfer is configured;
     *         {@code null} when the channel is not writable or the messages were written to the
     *         channel by this method
     */
    private RemotingCommand replyWithMessages(final GetMessageResult getMessageResult, PullMessageRequestHeader requestHeader, final Channel channel, RemotingCommand response) {
        this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getMessageCount());
        this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
        this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());

        if (!this.channelIsWritable(channel, requestHeader)) {
            // Nothing will be sent, so the store buffers must be given back here.
            getMessageResult.release();
            return null;
        }

        if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
            long beginTimeMills = this.brokerController.getMessageStore().now();
            // readGetMessageResult releases getMessageResult itself.
            byte[] body = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
            this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId().intValue(), (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
            response.setBody(body);
            return response;
        }

        try {
            ManyMessageTransfer fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // The buffers are only safe to release once the write has finished.
                    getMessageResult.release();
                    if (!future.isSuccess()) {
                        log.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
                    }
                }
            });
        } catch (Throwable e) {
            log.error("Error occurred when transferring messages from page cache", e);
            getMessageResult.release();
        }
        // The reply (if any) has been written to the channel directly.
        return null;
    }

    /**
     * Handles response code 19: parks the request in the pull-request hold service when both the
     * broker and the request's sys flag allow suspension.
     *
     * @return {@code true} when the request was handed to the hold service, {@code false} otherwise
     */
    private boolean suspendIfRequested(RemotingCommand request, PullMessageRequestHeader requestHeader, Channel channel, SubscriptionData subscriptionData, boolean brokerAllowSuspend, MessageFilter messageFilter) {
        boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
        long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0L;
        if (!brokerAllowSuspend || !hasSuspendFlag) {
            return false;
        }

        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            // Long polling disabled: use the broker-configured short polling time instead.
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        return true;
    }

    /**
     * Handles response code 21 (PULL_OFFSET_MOVED). When this broker is not a slave, or slaves are
     * configured to check offsets, the corrected offset is only logged; otherwise the response
     * code is rewritten to 20 and the group's broker id is suggested to the client.
     */
    private void handleOffsetMoved(GetMessageResult getMessageResult, PullMessageRequestHeader requestHeader, SubscriptionGroupConfig subscriptionGroupConfig, RemotingCommand response, PullMessageResponseHeader responseHeader) {
        boolean correctOffset = this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
            || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave();
        if (correctOffset) {
            MessageQueue mq = new MessageQueue();
            mq.setTopic(requestHeader.getTopic());
            mq.setQueueId(requestHeader.getQueueId().intValue());
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

            // NOTE(review): the event is only used for logging here; generateOffsetMovedEvent is
            // not invoked from this class — confirm that is intended.
            OffsetMovedEvent event = new OffsetMovedEvent();
            event.setConsumerGroup(requestHeader.getConsumerGroup());
            event.setMessageQueue(mq);
            event.setOffsetRequest(requestHeader.getQueueOffset().longValue());
            event.setOffsetNew(getMessageResult.getNextBeginOffset());
            log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", new Object[]{requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId()});
            return;
        }

        responseHeader.setSuggestWhichBrokerId(Long.valueOf(subscriptionGroupConfig.getBrokerId()));
        response.setCode(20);
        log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", new Object[]{requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), responseHeader.getSuggestWhichBrokerId()});
    }

    /**
     * Reports whether a reply may be written to {@code channel}. The channel is treated as
     * unwritable only when network flow control is enabled in the broker config and Netty
     * reports the channel as not writable.
     */
    private boolean channelIsWritable(Channel channel, PullMessageRequestHeader requestHeader) {
        boolean flowControlled = this.brokerController.getBrokerConfig().isEnableNetWorkFlowControl();
        if (flowControlled && !channel.isWritable()) {
            log.warn("channel {} not writable ,cid {}", channel.remoteAddress(), requestHeader.getConsumerGroup());
            return false;
        }
        return true;
    }

    /**
     * Copies every message buffer of {@code getMessageResult} into one heap array, releases the
     * result, and records how far the last copied message's store timestamp lags behind the
     * store's current time.
     *
     * @param getMessageResult result to copy from; always released before this method returns or throws
     * @param group            consumer group used for the statistics entry
     * @param topic            topic used for the statistics entry
     * @param queueId          queue id used for the statistics entry
     * @return backing array holding all message bytes, sized by {@code getBufferTotalSize()}
     */
    protected byte[] readGetMessageResult(GetMessageResult getMessageResult, String group, String topic, int queueId) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
        // Stays 0 when the result holds no buffers; the recorded lag is then the store's current time.
        long storeTimestamp = 0L;
        try {
            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
            for (ByteBuffer bb : messageBufferList) {
                byteBuffer.put(bb);
                // Absolute reads: unaffected by the position change caused by put(bb) above.
                int sysFlag = bb.getInt(SYS_FLAG_POSITION);
                int bornhostLength = (sysFlag & BORN_HOST_LONG_FLAG) == 0 ? BORN_HOST_SHORT_LENGTH : BORN_HOST_LONG_LENGTH;
                int msgStoreTimePos = STORE_TIME_BASE_POSITION + bornhostLength;
                storeTimestamp = bb.getLong(msgStoreTimePos);
            }
        } finally {
            getMessageResult.release();
        }
        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
        return byteBuffer.array();
    }

    /**
     * Stores {@code event} as a message on the {@code OFFSET_MOVED_EVENT} topic, tagged and keyed
     * with the event's consumer group. Any exception is logged and swallowed, so the caller is
     * never interrupted by a failure to publish the event.
     *
     * @param event offset-moved notification to publish
     */
    protected void generateOffsetMovedEvent(OffsetMovedEvent event) {
        try {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic("OFFSET_MOVED_EVENT");
            msgInner.setTags(event.getConsumerGroup());
            msgInner.setDelayTimeLevel(0);
            msgInner.setKeys(event.getConsumerGroup());
            msgInner.setBody(event.encode());
            msgInner.setFlag(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
            msgInner.setQueueId(0);
            msgInner.setSysFlag(0);
            msgInner.setBornTimestamp(System.currentTimeMillis());
            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            msgInner.setStoreHost(msgInner.getBornHost());
            msgInner.setReconsumeTimes(0);
            // The put result is intentionally not inspected; only exceptions are reported.
            this.brokerController.getMessageStore().putMessage(msgInner);
        } catch (Exception e) {
            log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
        }
    }
}

