package org.joyqueue.broker.kafka.coordinator.group;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.SyncGroupAssignment;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.group.callback.JoinCallback;
import org.joyqueue.broker.kafka.coordinator.group.delay.DelayedHeartbeat;
import org.joyqueue.broker.kafka.coordinator.group.delay.DelayedInitialJoin;
import org.joyqueue.broker.kafka.coordinator.group.delay.DelayedJoin;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMemberMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupState;
import org.joyqueue.broker.kafka.message.compressor.lz4.KafkaLZ4BlockOutputStream;
import org.joyqueue.toolkit.delay.DelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/group/GroupBalanceManager.class */
public class GroupBalanceManager extends Service {
    /** Broker-side Kafka settings; read here for the rebalance timeout and the initial rebalance delay. */
    private KafkaConfig config;
    /** Group metadata manager; this class only hands it on to the delayed join operations it creates. */
    private GroupMetadataManager groupMetadataManager;
    /** Logger bound to the runtime class (via {@code getClass()}), so subclasses log under their own name. */
    protected Logger logger = LoggerFactory.getLogger(getClass());
    /** Holds pending {@link DelayedJoin} operations; they are watched under a key built from the group id. */
    private DelayedOperationManager<DelayedJoin> joinPurgatory = new DelayedOperationManager<>("kafka-rebalance");
    /** Holds pending {@link DelayedHeartbeat} operations; they are watched under a key built from group id and member id. */
    private DelayedOperationManager<DelayedHeartbeat> heartbeatPurgatory = new DelayedOperationManager<>("kafka-heartbeat");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.joyqueue.broker.kafka.coordinator.group.GroupBalanceManager$1, reason: invalid class name */
    /* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/group/GroupBalanceManager$1.class */
    /**
     * Decompiler-emitted lookup table for a {@code switch} over {@link GroupState}.
     *
     * <p>The array is indexed by {@code GroupState.ordinal()} and stores the small integer
     * used as the case label for that state: DEAD=1, EMPTY=2, STABLE=3, AWAITINGSYNC=4,
     * PREPARINGREBALANCE=5. Slots that are never written keep the array default of 0, which
     * matches none of those labels.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState = new int[GroupState.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing
            // from the GroupState class seen at run time only skips that one entry.
            try {
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.DEAD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.EMPTY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.STABLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.AWAITINGSYNC.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$joyqueue$broker$kafka$coordinator$group$domain$GroupState[GroupState.PREPARINGREBALANCE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // Deliberately ignored: the slot simply stays 0.
            }
        }
    }

    /**
     * Creates a balance manager; the two purgatories are created by the field initializers
     * and are not started until {@link #doStart()} runs.
     *
     * @param kafkaConfig          configuration supplying the rebalance timeout and initial delay
     * @param groupMetadataManager manager handed to the delayed join operations
     */
    public GroupBalanceManager(KafkaConfig kafkaConfig, GroupMetadataManager groupMetadataManager) {
        this.groupMetadataManager = groupMetadataManager;
        this.config = kafkaConfig;
    }

    /** Starts the join purgatory first, then the heartbeat purgatory. */
    protected void doStart() {
        joinPurgatory.start();
        heartbeatPurgatory.start();
    }

    /** Shuts down the join purgatory first, then the heartbeat purgatory. */
    protected void doStop() {
        joinPurgatory.shutdown();
        heartbeatPurgatory.shutdown();
    }

    /**
     * Creates a new member for the group, registers it and triggers a rebalance if the
     * group currently allows one.
     *
     * @param i             rebalance timeout of the member (logged as {@code rebalanceTimeout})
     * @param i2            session timeout of the member (logged as {@code sessionTimeout})
     * @param str           first identity component, used in the generated member id
     * @param str2          second identity component, used in the generated member id
     * @param map           protocols supported by the member, keyed by protocol name
     * @param groupMetadata group the member joins
     * @param joinCallback  callback stored on the member as its pending join callback
     * @return the newly created member metadata
     */
    public GroupMemberMetadata addMemberAndRebalance(int i, int i2, String str, String str2, Map<String, byte[]> map, GroupMetadata groupMetadata, JoinCallback joinCallback) {
        final String memberId = generateMemberId(groupMetadata, str, str2);
        final GroupMemberMetadata member = new GroupMemberMetadata(memberId, groupMetadata.getId(), str, str2, i, i2, map);

        logger.info("add member, groupId: {}, state: {}, generationId: {}, leaderId: {}, memberId: {}, memberCount = {}, rebalanceTimeout:{}, sessionTimeout:{}",
                groupMetadata.getId(),
                groupMetadata.getState(),
                groupMetadata.getGenerationId(),
                groupMetadata.getLeaderId(),
                memberId,
                groupMetadata.getAllMemberIds().size(),
                i,
                i2);

        // A brand-new group that is already preparing a rebalance is flagged as having
        // received another member.
        boolean joiningNewGroupMidRebalance = groupMetadata.stateIs(GroupState.PREPARINGREBALANCE) && groupMetadata.isNewGroup();
        if (joiningNewGroupMidRebalance) {
            groupMetadata.setNewMemberAdded(true);
        }

        // Members that join a stable group are additionally recorded through addExpiredMember.
        if (groupMetadata.stateIs(GroupState.STABLE)) {
            groupMetadata.addExpiredMember(member);
        }

        member.setAwaitingJoinCallback(joinCallback);
        groupMetadata.addMember(member);
        maybePrepareRebalance(groupMetadata);
        return member;
    }

