package org.joyqueue.broker.kafka.coordinator.transaction;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.coordinator.Coordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionMetadata;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionPrepare;
import org.joyqueue.broker.kafka.coordinator.transaction.domain.TransactionState;
import org.joyqueue.broker.kafka.coordinator.transaction.exception.TransactionException;
import org.joyqueue.broker.kafka.coordinator.transaction.synchronizer.TransactionSynchronizer;
import org.joyqueue.broker.kafka.model.PartitionMetadataAndError;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.nsr.NameService;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/coordinator/transaction/TransactionHandler.class */
public class TransactionHandler extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionHandler.class);
    private static final String IDEMPOTENCE_TRANSACTION_ID_SUFFIX = "__SUFFIX_";
    private Coordinator coordinator;
    private TransactionMetadataManager transactionMetadataManager;
    private ProducerIdManager producerIdManager;
    private TransactionSynchronizer transactionSynchronizer;
    private NameService nameService;

    public TransactionHandler(Coordinator coordinator, TransactionMetadataManager transactionMetadataManager, ProducerIdManager producerIdManager, TransactionSynchronizer transactionSynchronizer, NameService nameService) {
        this.coordinator = coordinator;
        this.transactionMetadataManager = transactionMetadataManager;
        this.producerIdManager = producerIdManager;
        this.transactionSynchronizer = transactionSynchronizer;
        this.nameService = nameService;
    }

    public TransactionMetadata initProducer(String str, String str2, int i) {
        TransactionMetadata doInitProducer;
        if (StringUtils.isBlank(str2)) {
            str2 = str + IDEMPOTENCE_TRANSACTION_ID_SUFFIX;
        } else {
            checkCoordinatorState(str, str2);
        }
        TransactionMetadata transaction = this.transactionMetadataManager.getTransaction(str2);
        if (transaction == null) {
            transaction = this.transactionMetadataManager.getOrCreateTransaction(new TransactionMetadata(str2, str, this.producerIdManager.generateId(), i, SystemClock.now()));
        }
        synchronized (transaction) {
            doInitProducer = doInitProducer(transaction, i);
        }
        return doInitProducer;
    }

    protected TransactionMetadata doInitProducer(TransactionMetadata transactionMetadata, int i) {
        transactionMetadata.clear();
        transactionMetadata.nextProducerEpoch();
        transactionMetadata.nextEpoch();
        transactionMetadata.setTimeout(i);
        transactionMetadata.updateLastTime();
        transactionMetadata.transitionStateTo(TransactionState.EMPTY);
        return transactionMetadata;
    }

    public Map<String, List<PartitionMetadataAndError>> addPartitionsToTxn(String str, String str2, long j, short s, Map<String, List<Integer>> map) {
        Map<String, List<PartitionMetadataAndError>> doAddPartitionsToTxn;
        checkCoordinatorState(str, str2);
        TransactionMetadata transaction = this.transactionMetadataManager.getTransaction(str2);
        if (transaction == null) {
            throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_ID_MAPPING.getCode());
        }
        synchronized (transaction) {
            if (transaction.getProducerId() != j || !StringUtils.equals(transaction.getApp(), str)) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_ID_MAPPING.getCode());
            }
            if (transaction.getProducerEpoch() != s) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_EPOCH.getCode());
            }
            if (transaction.isExpired()) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_EPOCH.getCode());
            }
            if (transaction.isPrepared()) {
                throw new TransactionException(KafkaErrorCode.CONCURRENT_TRANSACTIONS.getCode());
            }
            doAddPartitionsToTxn = doAddPartitionsToTxn(transaction, map);
        }
        return doAddPartitionsToTxn;
    }

    protected Map<String, List<PartitionMetadataAndError>> doAddPartitionsToTxn(TransactionMetadata transactionMetadata, Map<String, List<Integer>> map) {
        transactionMetadata.transitionStateTo(TransactionState.ONGOING);
        transactionMetadata.updateLastTime();
        HashSet newHashSet = Sets.newHashSet();
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(entry.getValue().size());
            newHashMapWithExpectedSize.put(entry.getKey(), newArrayListWithCapacity);
            TopicName parse = TopicName.parse(entry.getKey());
            TopicConfig topicConfig = this.nameService.getTopicConfig(parse);
            for (Integer num : entry.getValue()) {
                PartitionGroup fetchPartitionGroupByPartition = topicConfig != null ? topicConfig.fetchPartitionGroupByPartition((short) num.intValue()) : null;
                if (fetchPartitionGroupByPartition == null) {
                    newArrayListWithCapacity.add(new PartitionMetadataAndError(num.intValue(), KafkaErrorCode.UNKNOWN_TOPIC_OR_PARTITION.getCode()));
                } else if (fetchPartitionGroupByPartition.getLeader() == null || fetchPartitionGroupByPartition.getLeader().intValue() <= 0) {
                    newArrayListWithCapacity.add(new PartitionMetadataAndError(num.intValue(), KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode()));
                } else {
                    Broker broker = this.nameService.getBroker(fetchPartitionGroupByPartition.getLeader().intValue());
                    if (broker == null) {
                        newArrayListWithCapacity.add(new PartitionMetadataAndError(num.intValue(), KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode()));
                    } else {
                        newHashSet.add(new TransactionPrepare(parse.getFullName(), (short) num.intValue(), transactionMetadata.getApp(), broker.getId().intValue(), broker.getIp(), broker.getPort(), transactionMetadata.getId(), transactionMetadata.getProducerId(), transactionMetadata.getProducerEpoch(), transactionMetadata.getEpoch(), transactionMetadata.getTimeout(), SystemClock.now()));
                    }
                }
            }
        }
        if (CollectionUtils.isNotEmpty(newHashSet)) {
            try {
                this.transactionSynchronizer.prepare(transactionMetadata, newHashSet);
                transactionMetadata.addPrepare(newHashSet);
                for (TransactionPrepare transactionPrepare : newHashSet) {
                    ((List) newHashMapWithExpectedSize.get(transactionPrepare.getTopic())).add(new PartitionMetadataAndError(transactionPrepare.getPartition(), KafkaErrorCode.NONE.getCode()));
                }
            } catch (Exception e) {
                logger.error("transaction prepare exception, metadata:{}, prepare: {}", new Object[]{transactionMetadata, newHashSet, e});
                for (TransactionPrepare transactionPrepare2 : newHashSet) {
                    ((List) newHashMapWithExpectedSize.get(transactionPrepare2.getTopic())).add(new PartitionMetadataAndError(transactionPrepare2.getPartition(), KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode()));
                }
            }
        }
        return newHashMapWithExpectedSize;
    }

    public boolean endTxn(String str, String str2, long j, short s, boolean z) {
        boolean doEndTxn;
        checkCoordinatorState(str, str2);
        TransactionMetadata transaction = this.transactionMetadataManager.getTransaction(str2);
        if (transaction == null) {
            throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_ID_MAPPING.getCode());
        }
        synchronized (transaction) {
            if (transaction.getProducerId() != j || !StringUtils.equals(transaction.getApp(), str)) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_ID_MAPPING.getCode());
            }
            if (transaction.getProducerEpoch() != s) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_EPOCH.getCode());
            }
            if (transaction.isExpired()) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_EPOCH.getCode());
            }
            if (transaction.isCompleted()) {
                throw new TransactionException(KafkaErrorCode.INVALID_PRODUCER_EPOCH.getCode());
            }
            doEndTxn = doEndTxn(transaction, z);
        }
        return doEndTxn;
    }

    protected boolean doEndTxn(TransactionMetadata transactionMetadata, boolean z) {
        try {
            if (z) {
                doCommit(transactionMetadata);
            } else {
                doAbort(transactionMetadata);
            }
            transactionMetadata.clear();
            transactionMetadata.nextEpoch();
            return true;
        } catch (Exception e) {
            logger.error("endTxn exception, metadata: {}, isCommit: {}", new Object[]{transactionMetadata, Boolean.valueOf(z), e});
            throw new TransactionException(e, KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
    }

    protected void doCommit(TransactionMetadata transactionMetadata) throws Exception {
        if (!transactionMetadata.getState().equals(TransactionState.PREPARE_COMMIT)) {
            if (!this.transactionSynchronizer.prepareCommit(transactionMetadata, transactionMetadata.getPrepare())) {
                throw new JoyQueueException(String.format("prepare commit transaction failed, metadata: %s", transactionMetadata), JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
            }
            transactionMetadata.transitionStateTo(TransactionState.PREPARE_COMMIT);
        }
        if (!this.transactionSynchronizer.commit(transactionMetadata, transactionMetadata.getPrepare(), transactionMetadata.getOffsets())) {
            throw new JoyQueueException(String.format("commit transaction failed, metadata: %s", transactionMetadata), JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
        }
        transactionMetadata.transitionStateTo(TransactionState.COMPLETE_COMMIT);
    }

    protected void doAbort(TransactionMetadata transactionMetadata) throws Exception {
        if (!transactionMetadata.getState().equals(TransactionState.PREPARE_ABORT)) {
            if (!this.transactionSynchronizer.prepareAbort(transactionMetadata, transactionMetadata.getPrepare())) {
                throw new JoyQueueException(String.format("prepare abort transaction failed, metadata: %s", transactionMetadata), JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
            }
            transactionMetadata.transitionStateTo(TransactionState.PREPARE_ABORT);
        }
        if (!this.transactionSynchronizer.abort(transactionMetadata, transactionMetadata.getPrepare())) {
            throw new JoyQueueException(String.format("abort transaction failed, metadata: %s", transactionMetadata), JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
        }
        transactionMetadata.transitionStateTo(TransactionState.COMPLETE_ABORT);
    }

    protected void checkCoordinatorState(String str, String str2) {
        if (!isStarted()) {
            throw new TransactionException(KafkaErrorCode.COORDINATOR_NOT_AVAILABLE.getCode());
        }
        if (!this.coordinator.isCurrentTransaction(str)) {
            throw new TransactionException(KafkaErrorCode.NOT_COORDINATOR.getCode());
        }
    }
}
