package org.joyqueue.broker.kafka.handler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.ListOffsetsRequest;
import org.joyqueue.broker.kafka.command.ListOffsetsResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/handler/ListOffsetsRequestHandler.class */
public class ListOffsetsRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(ListOffsetsRequestHandler.class);
    private static final long EARLIEST_TIMESTAMP = -2;
    private static final long LATEST_TIMESTAMP = -1;
    private ClusterManager clusterManager;
    private StoreService storeService;
    private KafkaConfig config;

    @Override // org.joyqueue.broker.kafka.KafkaContextAware
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.clusterManager = kafkaContext.getBrokerContext().getClusterManager();
        this.storeService = kafkaContext.getBrokerContext().getStoreService();
        this.config = kafkaContext.getConfig();
    }

    public Command handle(Transport transport, Command command) {
        ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) command.getPayload();
        Map<String, List<ListOffsetsRequest.PartitionOffsetRequest>> partitionRequests = listOffsetsRequest.getPartitionRequests();
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(partitionRequests.size());
        for (Map.Entry<String, List<ListOffsetsRequest.PartitionOffsetRequest>> entry : partitionRequests.entrySet()) {
            TopicName parse = TopicName.parse(entry.getKey());
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(entry.getValue().size());
            for (ListOffsetsRequest.PartitionOffsetRequest partitionOffsetRequest : entry.getValue()) {
                newArrayListWithCapacity.add(getOffsetByTimestamp(parse, partitionOffsetRequest.getPartition(), partitionOffsetRequest.getTime()));
            }
            newHashMapWithExpectedSize.put(entry.getKey(), newArrayListWithCapacity);
        }
        if (this.config.getLogDetail(listOffsetsRequest.getClientId())) {
            logger.info("list offset, transport: {}, app: {}, request: {}, response: {}", new Object[]{transport, listOffsetsRequest.getClientId(), partitionRequests, newHashMapWithExpectedSize});
        }
        return new Command(new ListOffsetsResponse(newHashMapWithExpectedSize));
    }

    private ListOffsetsResponse.PartitionOffsetResponse getOffsetByTimestamp(TopicName topicName, int i, long j) {
        try {
            PartitionGroup partitionGroup = this.clusterManager.getPartitionGroup(topicName, (short) i);
            if (partitionGroup == null) {
                logger.error("list offset error, partitionGroup not exist, topic: {}, partition: {}", topicName, Integer.valueOf(i));
                return new ListOffsetsResponse.PartitionOffsetResponse(i, KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode(), j, -1L);
            }
            PartitionGroupStore store = this.storeService.getStore(topicName.getFullName(), partitionGroup.getGroup());
            return new ListOffsetsResponse.PartitionOffsetResponse(i, KafkaErrorCode.NONE.getCode(), j, j == -1 ? store.getRightIndex((short) i) : j == EARLIEST_TIMESTAMP ? store.getLeftIndex((short) i) : store.getIndex((short) i, j));
        } catch (Exception e) {
            logger.error("list offset exception, topic: {}, partition: {}", new Object[]{topicName, Integer.valueOf(i), e});
            return new ListOffsetsResponse.PartitionOffsetResponse(i, KafkaErrorCode.exceptionFor(e), j, -1L);
        }
    }

    public int type() {
        return KafkaCommandType.LIST_OFFSETS.getCode();
    }
}