    /**
     * Builds a member id of the form {@code <groupId>-<str>-<str2>-<now>}, where
     * {@code now} is the value of {@code SystemClock.now()} at call time.
     *
     * <p>NOTE(review): uniqueness rests on the timestamp alone; two calls with the same
     * group and identity strings that observe the same clock value yield the same id.
     * Confirm this is acceptable for callers.
     *
     * @param groupMetadata group whose id prefixes the member id
     * @param str           first identity component
     * @param str2          second identity component
     * @return the generated member id
     */
    protected String generateMemberId(GroupMetadata groupMetadata, String str, String str2) {
        StringBuilder memberId = new StringBuilder();
        memberId.append(groupMetadata.getId()).append("-");
        memberId.append(str).append("-");
        memberId.append(str2).append("-");
        memberId.append(SystemClock.now());
        return memberId.toString();
    }

    /**
     * Refreshes an existing member's supported protocols and pending join callback, then
     * triggers a rebalance if the group currently allows one.
     *
     * @param groupMetadata       group the member belongs to
     * @param groupMemberMetadata member being updated
     * @param map                 new set of supported protocols, keyed by protocol name
     * @param joinCallback        callback stored on the member as its pending join callback
     */
    public void updateMemberAndRebalance(GroupMetadata groupMetadata, GroupMemberMetadata groupMemberMetadata, Map<String, byte[]> map, JoinCallback joinCallback) {
        // Update the member first so the rebalance check sees the new state.
        groupMemberMetadata.setSupportedProtocols(map);
        groupMemberMetadata.setAwaitingJoinCallback(joinCallback);

        maybePrepareRebalance(groupMetadata);
    }

    /**
     * Starts a rebalance when {@code groupMetadata.canRebalance()} says one is allowed.
     * Both the check and the preparation run while holding the group's monitor.
     *
     * @param groupMetadata group to inspect; also used as the lock object
     */
    public void maybePrepareRebalance(GroupMetadata groupMetadata) {
        synchronized (groupMetadata) {
            if (!groupMetadata.canRebalance()) {
                return;
            }
            prepareRebalance(groupMetadata);
        }
    }

    /**
     * Moves the group into {@link GroupState#PREPARINGREBALANCE} and registers a delayed
     * join operation for it in the join purgatory.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If the group is awaiting sync, every member's assignment is cleared and pending
     *       sync callbacks are answered with {@code REBALANCE_IN_PROGRESS}.</li>
     *   <li>The rebalance timeout is taken from the configuration when it is non-zero,
     *       otherwise from {@code groupMetadata.getMaxRebalanceTimeout()}.</li>
     *   <li>An empty group gets a {@link DelayedInitialJoin}; any other group gets a plain
     *       {@link DelayedJoin}. The state is inspected before the transition below.</li>
     *   <li>The group transitions to {@code PREPARINGREBALANCE} and the operation is
     *       completed immediately or watched under a key built from the group id.</li>
     * </ol>
     *
     * <p>This method does not synchronize on the group itself; see
     * {@link #maybePrepareRebalance(GroupMetadata)} for the locked entry point.
     *
     * @param groupMetadata group to rebalance
     */
    public void prepareRebalance(GroupMetadata groupMetadata) {
        logger.info("prepare rebalance, groupId:{}, state:{}, generationId:{}, leaderId:{}",
                groupMetadata.getId(),
                groupMetadata.getState(),
                groupMetadata.getGenerationId(),
                groupMetadata.getLeaderId());

        if (groupMetadata.stateIs(GroupState.AWAITINGSYNC)) {
            resetAndPropagateAssignmentError(groupMetadata, KafkaErrorCode.REBALANCE_IN_PROGRESS.getCode());
        }

        // Read the configured value once so the zero check and the value used cannot disagree.
        final int configuredTimeout = this.config.getRebalanceTimeout();
        final int rebalanceTimeout = configuredTimeout != 0 ? configuredTimeout : groupMetadata.getMaxRebalanceTimeout();
        final int rebalanceInitialDelay = this.config.getRebalanceInitialDelay();

        // Typed as DelayedJoin (not the DelayedOperation base type) because that is the
        // element type of joinPurgatory.
        final DelayedJoin delayedJoin;
        if (groupMetadata.stateIs(GroupState.EMPTY)) {
            // The initial delay is passed twice, followed by whatever is left of the
            // rebalance timeout after that delay (never negative).
            int remainingTimeout = Math.max(rebalanceTimeout - rebalanceInitialDelay, 0);
            delayedJoin = new DelayedInitialJoin(this, this.groupMetadataManager, groupMetadata, this.joinPurgatory,
                    rebalanceInitialDelay, rebalanceInitialDelay, remainingTimeout);
        } else {
            delayedJoin = new DelayedJoin(this, this.groupMetadataManager, groupMetadata, rebalanceTimeout);
        }

        groupMetadata.transitionStateTo(GroupState.PREPARINGREBALANCE);

        DelayedOperationKey groupKey = new DelayedOperationKey(new Object[]{groupMetadata.getId()});
        this.joinPurgatory.tryCompleteElseWatch(delayedJoin, Sets.<Object>newHashSet(groupKey));
    }

