package org.joyqueue.broker.kafka.handler;

import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.command.SyncGroupAssignment;
import org.joyqueue.broker.kafka.command.SyncGroupRequest;
import org.joyqueue.broker.kafka.command.SyncGroupResponse;
import org.joyqueue.broker.kafka.coordinator.group.GroupCoordinator;
import org.joyqueue.broker.kafka.coordinator.group.callback.SyncCallback;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.exception.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/handler/SyncGroupRequestHandler.class */
public class SyncGroupRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(SyncGroupRequestHandler.class);
    private GroupCoordinator groupCoordinator;

    @Override // org.joyqueue.broker.kafka.KafkaContextAware
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.groupCoordinator = kafkaContext.getGroupCoordinator();
    }

    public Command handle(final Transport transport, final Command command) {
        final SyncGroupRequest syncGroupRequest = (SyncGroupRequest) command.getPayload();
        String parseClient = KafkaClientHelper.parseClient(syncGroupRequest.getClientId());
        logger.info("sync group, groupId = {}, clientId = {}, memberId = {}", new Object[]{parseClient, syncGroupRequest.getClientId(), syncGroupRequest.getMemberId()});
        this.groupCoordinator.handleSyncGroup(parseClient, syncGroupRequest.getGenerationId(), syncGroupRequest.getMemberId(), buildAssignmentMap(syncGroupRequest.getGroupAssignment()), new SyncCallback() { // from class: org.joyqueue.broker.kafka.handler.SyncGroupRequestHandler.1
            @Override // org.joyqueue.broker.kafka.coordinator.group.callback.SyncCallback
            public void sendResponseCallback(SyncGroupAssignment syncGroupAssignment, short s) {
                SyncGroupRequestHandler.this.handleSyncGroupResponse(transport, command, syncGroupRequest, syncGroupAssignment, s);
            }
        });
        return null;
    }

    protected void handleSyncGroupResponse(Transport transport, Command command, SyncGroupRequest syncGroupRequest, SyncGroupAssignment syncGroupAssignment, short s) {
        SyncGroupResponse syncGroupResponse = new SyncGroupResponse();
        syncGroupResponse.setAssignment(syncGroupAssignment);
        syncGroupResponse.setErrorCode(s);
        try {
            transport.acknowledge(command, new Command(syncGroupResponse));
        } catch (TransportException e) {
            logger.error("send sync group response for {} failed: ", syncGroupRequest.getGroupId(), e);
        }
    }

    protected Map<String, SyncGroupAssignment> buildAssignmentMap(Map<String, SyncGroupAssignment> map) {
        if (MapUtils.isEmpty(map)) {
            return Collections.emptyMap();
        }
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<String, SyncGroupAssignment> entry : map.entrySet()) {
            newHashMap.put(entry.getKey(), entry.getValue());
        }
        return newHashMap;
    }

    public int type() {
        return KafkaCommandType.SYNC_GROUP.getCode();
    }
}
