package org.joyqueue.broker.kafka.coordinator.group;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.cluster.ClusterNameService;
import org.joyqueue.broker.index.command.ConsumeIndexQueryRequest;
import org.joyqueue.broker.index.command.ConsumeIndexQueryResponse;
import org.joyqueue.broker.index.command.ConsumeIndexStoreRequest;
import org.joyqueue.broker.index.command.ConsumeIndexStoreResponse;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.broker.index.model.IndexMetadataAndError;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.group.domain.GroupMetadata;
import org.joyqueue.broker.kafka.model.OffsetAndMetadata;
import org.joyqueue.broker.kafka.model.OffsetMetadataAndError;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.command.JoyQueueCommand;
import org.joyqueue.network.transport.session.session.TransportSession;
import org.joyqueue.network.transport.session.session.TransportSessionManager;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/group/GroupOffsetManager.class */
/**
 * Reads and commits consumer-group offsets on behalf of the Kafka coordinator.
 *
 * <p>Requested partitions are grouped by the broker that
 * {@link TopicConfig#fetchBrokerByPartition} returns for them; one asynchronous
 * {@link ConsumeIndexQueryRequest} or {@link ConsumeIndexStoreRequest} is sent per broker and the
 * calling thread waits on a {@link CountDownLatch} for at most
 * {@code KafkaConfig.getOffsetSyncTimeout()} milliseconds. A per-group offset cache held by
 * {@link GroupMetadata} is refreshed on every call and used to paper over failed fetches.
 */