    /**
     * Stores the given assignment on every member of the group and then answers all pending
     * sync callbacks with error code {@code NONE}.
     *
     * <p>For each member both the raw {@link SyncGroupAssignment} and a per-topic partition
     * map are stored; in the latter each partition number is narrowed from {@code Integer}
     * to {@code Short}.
     *
     * <p>NOTE(review): a member whose id is missing from {@code map} makes this method fail
     * with a {@code NullPointerException} before any callback is answered — confirm callers
     * always supply an entry for every member.
     *
     * @param groupMetadata group whose members receive assignments
     * @param map           assignments keyed by member id
     * @throws IllegalStateException if the group is not in {@code AWAITINGSYNC}
     */
    public void setAndPropagateAssignment(GroupMetadata groupMetadata, Map<String, SyncGroupAssignment> map) {
        Preconditions.checkState(groupMetadata.stateIs(GroupState.AWAITINGSYNC));

        for (GroupMemberMetadata member : groupMetadata.getAllMembers()) {
            SyncGroupAssignment assignment = map.get(member.getId());

            Map<String, List<Short>> partitionsByTopic = new HashMap<>();
            for (Map.Entry<String, List<Integer>> topicEntry : assignment.getTopicPartitions().entrySet()) {
                List<Integer> partitions = topicEntry.getValue();
                List<Short> narrowed = new ArrayList<>(partitions.size());
                for (Integer partition : partitions) {
                    narrowed.add((short) partition.intValue());
                }
                partitionsByTopic.put(topicEntry.getKey(), narrowed);
            }

            member.setAssignment(assignment);
            member.setAssignments(partitionsByTopic);
        }

        propagateAssignment(groupMetadata, KafkaErrorCode.NONE.getCode());
    }

    /**
     * Clears the assignment of every member of the group and then answers all pending sync
     * callbacks with the given error code.
     *
     * @param groupMetadata group whose members are reset
     * @param s             error code sent to the pending sync callbacks
     * @throws IllegalStateException if the group is not in {@code AWAITINGSYNC}
     */
    public void resetAndPropagateAssignmentError(GroupMetadata groupMetadata, short s) {
        Preconditions.checkState(groupMetadata.stateIs(GroupState.AWAITINGSYNC));

        for (GroupMemberMetadata member : groupMetadata.getAllMembers()) {
            member.setAssignment(null);
        }

        propagateAssignment(groupMetadata, s);
    }

    /**
     * Answers the pending sync callback of every member that has one, passing the member's
     * current assignment and the given error code. After answering, the callback is cleared
     * and the member's heartbeat expiration is rescheduled.
     *
     * @param groupMetadata group whose members are notified
     * @param s             error code passed to each callback
     */
    public void propagateAssignment(GroupMetadata groupMetadata, short s) {
        for (GroupMemberMetadata member : groupMetadata.getAllMembers()) {
            if (member.getAwaitingSyncCallback() == null) {
                // Nothing is waiting on this member.
                continue;
            }
            member.getAwaitingSyncCallback().sendResponseCallback(member.getAssignment(), s);
            member.setAwaitingSyncCallback(null);
            completeAndScheduleNextHeartbeatExpiration(groupMetadata, member);
        }
    }

