package org.joyqueue.broker.kafka.coordinator.transaction.synchronizer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.cluster.ClusterNameService;
import org.joyqueue.broker.index.command.ConsumeIndexStoreRequest;
import org.joyqueue.broker.index.command.ConsumeIndexStoreResponse;
import org.joyqueue.broker.index.model.IndexAndMetadata;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionOffset;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.helper.TransactionHelper;
import org.joyqueue.broker.producer.transaction.command.TransactionCommitRequest;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.command.JoyQueueCommand;
import org.joyqueue.network.transport.session.session.TransportSession;
import org.joyqueue.network.transport.session.session.TransportSessionManager;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/transaction/synchronizer/TransactionCommitSynchronizer.class */
/**
 * Sends the commit of a transaction to the brokers it touches and waits for their answers.
 *
 * <p>Two kinds of state are synchronized:
 * <ul>
 *   <li>{@link #commitPrepare} sends one {@link TransactionCommitRequest} per broker;</li>
 *   <li>{@link #commitOffsets} sends one {@link ConsumeIndexStoreRequest} per leader broker.</li>
 * </ul>
 *
 * <p>Both methods fan the requests out asynchronously and block the calling thread on a
 * {@link CountDownLatch} for at most {@code config.getTransactionSyncTimeout()} milliseconds.
 * The shared result flag is written by the transport callbacks before they count the latch
 * down and is only read after a successful {@code await}, so the latch provides the required
 * visibility between threads.
 */