public class GroupOffsetManager extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(GroupOffsetManager.class);

    private final KafkaConfig config;
    private final ClusterNameService clusterNameService;
    private final GroupMetadataManager groupMetadataManager;
    private final TransportSessionManager sessionManager;

    /**
     * @param kafkaConfig             source of the offset sync timeout
     * @param clusterNameService      used to look up topic configuration (and thereby brokers)
     * @param groupMetadataManager    used to look up the group whose offset cache is maintained
     * @param transportSessionManager used to obtain a session to each target broker
     */
    public GroupOffsetManager(KafkaConfig kafkaConfig, ClusterNameService clusterNameService, GroupMetadataManager groupMetadataManager, TransportSessionManager transportSessionManager) {
        this.config = kafkaConfig;
        this.clusterNameService = clusterNameService;
        this.groupMetadataManager = groupMetadataManager;
        this.sessionManager = transportSessionManager;
    }

    /**
     * Fetches the committed offsets of a group.
     *
     * <p>Partitions whose topic or broker cannot be resolved are logged and omitted. A transport
     * failure reported through the callback yields {@code NOT_LEADER_FOR_PARTITION} entries, which
     * {@link #fillErrorOffset} may then replace with cached values.
     *
     * @param str the consumer group id
     * @param map topic name to the partitions whose offsets are requested
     * @return topic name to per-partition results; a private copy, so replies that arrive after
     *         the timeout cannot modify it
     */
    public Map<String, List<OffsetMetadataAndError>> getOffsets(final String str, Map<String, List<Integer>> map) {
        Map<Broker, Map<String, List<Integer>>> partitionsByBroker = splitPartitionByBroker(map);
        final CountDownLatch pending = new CountDownLatch(partitionsByBroker.size());
        final Map<String, List<OffsetMetadataAndError>> result = Maps.newHashMapWithExpectedSize(map.size());

        for (Map.Entry<Broker, Map<String, List<Integer>>> brokerEntry : partitionsByBroker.entrySet()) {
            final Broker broker = brokerEntry.getKey();
            try {
                TransportSession session = this.sessionManager.getOrCreateSession(broker);
                final ConsumeIndexQueryRequest query = new ConsumeIndexQueryRequest(str, brokerEntry.getValue());
                session.async(new JoyQueueCommand(query), this.config.getOffsetSyncTimeout(), new CommandCallback() {
                    @Override
                    public void onSuccess(Command sent, Command reply) {
                        synchronized (result) {
                            ConsumeIndexQueryResponse response = (ConsumeIndexQueryResponse) reply.getPayload();
                            for (Map.Entry<String, Map<Integer, IndexMetadataAndError>> topicEntry : response.getTopicPartitionIndex().entrySet()) {
                                String topic = topicEntry.getKey();
                                List<OffsetMetadataAndError> offsets = listFor(result, topic);
                                for (Map.Entry<Integer, IndexMetadataAndError> partitionEntry : topicEntry.getValue().entrySet()) {
                                    IndexMetadataAndError index = partitionEntry.getValue();
                                    offsets.add(new OffsetMetadataAndError(partitionEntry.getKey().intValue(), index.getIndex(), index.getMetadata(), KafkaErrorCode.joyQueueCodeFor(index.getError())));
                                    if (index.getError() != JoyQueueCode.SUCCESS.getCode()) {
                                        GroupOffsetManager.logger.error("get offset error, broker: {}, topic: {}, partition: {}, group: {},code: {}", broker, topic, partitionEntry.getKey(), str, JoyQueueCode.valueOf(index.getError()));
                                    }
                                }
                            }
                            pending.countDown();
                        }
                    }

                    @Override
                    public void onException(Command sent, Throwable th) {
                        GroupOffsetManager.logger.error("get offset failed, async transport exception, broker: {}, request: {}, group: {}", broker, query, str, th);
                        synchronized (result) {
                            // Report every partition of the failed request as an error entry.
                            for (Map.Entry<String, List<Integer>> topicEntry : query.getTopicPartitions().entrySet()) {
                                List<OffsetMetadataAndError> offsets = listFor(result, topicEntry.getKey());
                                for (Integer partition : topicEntry.getValue()) {
                                    offsets.add(new OffsetMetadataAndError(partition.intValue(), -1L, "", KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode()));
                                }
                            }
                            pending.countDown();
                        }
                    }
                });
            } catch (Throwable th) {
                logger.error("get offset failed, async transport exception, broker: {}, topic: {}, group: {}", broker, brokerEntry.getValue(), str, th);
                pending.countDown();
            }
        }

        try {
            if (!pending.await(this.config.getOffsetSyncTimeout(), TimeUnit.MILLISECONDS)) {
                logger.error("get offset timeout, partitions: {}, group: {}", map, str);
            }
        } catch (InterruptedException e) {
            logger.error("get offset latch await exception, group: {}, partitions: {}", str, map, e);
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }

        // After a timeout or interrupt, callbacks may still be writing to `result`; read it only
        // under the monitor they use and hand the caller a copy they can no longer touch.
        synchronized (result) {
            fillErrorOffset(str, result);
            return snapshot(result);
        }
    }

    /**
     * Reconciles fetched offsets with the group's offset cache: successful entries refresh the
     * cache, failed entries are overwritten in place with the cached offset (and error
     * {@code NONE}) when one exists. Does nothing if the group is unknown.
     *
     * @param str the consumer group id
     * @param map topic name to fetched per-partition results; entries may be mutated
     */
    protected void fillErrorOffset(String str, Map<String, List<OffsetMetadataAndError>> map) {
        GroupMetadata group = this.groupMetadataManager.getGroup(str);
        if (group == null) {
            return;
        }
        for (Map.Entry<String, List<OffsetMetadataAndError>> entry : map.entrySet()) {
            String topic = entry.getKey();
            for (OffsetMetadataAndError fetched : entry.getValue()) {
                if (fetched.getError() == KafkaErrorCode.NONE.getCode()) {
                    group.putOffsetCache(topic, fetched.getPartition(), new OffsetAndMetadata(fetched.getOffset(), (short) fetched.getPartition()));
                    continue;
                }
                OffsetAndMetadata cached = group.getOffsetCache(topic, fetched.getPartition());
                if (cached == null) {
                    continue;
                }
                // Log the partition itself; the topic is already the first argument.
                logger.info("fill error offset, topic: {}, partition: {}, group: {}, offset: {}", topic, fetched.getPartition(), str, cached);
                fetched.setOffset(cached.getOffset());
                fetched.setMetadata(cached.getMetadata());
                fetched.setError(KafkaErrorCode.NONE.getCode());
            }
        }
    }

    /**
     * Commits offsets for a group and records them in the group's offset cache.
     *
     * <p>NOTE(review): every entry produced by the callbacks carries error {@code NONE}, even
     * when the broker reported a failure or the transport failed; failures are only logged. This
     * looks like deliberate best-effort behaviour - confirm before changing.
     *
     * @param str the consumer group id
     * @param map topic name to the offsets to commit
     * @return topic name to per-partition results; a private copy, so replies that arrive after
     *         the timeout cannot modify it
     */
    public Map<String, List<OffsetMetadataAndError>> saveOffsets(final String str, Map<String, List<OffsetAndMetadata>> map) {
        Map<Broker, Map<String, List<OffsetAndMetadata>>> offsetsByBroker = splitOffsetByBroker(map);
        final CountDownLatch pending = new CountDownLatch(offsetsByBroker.size());
        final Map<String, List<OffsetMetadataAndError>> result = Maps.newHashMapWithExpectedSize(map.size());

        for (Map.Entry<Broker, Map<String, List<OffsetAndMetadata>>> brokerEntry : offsetsByBroker.entrySet()) {
            final Broker broker = brokerEntry.getKey();
            try {
                TransportSession session = this.sessionManager.getOrCreateSession(broker);
                final ConsumeIndexStoreRequest store = new ConsumeIndexStoreRequest(str, buildSaveOffsetParam(brokerEntry.getValue()));
                session.async(new JoyQueueCommand(store), this.config.getOffsetSyncTimeout(), new CommandCallback() {
                    @Override
                    public void onSuccess(Command sent, Command reply) {
                        synchronized (result) {
                            ConsumeIndexStoreResponse response = (ConsumeIndexStoreResponse) reply.getPayload();
                            for (Map.Entry<String, Map<Integer, Short>> topicEntry : response.getIndexStoreStatus().entrySet()) {
                                String topic = topicEntry.getKey();
                                List<OffsetMetadataAndError> statuses = listFor(result, topic);
                                for (Map.Entry<Integer, Short> partitionEntry : topicEntry.getValue().entrySet()) {
                                    short status = partitionEntry.getValue().shortValue();
                                    OffsetMetadataAndError outcome = new OffsetMetadataAndError(partitionEntry.getKey().intValue(), -1L, "", KafkaErrorCode.joyQueueCodeFor(status));
                                    if (status != JoyQueueCode.SUCCESS.getCode()) {
                                        GroupOffsetManager.logger.error("save offset failed, broker: {}, topic: {}, partition: {}, group: {}, code: {}", broker, topic, partitionEntry.getKey(), str, JoyQueueCode.valueOf(status));
                                    }
                                    // The broker's status is logged above but reported as NONE.
                                    outcome.setError(KafkaErrorCode.NONE.getCode());
                                    statuses.add(outcome);
                                }
                            }
                            pending.countDown();
                        }
                    }

                    @Override
                    public void onException(Command sent, Throwable th) {
                        GroupOffsetManager.logger.error("save offset failed, async transport exception, broker: {}, request: {}, group: {}", broker, store, str, th);
                        synchronized (result) {
                            for (Map.Entry<String, Map<Integer, IndexAndMetadata>> topicEntry : store.getIndexMetadata().entrySet()) {
                                List<OffsetMetadataAndError> statuses = listFor(result, topicEntry.getKey());
                                for (Integer partition : topicEntry.getValue().keySet()) {
                                    statuses.add(new OffsetMetadataAndError(partition.intValue(), -1L, "", KafkaErrorCode.NONE.getCode()));
                                }
                            }
                            pending.countDown();
                        }
                    }
                });
            } catch (Throwable th) {
                // Log only this broker's offsets, matching the equivalent message in getOffsets.
                logger.error("save offset failed, async transport exception, broker: {}, topic: {}, group: {}", broker, brokerEntry.getValue(), str, th);
                pending.countDown();
            }
        }

        try {
            if (!pending.await(this.config.getOffsetSyncTimeout(), TimeUnit.MILLISECONDS)) {
                logger.error("save offset timeout, offsets: {}, group: {}", offsetsByBroker, str);
            }
        } catch (InterruptedException e) {
            logger.error("save offset latch await exception, group: {}, offsets: {}", str, map, e);
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }

        fillOffsetCache(str, map);
        // Late callbacks may still be writing to `result`; copy it under their monitor.
        synchronized (result) {
            return snapshot(result);
        }
    }

    /**
     * Stores every submitted offset in the group's offset cache, regardless of the outcome of the
     * remote commit. Does nothing if the group is unknown.
     *
     * @param str the consumer group id
     * @param map topic name to the offsets that were submitted
     */
    protected void fillOffsetCache(String str, Map<String, List<OffsetAndMetadata>> map) {
        GroupMetadata group = this.groupMetadataManager.getGroup(str);
        if (group == null) {
            return;
        }
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : map.entrySet()) {
            String topic = entry.getKey();
            for (OffsetAndMetadata offset : entry.getValue()) {
                group.putOffsetCache(topic, offset.getPartition(), offset);
            }
        }
    }

    /**
     * Converts offsets into the payload of a {@link ConsumeIndexStoreRequest}. Only the offset
     * value is carried over; the metadata of each {@link IndexAndMetadata} is {@code null}.
     *
     * @param map topic name to offsets
     * @return topic name to (partition to index) map
     */
    protected Map<String, Map<Integer, IndexAndMetadata>> buildSaveOffsetParam(Map<String, List<OffsetAndMetadata>> map) {
        Map<String, Map<Integer, IndexAndMetadata>> param = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : map.entrySet()) {
            List<OffsetAndMetadata> offsets = entry.getValue();
            Map<Integer, IndexAndMetadata> byPartition = Maps.newHashMapWithExpectedSize(offsets.size());
            param.put(entry.getKey(), byPartition);
            for (OffsetAndMetadata offset : offsets) {
                byPartition.put(Integer.valueOf(offset.getPartition()), new IndexAndMetadata(offset.getOffset(), (String) null));
            }
        }
        return param;
    }

    /**
     * Groups offsets by the broker returned for their partition. Topics without configuration
     * and partitions without a broker are logged and skipped.
     *
     * @param map topic name to offsets
     * @return broker to (topic name to offsets) map
     */
    protected Map<Broker, Map<String, List<OffsetAndMetadata>>> splitOffsetByBroker(Map<String, List<OffsetAndMetadata>> map) {
        Map<Broker, Map<String, List<OffsetAndMetadata>>> byBroker = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, List<OffsetAndMetadata>> entry : map.entrySet()) {
            String topic = entry.getKey();
            TopicConfig topicConfig = this.clusterNameService.getNameService().getTopicConfig(TopicName.parse(topic));
            if (topicConfig == null) {
                logger.error("get leader failed, topic not exist, topic: {}", topic);
                continue;
            }
            for (OffsetAndMetadata offset : entry.getValue()) {
                Broker broker = topicConfig.fetchBrokerByPartition((short) offset.getPartition());
                if (broker == null) {
                    logger.error("get leader failed, topic {}, partition {}, leader not available", topic, offset);
                    continue;
                }
                addForBroker(byBroker, broker, topic, offset);
            }
        }
        return byBroker;
    }

    /**
     * Groups partitions by the broker returned for them. Topics without configuration and
     * partitions without a broker are logged and skipped.
     *
     * @param map topic name to partition numbers
     * @return broker to (topic name to partition numbers) map
     */
    protected Map<Broker, Map<String, List<Integer>>> splitPartitionByBroker(Map<String, List<Integer>> map) {
        Map<Broker, Map<String, List<Integer>>> byBroker = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
            String topic = entry.getKey();
            TopicConfig topicConfig = this.clusterNameService.getNameService().getTopicConfig(TopicName.parse(topic));
            if (topicConfig == null) {
                logger.error("get leader failed, topic not exist, topic: {}", topic);
                continue;
            }
            for (Integer partition : entry.getValue()) {
                Broker broker = topicConfig.fetchBrokerByPartition((short) partition.intValue());
                if (broker == null) {
                    logger.error("get leader failed, topic {}, partition {}, leader not available", topic, partition);
                    continue;
                }
                addForBroker(byBroker, broker, topic, partition);
            }
        }
        return byBroker;
    }

    /** Returns the list stored under {@code topic}, creating and registering it if absent. */
    private static <T> List<T> listFor(Map<String, List<T>> byTopic, String topic) {
        List<T> list = byTopic.get(topic);
        if (list == null) {
            list = Lists.newLinkedList();
            byTopic.put(topic, list);
        }
        return list;
    }

    /** Appends {@code item} to the list kept for {@code broker} and {@code topic}. */
    private static <T> void addForBroker(Map<Broker, Map<String, List<T>>> byBroker, Broker broker, String topic, T item) {
        Map<String, List<T>> byTopic = byBroker.get(broker);
        if (byTopic == null) {
            byTopic = Maps.newHashMap();
            byBroker.put(broker, byTopic);
        }
        listFor(byTopic, topic).add(item);
    }

    /** Copies the map and each of its lists; the contained elements are shared. */
    private static Map<String, List<OffsetMetadataAndError>> snapshot(Map<String, List<OffsetMetadataAndError>> live) {
        Map<String, List<OffsetMetadataAndError>> copy = Maps.newHashMapWithExpectedSize(live.size());
        for (Map.Entry<String, List<OffsetMetadataAndError>> entry : live.entrySet()) {
            List<OffsetMetadataAndError> items = Lists.newLinkedList(entry.getValue());
            copy.put(entry.getKey(), items);
        }
        return copy;
    }
}
