package org.joyqueue.broker.kafka.handler;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.ProduceRequest;
import org.joyqueue.broker.kafka.command.ProduceResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionCoordinator;
import org.joyqueue.broker.kafka.coordinator.transaction.TransactionIdManager;
import org.joyqueue.broker.kafka.model.ProducePartitionGroupRequest;
import org.joyqueue.broker.producer.Produce;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.BrokerPrepare;
import org.joyqueue.message.BrokerRollback;
import org.joyqueue.message.SourceType;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.session.TransactionId;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/handler/TransactionProduceHandler.class */
/**
 * Writes the messages of a Kafka produce request inside broker transactions.
 *
 * <p>For every entry of the request's message map a transaction id is generated, an existing
 * transaction is looked up (or a new one prepared), the id is stamped onto the messages and the
 * messages are handed to {@link Produce#putMessageAsync}. The handler then waits for all
 * asynchronous writes and reports a single result code to the caller's listener.
 */
public class TransactionProduceHandler {
    protected static final Logger logger = LoggerFactory.getLogger(TransactionProduceHandler.class);

    private final KafkaConfig config;
    private final Produce produce;
    private final TransactionCoordinator transactionCoordinator;
    private final TransactionIdManager transactionIdManager;

    public TransactionProduceHandler(KafkaConfig kafkaConfig, Produce produce, TransactionCoordinator transactionCoordinator, TransactionIdManager transactionIdManager) {
        this.config = kafkaConfig;
        this.produce = produce;
        this.transactionCoordinator = transactionCoordinator;
        this.transactionIdManager = transactionIdManager;
    }

    /**
     * Produces all messages of {@code producePartitionGroupRequest} transactionally and notifies
     * {@code eventListener} exactly once with the aggregated result.
     *
     * <p>The call blocks until every asynchronous write has completed or until the smaller of the
     * request's ack timeout and the configured produce timeout has elapsed. If any write fails,
     * the reported code is an error code; a successful write never masks an earlier failure.
     *
     * @param produceRequest               the originating request; only its ack timeout is read here
     * @param str                          forwarded to the transaction id generator
     *                                     (presumably the Kafka transactional id; verify against caller)
     * @param j                            forwarded to the transaction id generator
     *                                     (presumably the producer id; verify against caller)
     * @param s                            forwarded to the transaction id generator
     *                                     (presumably the producer epoch; verify against caller)
     * @param qosLevel                     QoS level passed through to the asynchronous write
     * @param producer                     producer session supplying topic and app
     * @param producePartitionGroupRequest holds the message map to write
     * @param eventListener                receives the single aggregated partition response
     */
    public void produceMessage(ProduceRequest produceRequest, String str, long j, short s, QosLevel qosLevel, Producer producer, ProducePartitionGroupRequest producePartitionGroupRequest, EventListener<ProduceResponse.PartitionResponse> eventListener) {
        final short noError = KafkaErrorCode.NONE.getCode();
        // One-element array so the asynchronous callbacks can publish an error back to this thread;
        // CountDownLatch.countDown()/await() provide the required happens-before edge.
        final short[] errorCode = {noError};
        // Read the map once so the latch count and the iteration can never disagree.
        final Map<Integer, List<BrokerMessage>> messageMap = producePartitionGroupRequest.getMessageMap();
        final CountDownLatch pending = new CountDownLatch(messageMap.size());

        for (Map.Entry<Integer, List<BrokerMessage>> entry : messageMap.entrySet()) {
            try {
                int key = entry.getKey().intValue();
                List<BrokerMessage> messages = entry.getValue();
                String txId = tryPrepare(producer, generateTxId(producer, key, str, j, s)).getTxId();
                fillTxId(messages, txId);
                this.produce.putMessageAsync(producer, messages, qosLevel, writeResult -> {
                    try {
                        if (!writeResult.getCode().equals(JoyQueueCode.SUCCESS)) {
                            logger.error("produce message failed, topic: {}, code: {}", producer.getTopic(), writeResult.getCode());
                        }
                        short mapped = KafkaErrorCode.joyQueueCodeFor(writeResult.getCode().getCode());
                        // Only record failures: a later successful write must not erase an
                        // error that another entry has already reported.
                        if (mapped != noError) {
                            errorCode[0] = mapped;
                        }
                    } catch (RuntimeException callbackFailure) {
                        // Never report success for a write whose result could not be evaluated.
                        errorCode[0] = KafkaErrorCode.exceptionFor(callbackFailure);
                        throw callbackFailure;
                    } finally {
                        // Always release the waiter, even if evaluating the result failed.
                        pending.countDown();
                    }
                });
            } catch (Exception e) {
                logger.error("produce message failed, topic: {}", producer.getTopic(), e);
                errorCode[0] = KafkaErrorCode.exceptionFor(e);
                // The callback will never fire for this entry, so account for it here.
                pending.countDown();
            }
        }

        try {
            long timeoutMs = Math.min(produceRequest.getAckTimeoutMs(), this.config.getProduceTimeout());
            if (!pending.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                logger.warn("produce message timeout, topic: {}, request: {}", producer.getTopic(), producePartitionGroupRequest);
                errorCode[0] = KafkaErrorCode.KAFKA_STORAGE_ERROR.getCode();
            }
        } catch (InterruptedException e2) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("produce message failed, topic: {}", producer.getTopic(), e2);
            errorCode[0] = KafkaErrorCode.KAFKA_STORAGE_ERROR.getCode();
        }

