package org.joyqueue.broker.kafka.handler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.MessageConvertSupport;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.FetchRequest;
import org.joyqueue.broker.kafka.command.FetchResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.converter.CheckResultConverter;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.kafka.message.KafkaBrokerMessage;
import org.joyqueue.broker.kafka.message.converter.KafkaMessageConverter;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.broker.network.traffic.Traffic;
import org.joyqueue.domain.Consumer;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.SourceType;
import org.joyqueue.network.protocol.annotation.FetchHandler;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.response.BooleanResponse;
import org.joyqueue.toolkit.delay.AbstractDelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@FetchHandler
/* loaded from: input_file:org/joyqueue/broker/kafka/handler/FetchRequestHandler.class */
public class FetchRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(FetchRequestHandler.class);
    private KafkaConfig config;
    private Consume consume;
    private ClusterManager clusterManager;
    private MessageConvertSupport messageConvertSupport;
    private SessionManager sessionManager;
    private DelayedOperationManager<DelayedOperation> delayPurgatory;

    @Override // org.joyqueue.broker.kafka.KafkaContextAware
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.config = kafkaContext.getConfig();
        this.consume = kafkaContext.getBrokerContext().getConsume();
        this.clusterManager = kafkaContext.getBrokerContext().getClusterManager();
        this.messageConvertSupport = kafkaContext.getBrokerContext().getMessageConvertSupport();
        this.sessionManager = kafkaContext.getBrokerContext().getSessionManager();
        this.delayPurgatory = new DelayedOperationManager<>("kafka-fetch-delay");
        this.delayPurgatory.start();
    }

    public Command handle(final Transport transport, final Command command) {
        FetchRequest fetchRequest = (FetchRequest) command.getPayload();
        Map<String, List<FetchRequest.PartitionRequest>> partitionRequests = fetchRequest.getPartitionRequests();
        String parseClient = KafkaClientHelper.parseClient(fetchRequest.getClientId());
        String hostString = ((InetSocketAddress) transport.remoteAddress()).getHostString();
        int maxBytes = fetchRequest.getMaxBytes();
        Traffic traffic = new Traffic(parseClient);
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(partitionRequests.size());
        int i = 0;
        for (Map.Entry<String, List<FetchRequest.PartitionRequest>> entry : partitionRequests.entrySet()) {
            TopicName parse = TopicName.parse(entry.getKey());
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(entry.getValue().size());
            Consumer consumerById = this.sessionManager.getConsumerById(SessionHelper.getConnection(transport).getConsumer(parse.getFullName(), parseClient));
            Consumer.ConsumerPolicy tryGetConsumerPolicy = this.clusterManager.tryGetConsumerPolicy(parse, parseClient);
            for (FetchRequest.PartitionRequest partitionRequest : entry.getValue()) {
                int partition = partitionRequest.getPartition();
                if (consumerById == null) {
                    newArrayListWithCapacity.add(new FetchResponse.PartitionResponse(partition, KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode()));
                } else if (i > maxBytes) {
                    newArrayListWithCapacity.add(new FetchResponse.PartitionResponse(partition, KafkaErrorCode.NONE.getCode()));
                } else {
                    BooleanResponse checkReadable = this.clusterManager.checkReadable(parse, parseClient, hostString, (short) partition);
                    if (checkReadable.isSuccess()) {
                        FetchResponse.PartitionResponse fetchMessage = fetchMessage(transport, consumerById, tryGetConsumerPolicy, parse, partition, parseClient, partitionRequest.getOffset(), partitionRequest.getMaxBytes());
                        i += fetchMessage.getBytes();
                        newArrayListWithCapacity.add(fetchMessage);
                        traffic.record(parse.getFullName(), fetchMessage.getMessages() == null ? 0 : fetchMessage.getMessages().size());
                    } else {
                        logger.warn("checkReadable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}", new Object[]{transport, parse, Integer.valueOf(partition), parseClient, checkReadable.getJoyQueueCode()});
                        newArrayListWithCapacity.add(new FetchResponse.PartitionResponse(partition, CheckResultConverter.convertFetchCode(checkReadable.getJoyQueueCode())));
                        traffic.record(parse.getFullName(), 0);
                    }
                }
            }
            newHashMapWithExpectedSize.put(entry.getKey(), newArrayListWithCapacity);
        }
        FetchResponse fetchResponse = new FetchResponse();
        fetchResponse.setPartitionResponses(newHashMapWithExpectedSize);
        final Command command2 = new Command(fetchResponse);
        if (fetchRequest.getMinBytes() <= i || fetchRequest.getMaxWait() <= 0 || !this.config.getFetchDelay()) {
            return command2;
        }
        this.delayPurgatory.tryCompleteElseWatch(new AbstractDelayedOperation(fetchRequest.getMaxWait()) { // from class: org.joyqueue.broker.kafka.handler.FetchRequestHandler.1
            protected void onComplete() {
                transport.acknowledge(command, command2);
            }
        }, Sets.newHashSet(new Object[]{new DelayedOperationKey(new Object[0])}));
        return null;
    }

    private FetchResponse.PartitionResponse fetchMessage(Transport transport, org.joyqueue.network.session.Consumer consumer, Consumer.ConsumerPolicy consumerPolicy, TopicName topicName, int i, String str, long j, int i2) {
        long minIndex = this.consume.getMinIndex(consumer, (short) i);
        long maxIndex = this.consume.getMaxIndex(consumer, (short) i);
        if (j < minIndex || j > maxIndex) {
            logger.warn("fetch message exception, index out of range, transport: {}, consumer: {}, partition: {}, offset: {}, minOffset: {}, maxOffset: {}", new Object[]{transport, consumer, Integer.valueOf(i), Long.valueOf(j), Long.valueOf(minIndex), Long.valueOf(maxIndex)});
            return new FetchResponse.PartitionResponse(i, KafkaErrorCode.OFFSET_OUT_OF_RANGE.getCode());
        }
        LinkedList newLinkedList = Lists.newLinkedList();
        short shortValue = consumerPolicy.getBatchSize().shortValue();
        int i3 = 0;
        while (i3 < i2 && j < maxIndex) {
            try {
                List<BrokerMessage> doFetchMessage = doFetchMessage(consumer, i, j, shortValue);
                if (CollectionUtils.isEmpty(doFetchMessage)) {
                    break;
                }
                short s = 0;
                int i4 = 0;
                for (BrokerMessage brokerMessage : doFetchMessage) {
                    i3 += brokerMessage.getSize();
                    KafkaBrokerMessage kafkaBrokerMessage = KafkaMessageConverter.toKafkaBrokerMessage(topicName.getFullName(), i, brokerMessage);
                    newLinkedList.add(kafkaBrokerMessage);
                    if (kafkaBrokerMessage.isBatch()) {
                        s = (short) (s + kafkaBrokerMessage.getFlag());
                        i4 += kafkaBrokerMessage.getFlag();
                    } else {
                        s = (short) (s + 1);
                        i4++;
                    }
                }
                if (i4 < shortValue) {
                    break;
                }
                j += s;
            } catch (Exception e) {
                logger.error("fetch message exception, consumer: {}, partition: {}, offset: {}, batchSize: {}", new Object[]{consumer, Integer.valueOf(i), Long.valueOf(j), Integer.valueOf(shortValue), e});
            }
        }
        FetchResponse.PartitionResponse partitionResponse = new FetchResponse.PartitionResponse(i, KafkaErrorCode.NONE.getCode(), newLinkedList);
        partitionResponse.setBytes(i3);
        partitionResponse.setLogStartOffset(minIndex);
        partitionResponse.setLastStableOffset(maxIndex);
        partitionResponse.setHighWater(maxIndex);
        return partitionResponse;
    }

    private List<BrokerMessage> doFetchMessage(org.joyqueue.network.session.Consumer consumer, int i, long j, int i2) throws Exception {
        PullResult message = this.consume.getMessage(consumer, (short) i, j, i2);
        if (message.getCode() != JoyQueueCode.SUCCESS) {
            logger.warn("fetch message error, consumer: {}, partition: {}, offset: {}, batchSize: {}, code: {}", new Object[]{consumer, Integer.valueOf(i), Long.valueOf(j), Integer.valueOf(i2), message.getCode()});
            return null;
        }
        if (message.size() == 0) {
            return null;
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(message.getBuffers().size());
        Iterator it = message.getBuffers().iterator();
        while (it.hasNext()) {
            newArrayListWithCapacity.add(Serializer.readBrokerMessage((ByteBuffer) it.next()));
        }
        return this.messageConvertSupport.convert(newArrayListWithCapacity, SourceType.KAFKA.getValue());
    }

    public int type() {
        return KafkaCommandType.FETCH.getCode();
    }
}