public class TransactionCommitSynchronizer extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionCommitSynchronizer.class);

    private final KafkaConfig config;
    private final TransportSessionManager sessionManager;
    private final TransactionIdManager transactionIdManager;
    private final ClusterNameService clusterNameService;

    /**
     * @param kafkaConfig             supplies the synchronization timeout
     * @param transportSessionManager used to obtain a session to each target broker
     * @param transactionIdManager    generates the transaction ids sent in commit requests
     * @param clusterNameService      used to resolve the leader broker of a topic partition
     */
    public TransactionCommitSynchronizer(KafkaConfig kafkaConfig, TransportSessionManager transportSessionManager, TransactionIdManager transactionIdManager, ClusterNameService clusterNameService) {
        this.config = kafkaConfig;
        this.sessionManager = transportSessionManager;
        this.transactionIdManager = transactionIdManager;
        this.clusterNameService = clusterNameService;
    }

    /**
     * Sends a commit request for the given prepares to every broker involved and waits for
     * all responses.
     *
     * @param transactionMetadata metadata of the transaction; only used for logging here
     * @param set                 prepares to commit, grouped per broker by
     *                            {@link TransactionHelper#splitPrepareByBroker}
     * @return {@code true} if every broker answered with {@code SUCCESS} or
     *         {@code CN_TRANSACTION_NOT_EXISTS} before the timeout; {@code false} on any other
     *         status, on a transport error, or on timeout
     * @throws Exception if a session cannot be obtained, a request cannot be submitted, or the
     *                   waiting thread is interrupted
     */
    public boolean commitPrepare(TransactionMetadata transactionMetadata, Set<TransactionPrepare> set) throws Exception {
        Map<Broker, List<TransactionPrepare>> preparesByBroker = TransactionHelper.splitPrepareByBroker(set);
        final CountDownLatch latch = new CountDownLatch(preparesByBroker.size());
        final boolean[] succeeded = {true};

        for (Map.Entry<Broker, List<TransactionPrepare>> entry : preparesByBroker.entrySet()) {
            final Broker broker = entry.getKey();
            List<TransactionPrepare> brokerPrepares = entry.getValue();
            // The request carries a single topic/app; they are taken from the first prepare.
            // NOTE(review): presumably all prepares of one transaction on a broker share them - confirm.
            TransactionPrepare firstPrepare = brokerPrepares.get(0);

            List<String> txIds = Lists.newLinkedList();
            for (TransactionPrepare prepare : brokerPrepares) {
                txIds.add(this.transactionIdManager.generateId(prepare.getTopic(), prepare.getPartition(), prepare.getApp(), prepare.getTransactionId(), prepare.getProducerId(), prepare.getProducerEpoch()));
            }

            TransportSession session = this.sessionManager.getOrCreateSession(broker);
            final TransactionCommitRequest commitRequest = new TransactionCommitRequest(firstPrepare.getTopic(), firstPrepare.getApp(), txIds);
            session.async(new JoyQueueCommand(commitRequest), this.config.getTransactionSyncTimeout(), new CommandCallback() {
                @Override
                public void onSuccess(Command request, Command response) {
                    try {
                        int status = response.getHeader().getStatus();
                        // A transaction unknown to the broker is accepted as well as SUCCESS.
                        if (status != JoyQueueCode.SUCCESS.getCode() && status != JoyQueueCode.CN_TRANSACTION_NOT_EXISTS.getCode()) {
                            logger.error("commit transaction error, broker: {}, request: {}", broker, commitRequest);
                            succeeded[0] = false;
                        }
                    } catch (RuntimeException e) {
                        // An unreadable response must fail the commit instead of leaving the latch hanging.
                        logger.error("commit transaction error, broker: {}, request: {}", broker, commitRequest, e);
                        succeeded[0] = false;
                    } finally {
                        latch.countDown();
                    }
                }

                @Override
                public void onException(Command request, Throwable cause) {
                    logger.error("commit transaction error, broker: {}, request: {}", broker, commitRequest, cause);
                    succeeded[0] = false;
                    latch.countDown();
                }
            });
        }

        if (latch.await(this.config.getTransactionSyncTimeout(), TimeUnit.MILLISECONDS)) {
            return succeeded[0];
        }
        logger.error("commit transaction timeout, metadata: {}, prepare: {}", transactionMetadata, set);
        return false;
    }

    /**
     * Stores the consume offsets of a transaction on the leader broker of each partition and
     * waits for all responses.
     *
     * <p>Offsets whose leader broker cannot be resolved are skipped, see
     * {@link #splitOffsetsByBroker}.
     *
     * @param transactionMetadata metadata of the transaction; its app is sent with each request
     * @param set                 offsets to store
     * @return {@code true} if every request was submitted and answered without a transport
     *         error before the timeout; {@code false} otherwise
     * @throws Exception if the waiting thread is interrupted
     */
    public boolean commitOffsets(final TransactionMetadata transactionMetadata, Set<TransactionOffset> set) throws Exception {
        Map<Broker, List<TransactionOffset>> offsetsByBroker = splitOffsetsByBroker(set);
        final CountDownLatch latch = new CountDownLatch(offsetsByBroker.size());
        final boolean[] succeeded = {true};

        for (Map.Entry<Broker, List<TransactionOffset>> entry : offsetsByBroker.entrySet()) {
            final Broker broker = entry.getKey();
            final Map<String, Map<Integer, IndexAndMetadata>> saveOffsetParam = buildSaveOffsetParam(entry.getValue());
            try {
                TransportSession session = this.sessionManager.getOrCreateSession(broker);
                ConsumeIndexStoreRequest storeRequest = new ConsumeIndexStoreRequest(transactionMetadata.getApp(), saveOffsetParam);
                session.async(new JoyQueueCommand(storeRequest), this.config.getTransactionSyncTimeout(), new CommandCallback() {
                    @Override
                    public void onSuccess(Command request, Command response) {
                        try {
                            ConsumeIndexStoreResponse payload = (ConsumeIndexStoreResponse) response.getPayload();
                            Map<String, Map<Integer, Short>> statusByTopic = payload.getIndexStoreStatus();
                            for (Map.Entry<String, Map<Integer, Short>> topicEntry : statusByTopic.entrySet()) {
                                String topic = topicEntry.getKey();
                                for (Map.Entry<Integer, Short> partitionEntry : topicEntry.getValue().entrySet()) {
                                    short code = partitionEntry.getValue();
                                    if (code != JoyQueueCode.SUCCESS.getCode()) {
                                        // NOTE(review): a rejected partition is only logged and does not fail
                                        // the commit; kept as is - confirm whether this is intentional.
                                        logger.error("commit transaction offset error, broker: {}, topic: {}, partition: {}, code: {}", broker, topic, partitionEntry.getKey(), JoyQueueCode.valueOf(code));
                                    }
                                }
                            }
                        } catch (RuntimeException e) {
                            // An unreadable response must fail the commit instead of leaving the latch hanging.
                            logger.error("commit transaction offset failed, handle response exception, broker: {}, topic: {}, group: {}", broker, saveOffsetParam, transactionMetadata.getApp(), e);
                            succeeded[0] = false;
                        } finally {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void onException(Command request, Throwable cause) {
                        logger.error("commit transaction offset failed, async transport exception, broker: {}, topic: {}, group: {}", broker, saveOffsetParam, transactionMetadata.getApp(), cause);
                        succeeded[0] = false;
                        latch.countDown();
                    }
                });
            } catch (Throwable th) {
                logger.error("sync offset failed, async transport exception, topic: {}, group: {}, leader: {id: {}, ip: {}, port: {}}", saveOffsetParam, transactionMetadata.getApp(), broker.getId(), broker.getIp(), Integer.valueOf(broker.getBackEndPort()), th);
                // The request never reached the broker, so the commit must not be reported as successful.
                succeeded[0] = false;
                latch.countDown();
            }
        }

        if (latch.await(this.config.getTransactionSyncTimeout(), TimeUnit.MILLISECONDS)) {
            return succeeded[0];
        }
        logger.error("commit transaction timeout, metadata: {}, offsets: {}", transactionMetadata, set);
        return false;
    }

    /**
     * Converts a list of offsets into the nested {@code topic -> partition -> index} map used
     * as the payload of a {@link ConsumeIndexStoreRequest}.
     *
     * @param list offsets destined for one broker
     * @return a new mutable map; a later offset for the same topic and partition overwrites an
     *         earlier one
     */
    protected Map<String, Map<Integer, IndexAndMetadata>> buildSaveOffsetParam(List<TransactionOffset> list) {
        Map<String, Map<Integer, IndexAndMetadata>> result = Maps.newHashMap();
        for (TransactionOffset offset : list) {
            String topic = offset.getTopic();
            Map<Integer, IndexAndMetadata> partitions = result.get(topic);
            if (partitions == null) {
                partitions = Maps.newHashMap();
                result.put(topic, partitions);
            }
            // The metadata string is always sent as null.
            partitions.put(Integer.valueOf(offset.getPartition()), new IndexAndMetadata(offset.getOffset(), (String) null));
        }
        return result;
    }

    /**
     * Groups offsets by the leader broker of their partition.
     *
     * <p>An offset is skipped (and a warning is logged) when its topic config, partition group,
     * leader id or leader broker cannot be resolved.
     *
     * @param set offsets to group
     * @return a new mutable map from leader broker to the offsets it should store
     */
    protected Map<Broker, List<TransactionOffset>> splitOffsetsByBroker(Set<TransactionOffset> set) {
        Map<Broker, List<TransactionOffset>> result = Maps.newHashMap();
        for (TransactionOffset offset : set) {
            Broker leaderBroker = resolveLeaderBroker(offset);
            if (leaderBroker == null) {
                logger.warn("skip transaction offset, leader broker not found, topic: {}, partition: {}", offset.getTopic(), offset.getPartition());
                continue;
            }
            List<TransactionOffset> brokerOffsets = result.get(leaderBroker);
            if (brokerOffsets == null) {
                brokerOffsets = Lists.newLinkedList();
                result.put(leaderBroker, brokerOffsets);
            }
            brokerOffsets.add(offset);
        }
        return result;
    }

    /**
     * Looks up the leader broker of the partition an offset belongs to.
     *
     * @return the leader broker, or {@code null} if any step of the lookup yields nothing or
     *         the leader id is not positive
     */
    private Broker resolveLeaderBroker(TransactionOffset offset) {
        TopicConfig topicConfig = this.clusterNameService.getTopicConfig(TopicName.parse(offset.getTopic()));
        if (topicConfig == null) {
            return null;
        }
        PartitionGroup partitionGroup = topicConfig.fetchPartitionGroupByPartition(offset.getPartition());
        if (partitionGroup == null) {
            return null;
        }
        Integer leader = partitionGroup.getLeader();
        if (leader == null || leader.intValue() <= 0) {
            return null;
        }
        return this.clusterNameService.getNameService().getBroker(leader.intValue());
    }
}
