package org.joyqueue.broker.kafka.coordinator.group;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupState;
import org.joyqueue.broker.kafka.model.OffsetAndMetadata;
import org.joyqueue.broker.kafka.model.OffsetMetadataAndError;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/group/GroupOffsetHandler.class */
public class GroupOffsetHandler extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(GroupOffsetHandler.class);
    private KafkaConfig config;
    private Coordinator coordinator;
    private GroupMetadataManager groupMetadataManager;
    private GroupBalanceManager groupBalanceManager;
    private GroupOffsetManager groupOffsetManager;

    public GroupOffsetHandler(KafkaConfig kafkaConfig, Coordinator coordinator, GroupMetadataManager groupMetadataManager, GroupBalanceManager groupBalanceManager, GroupOffsetManager groupOffsetManager) {
        this.config = kafkaConfig;
        this.coordinator = coordinator;
        this.groupMetadataManager = groupMetadataManager;
        this.groupBalanceManager = groupBalanceManager;
        this.groupOffsetManager = groupOffsetManager;
    }

    public Map<String, List<OffsetMetadataAndError>> commitOffsets(String str, String str2, int i, Map<String, List<OffsetAndMetadata>> map) {
        if (!isStarted()) {
            return buildCommitError(map, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
        if (!this.coordinator.isCurrentGroup(str)) {
            logger.info("group {} coordinator changed", str);
            return buildCommitError(map, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
        if (StringUtils.isBlank(str2)) {
            return this.groupOffsetManager.saveOffsets(str, map);
        }
        GroupMetadata group = this.groupMetadataManager.getGroup(str);
        if (group != null) {
            return handleCommitOffsets(group, str, str2, i, map);
        }
        logger.info("offset commit, group({}) is null, member id is {}, generationId is {}, offsetMetadata is {}", new Object[]{str, str2, Integer.valueOf(i), JSON.toJSONString(map)});
        return i < 0 ? this.groupOffsetManager.saveOffsets(str, map) : buildCommitError(map, KafkaErrorCode.ILLEGAL_GENERATION.getCode());
    }

    protected Map<String, List<OffsetMetadataAndError>> handleCommitOffsets(GroupMetadata groupMetadata, String str, String str2, int i, Map<String, List<OffsetAndMetadata>> map) {
        if (groupMetadata.stateIs(GroupState.DEAD) || !groupMetadata.isHasMember(str2)) {
            return buildCommitError(map, KafkaErrorCode.UNKNOWN_MEMBER_ID.getCode());
        }
        if (groupMetadata.stateIs(GroupState.EMPTY) && i < 0) {
            return this.groupOffsetManager.saveOffsets(str, map);
        }
        if (groupMetadata.stateIs(GroupState.AWAITINGSYNC)) {
            return buildCommitError(map, KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode());
        }
        if (i != groupMetadata.getGenerationId()) {
            return buildCommitError(map, KafkaErrorCode.ILLEGAL_GENERATION.getCode());
        }
        this.groupBalanceManager.completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.getMember(str2));
        return this.groupOffsetManager.saveOffsets(str, map);
    }

    public Map<String, List<OffsetMetadataAndError>> fetchOffsets(String str, Map<String, List<Integer>> map) {
        return !isStarted() ? buildFetchError(map, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode()) : this.groupOffsetManager.getOffsets(str, map);
    }

    protected Map<String, List<OffsetMetadataAndError>> buildFetchError(Map<String, List<Integer>> map, short s) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(entry.getValue().size());
            newHashMapWithExpectedSize.put(entry.getKey(), newArrayListWithCapacity);
            Iterator<Integer> it = entry.getValue().iterator();
            while (it.hasNext()) {
                newArrayListWithCapacity.add(new OffsetMetadataAndError(it.next().intValue(), s));
            }
        }
        return newHashMapWithExpectedSize;
    }

    protected Map<String, List<OffsetMetadataAndError>> buildCommitError(Map<String, List<OffsetAndMetadata>> map, short s) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : map.entrySet()) {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(entry.getValue().size());
            newHashMapWithExpectedSize.put(entry.getKey(), newArrayListWithCapacity);
            Iterator<OffsetAndMetadata> it = entry.getValue().iterator();
            while (it.hasNext()) {
                newArrayListWithCapacity.add(new OffsetMetadataAndError(it.next().getPartition(), s));
            }
        }
        return newHashMapWithExpectedSize;
    }
}