        eventListener.onEvent(new ProduceResponse.PartitionResponse(0L, errorCode[0]));
    }

    /**
     * Builds a transaction id by delegating to the {@link TransactionIdManager} with the
     * producer's topic and app plus the given values.
     *
     * @param producer producer session supplying topic and app
     * @param i        key of the message map entry being written (presumably the partition; verify)
     * @param str      passed through unchanged to the id generator
     * @param j        passed through unchanged to the id generator
     * @param s        passed through unchanged to the id generator
     * @return the generated transaction id
     */
    protected String generateTxId(Producer producer, int i, String str, long j, short s) {
        return this.transactionIdManager.generateId(producer.getTopic(), i, producer.getApp(), str, j, s);
    }

    /**
     * Returns the transaction already known for {@code str}, preparing a new one when the lookup
     * yields {@code null}.
     *
     * @param producer producer session the transaction belongs to
     * @param str      transaction id
     * @return the existing or newly prepared transaction
     * @throws Exception if the lookup or the prepare fails
     */
    protected TransactionId tryPrepare(Producer producer, String str) throws Exception {
        TransactionId existing = this.produce.getTransaction(producer, str);
        return existing != null ? existing : prepare(producer, str);
    }

    /**
     * Submits a prepare message for transaction id {@code str}, using the configured transaction
     * timeout and the Kafka source type.
     *
     * @param producer producer session supplying topic and app
     * @param str      transaction id
     * @return the result of submitting the prepare message
     * @throws Exception if the transaction message cannot be written
     */
    protected TransactionId prepare(Producer producer, String str) throws Exception {
        BrokerPrepare prepareMessage = new BrokerPrepare();
        prepareMessage.setTopic(producer.getTopic());
        prepareMessage.setApp(producer.getApp());
        prepareMessage.setTxId(str);
        prepareMessage.setTimeout(this.config.getTransactionTimeout());
        prepareMessage.setSource(SourceType.KAFKA.getValue());
        return this.produce.putTransactionMessage(producer, prepareMessage);
    }

    /**
     * Submits a rollback message for transaction id {@code str}.
     *
     * @param producer producer session supplying topic and app
     * @param str      transaction id
     * @return the result of submitting the rollback message
     * @throws Exception if the transaction message cannot be written
     */
    protected TransactionId rollback(Producer producer, String str) throws Exception {
        BrokerRollback rollbackMessage = new BrokerRollback();
        rollbackMessage.setTopic(producer.getTopic());
        rollbackMessage.setApp(producer.getApp());
        rollbackMessage.setTxId(str);
        return this.produce.putTransactionMessage(producer, rollbackMessage);
    }

    /**
     * Stamps transaction id {@code str} onto every message in {@code list}.
     *
     * @param list messages to update in place
     * @param str  transaction id to assign
     */
    protected void fillTxId(List<BrokerMessage> list, String str) {
        for (BrokerMessage message : list) {
            message.setTxId(str);
        }
    }
}