    /**
     * Removes a member from the group and reacts according to the group's state after the
     * removal:
     * <ul>
     *   <li>{@code STABLE} or {@code AWAITINGSYNC}: a rebalance is triggered if allowed;</li>
     *   <li>{@code PREPARINGREBALANCE}: the pending join operation for the group is
     *       re-checked, since it may now be completable;</li>
     *   <li>{@code DEAD}, {@code EMPTY} or any other state: nothing further happens.</li>
     * </ul>
     *
     * @param groupMetadata       group the member is removed from
     * @param groupMemberMetadata member to remove
     */
    public void removeMemberAndUpdateGroup(GroupMetadata groupMetadata, GroupMemberMetadata groupMemberMetadata) {
        logger.info("member {} in group {} has failed, group state is {}, member count is {}",
                groupMemberMetadata.getId(),
                groupMetadata.getId(),
                groupMetadata.getState(),
                groupMetadata.getAllMemberIds().size());

        groupMetadata.removeMember(groupMemberMetadata.getId());

        // Switch on the enum itself so the dispatch does not depend on ordinal bookkeeping
        // or on unrelated numeric constants that merely share the values 4 and 5.
        switch (groupMetadata.getState()) {
            case STABLE:
            case AWAITINGSYNC:
                maybePrepareRebalance(groupMetadata);
                break;
            case PREPARINGREBALANCE:
                this.joinPurgatory.checkAndComplete(new DelayedOperationKey(new Object[]{groupMetadata.getId()}));
                break;
            case DEAD:
            case EMPTY:
            default:
                // No follow-up is needed in these states.
                break;
        }
    }

    /**
     * Asks the join purgatory to re-check the operations watched under this group's key.
     *
     * @param groupMetadata group whose id forms the watch key
     */
    public void checkAndComplete(GroupMetadata groupMetadata) {
        DelayedOperationKey groupKey = new DelayedOperationKey(new Object[]{groupMetadata.getId()});
        joinPurgatory.checkAndComplete(groupKey);
    }

    /**
     * Records a heartbeat for the member at the current {@code SystemClock.now()},
     * re-checks the heartbeat operations watched under the member's key, and registers a
     * new {@link DelayedHeartbeat} whose deadline is the latest heartbeat plus the member's
     * session timeout.
     *
     * @param groupMetadata       group the member belongs to
     * @param groupMemberMetadata member whose heartbeat is refreshed
     */
    public void completeAndScheduleNextHeartbeatExpiration(GroupMetadata groupMetadata, GroupMemberMetadata groupMemberMetadata) {
        groupMemberMetadata.setLatestHeartbeat(SystemClock.now());

        // The same key is used to complete the old operation and to watch the new one.
        final DelayedOperationKey memberKey = new DelayedOperationKey(new Object[]{groupMemberMetadata.getGroupId(), groupMemberMetadata.getId()});
        heartbeatPurgatory.checkAndComplete(memberKey);

        if (logger.isDebugEnabled()) {
            logger.debug("handle heartbeat, group {}, member {}, latestHeartbeat is {}, sessionTimeout is {}",
                    groupMetadata.getId(),
                    groupMemberMetadata.getId(),
                    groupMemberMetadata.getLatestHeartbeat(),
                    groupMemberMetadata.getSessionTimeout());
        }

        DelayedHeartbeat nextExpiration = new DelayedHeartbeat(
                this,
                groupMetadata,
                groupMemberMetadata,
                groupMemberMetadata.getLatestHeartbeat() + groupMemberMetadata.getSessionTimeout(),
                groupMemberMetadata.getSessionTimeout());
        heartbeatPurgatory.tryCompleteElseWatch(nextExpiration, Sets.<Object>newHashSet(memberKey));
    }

    /**
     * Marks the member as leaving and re-checks the heartbeat operations watched under the
     * member's key.
     *
     * @param groupMetadata       group the member belongs to (not read by this method)
     * @param groupMemberMetadata member that is leaving
     */
    public void removeHeartbeatForLeavingMember(GroupMetadata groupMetadata, GroupMemberMetadata groupMemberMetadata) {
        groupMemberMetadata.setLeaving(true);

        DelayedOperationKey memberKey = new DelayedOperationKey(new Object[]{groupMemberMetadata.getGroupId(), groupMemberMetadata.getId()});
        heartbeatPurgatory.checkAndComplete(memberKey);
    }

    /**
     * Decides whether a member should still be treated as alive at the given deadline.
     *
     * <p>A member is kept alive when it has a pending join callback, or a pending sync
     * callback, or when its latest heartbeat plus its session timeout lies strictly after
     * {@code j}.
     *
     * @param groupMemberMetadata member to check
     * @param j                   deadline to compare against, on the same time scale as the
     *                            member's latest-heartbeat value
     * @return {@code true} if the member should be kept, {@code false} if it has expired
     */
    public boolean shouldKeepMemberAlive(GroupMemberMetadata groupMemberMetadata, long j) {
        if (groupMemberMetadata.getAwaitingJoinCallback() != null) {
            return true;
        }
        if (groupMemberMetadata.getAwaitingSyncCallback() != null) {
            return true;
        }
        long sessionDeadline = groupMemberMetadata.getLatestHeartbeat() + ((long) groupMemberMetadata.getSessionTimeout());
        return sessionDeadline > j;
    }
}
