package org.joyqueue.broker.kafka.handler;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.helper.SessionHelper;
import org.joyqueue.broker.kafka.KafkaAcknowledge;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.ProduceRequest;
import org.joyqueue.broker.kafka.command.ProduceResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.converter.CheckResultConverter;
import org.joyqueue.broker.kafka.coordinator.transaction.ProducerSequenceManager;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.kafka.message.KafkaBrokerMessage;
import org.joyqueue.broker.kafka.message.converter.KafkaMessageConverter;
import org.joyqueue.broker.kafka.model.ProducePartitionGroupRequest;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.broker.network.traffic.Traffic;
import org.joyqueue.broker.producer.ProduceConfig;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.response.BooleanResponse;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.delay.AbstractDelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.joyqueue.toolkit.network.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@org.joyqueue.network.protocol.annotation.ProduceHandler
/* loaded from: input_file:org/joyqueue/broker/kafka/handler/ProduceRequestHandler.class */
public class ProduceRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(ProduceRequestHandler.class);
    private ClusterManager clusterManager;
    private ProduceConfig produceConfig;
    private ProduceHandler produceHandler;
    private TransactionProduceHandler transactionProduceHandler;
    private ProducerSequenceManager producerSequenceManager;
    private SessionManager sessionManager;
    private KafkaConfig config;
    private DelayedOperationManager<DelayedOperation> delayPurgatory;

    @Override // org.joyqueue.broker.kafka.KafkaContextAware
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.clusterManager = kafkaContext.getBrokerContext().getClusterManager();
        this.produceConfig = new ProduceConfig(kafkaContext.getBrokerContext().getPropertySupplier());
        this.produceHandler = new ProduceHandler(kafkaContext.getBrokerContext().getProduce());
        this.transactionProduceHandler = new TransactionProduceHandler(kafkaContext.getConfig(), kafkaContext.getBrokerContext().getProduce(), kafkaContext.getTransactionCoordinator(), kafkaContext.getTransactionIdManager());
        this.producerSequenceManager = kafkaContext.getProducerSequenceManager();
        this.sessionManager = kafkaContext.getBrokerContext().getSessionManager();
        this.config = kafkaContext.getConfig();
        this.delayPurgatory = new DelayedOperationManager<>("kafka-produce-delay");
        this.delayPurgatory.start();
    }

    public Command handle(final Transport transport, final Command command) {
        ProduceRequest produceRequest = (ProduceRequest) command.getPayload();
        QosLevel convertToQosLevel = KafkaAcknowledge.convertToQosLevel(KafkaAcknowledge.valueOf(produceRequest.getRequiredAcks()));
        String parseClient = KafkaClientHelper.parseClient(produceRequest.getClientId());
        Map<String, List<ProduceRequest.PartitionRequest>> partitionRequests = produceRequest.getPartitionRequests();
        final HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(partitionRequests.size());
        final CountDownLatch countDownLatch = new CountDownLatch(produceRequest.getPartitionNum());
        final boolean z = !convertToQosLevel.equals(QosLevel.ONE_WAY);
        String hostString = ((InetSocketAddress) transport.remoteAddress()).getHostString();
        byte[] bArr = IpUtil.toByte((InetSocketAddress) transport.remoteAddress());
        Connection connection = SessionHelper.getConnection(transport);
        final Traffic traffic = new Traffic(parseClient);
        final boolean[] zArr = {false};
        final boolean[] zArr2 = new boolean[1];
        zArr2[0] = partitionRequests.size() == 1;
        for (Map.Entry<String, List<ProduceRequest.PartitionRequest>> entry : partitionRequests.entrySet()) {
            TopicName parse = TopicName.parse(entry.getKey());
            HashMap newHashMap = Maps.newHashMap();
            final ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(entry.getValue().size());
            newHashMapWithExpectedSize.put(parse.getFullName(), newArrayListWithCapacity);
            Producer producerById = this.sessionManager.getProducerById(connection.getProducer(parse.getFullName(), parseClient));
            TopicConfig topicConfig = this.clusterManager.getTopicConfig(parse);
            for (ProduceRequest.PartitionRequest partitionRequest : entry.getValue()) {
                if (producerById == null) {
                    buildPartitionResponse(partitionRequest.getPartition(), null, KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode(), partitionRequest.getMessages(), newArrayListWithCapacity);
                    countDownLatch.countDown();
                    zArr[0] = true;
                } else {
                    short checkPartitionRequest = checkPartitionRequest(transport, produceRequest, partitionRequest, parse, producerById, hostString);
                    if (checkPartitionRequest != KafkaErrorCode.NONE.getCode()) {
                        buildPartitionResponse(partitionRequest.getPartition(), null, checkPartitionRequest, partitionRequest.getMessages(), newArrayListWithCapacity);
                        countDownLatch.countDown();
                        zArr[0] = true;
                    } else {
                        splitByPartitionGroup(topicConfig, parse, producerById, bArr, traffic, partitionRequest, newHashMap);
                    }
                }
            }
            boolean z2 = newHashMap.size() == 1;
            if (zArr2[0] && !z2) {
                zArr2[0] = false;
            }
            if (zArr2[0] && z && newArrayListWithCapacity.size() == entry.getValue().size()) {
                return delayResponse(transport, command, generateResponse(traffic, newHashMapWithExpectedSize));
            }
            for (final Map.Entry<Integer, ProducePartitionGroupRequest> entry2 : newHashMap.entrySet()) {
                EventListener<ProduceResponse.PartitionResponse> eventListener = new EventListener<ProduceResponse.PartitionResponse>() { // from class: org.joyqueue.broker.kafka.handler.ProduceRequestHandler.1
                    public void onEvent(ProduceResponse.PartitionResponse partitionResponse) {
                        List<Integer> partitions = ((ProducePartitionGroupRequest) entry2.getValue()).getPartitions();
                        synchronized (newArrayListWithCapacity) {
                            Iterator<Integer> it = partitions.iterator();
                            while (it.hasNext()) {
                                newArrayListWithCapacity.add(new ProduceResponse.PartitionResponse(it.next().intValue(), 0L, partitionResponse.getErrorCode()));
                                countDownLatch.countDown();
                            }
                        }
                        if (partitionResponse.getErrorCode() != KafkaErrorCode.NONE.getCode()) {
                            zArr[0] = true;
                        }
                        if (z && zArr2[0]) {
                            Command delayResponse = zArr[0] ? ProduceRequestHandler.this.delayResponse(transport, command, ProduceRequestHandler.this.generateResponse(traffic, newHashMapWithExpectedSize)) : ProduceRequestHandler.this.generateResponse(traffic, newHashMapWithExpectedSize);
                            if (delayResponse != null) {
                                transport.acknowledge(command, delayResponse);
                            }
                        }
                    }
                };
                if (produceRequest.isTransaction()) {
                    this.transactionProduceHandler.produceMessage(produceRequest, produceRequest.getTransactionalId(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), convertToQosLevel, producerById, entry2.getValue(), eventListener);
                } else {
                    this.produceHandler.produceMessage(produceRequest, convertToQosLevel, producerById, entry2.getValue(), eventListener);
                }
            }
        }
        if (!z || zArr2[0]) {
            return null;
        }
        try {
            if (!countDownLatch.await(Math.min(produceRequest.getAckTimeoutMs(), this.config.getProduceTimeout()), TimeUnit.MILLISECONDS)) {
                zArr[0] = true;
                logger.warn("wait produce timeout, transport: {}, app: {}, topics: {}", new Object[]{transport.remoteAddress(), parseClient, produceRequest.getPartitionRequests().keySet()});
            }
        } catch (InterruptedException e) {
            logger.error("wait produce exception, transport: {}, app: {}, topics: {}", new Object[]{transport.remoteAddress(), parseClient, produceRequest.getPartitionRequests().keySet(), e});
        }
        Command generateResponse = generateResponse(traffic, newHashMapWithExpectedSize);
        return zArr[0] ? delayResponse(transport, command, generateResponse) : generateResponse;
    }

    protected Command delayResponse(final Transport transport, final Command command, final Command command2) {
        if (this.config.getProduceDelayEnable()) {
            return command2;
        }
        this.delayPurgatory.tryCompleteElseWatch(new AbstractDelayedOperation(this.config.getProduceDelay()) { // from class: org.joyqueue.broker.kafka.handler.ProduceRequestHandler.2
            protected void onComplete() {
                transport.acknowledge(command, command2);
            }
        }, Sets.newHashSet(new Object[]{new DelayedOperationKey(new Object[0])}));
        return null;
    }

    protected Command generateResponse(Traffic traffic, Map<String, List<ProduceResponse.PartitionResponse>> map) {
        return new Command(new ProduceResponse(traffic, map));
    }

    protected short checkPartitionRequest(Transport transport, ProduceRequest produceRequest, ProduceRequest.PartitionRequest partitionRequest, TopicName topicName, Producer producer, String str) {
        short checkAndFillMessages = checkAndFillMessages(partitionRequest.getMessages());
        if (checkAndFillMessages != KafkaErrorCode.NONE.getCode()) {
            return checkAndFillMessages;
        }
        BooleanResponse checkWritable = this.clusterManager.checkWritable(topicName, producer.getApp(), str, (short) partitionRequest.getPartition());
        if (!checkWritable.isSuccess() && !checkWritable.getJoyQueueCode().equals(JoyQueueCode.FW_BROKER_NOT_WRITABLE)) {
            logger.warn("checkWritable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}", new Object[]{transport, topicName, Integer.valueOf(partitionRequest.getPartition()), producer.getApp(), checkWritable.getJoyQueueCode()});
            return CheckResultConverter.convertProduceCode(checkWritable.getJoyQueueCode());
        }
        int baseSequence = partitionRequest.getMessages().get(0).getBaseSequence();
        if (baseSequence != -1) {
            if (!this.producerSequenceManager.checkSequence(producer.getApp(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), partitionRequest.getPartition(), baseSequence)) {
                logger.warn("out of order sequence, topic: {}, app: {}, partition: {}, transactionId: {}, producerId: {}, producerEpoch: {}, sequence: {}", new Object[]{producer.getTopic(), producer.getApp(), Integer.valueOf(partitionRequest.getPartition()), produceRequest.getTransactionalId(), Long.valueOf(produceRequest.getProducerId()), Short.valueOf(produceRequest.getProducerEpoch()), Integer.valueOf(baseSequence)});
                return KafkaErrorCode.OUT_OF_ORDER_SEQUENCE_NUMBER.getCode();
            }
            this.producerSequenceManager.updateSequence(producer.getApp(), produceRequest.getProducerId(), produceRequest.getProducerEpoch(), partitionRequest.getPartition(), baseSequence);
        }
        return KafkaErrorCode.NONE.getCode();
    }

    protected void splitByPartitionGroup(TopicConfig topicConfig, TopicName topicName, Producer producer, byte[] bArr, Traffic traffic, ProduceRequest.PartitionRequest partitionRequest, Map<Integer, ProducePartitionGroupRequest> map) {
        PartitionGroup fetchPartitionGroupByPartition = topicConfig.fetchPartitionGroupByPartition((short) partitionRequest.getPartition());
        ProducePartitionGroupRequest producePartitionGroupRequest = map.get(Integer.valueOf(fetchPartitionGroupByPartition.getGroup()));
        if (producePartitionGroupRequest == null) {
            producePartitionGroupRequest = new ProducePartitionGroupRequest(Lists.newLinkedList(), Lists.newLinkedList(), Lists.newLinkedList(), Maps.newHashMap(), Maps.newHashMap());
            map.put(Integer.valueOf(fetchPartitionGroupByPartition.getGroup()), producePartitionGroupRequest);
        }
        LinkedList newLinkedList = Lists.newLinkedList();
        Iterator<KafkaBrokerMessage> it = partitionRequest.getMessages().iterator();
        while (it.hasNext()) {
            newLinkedList.add(KafkaMessageConverter.toBrokerMessage(producer.getTopic(), partitionRequest.getPartition(), producer.getApp(), bArr, it.next()));
        }
        traffic.record(topicName.getFullName(), partitionRequest.getTraffic(), partitionRequest.getSize());
        producePartitionGroupRequest.getPartitions().add(Integer.valueOf(partitionRequest.getPartition()));
        producePartitionGroupRequest.getMessages().addAll(newLinkedList);
        producePartitionGroupRequest.getMessageMap().put(Integer.valueOf(partitionRequest.getPartition()), newLinkedList);
        producePartitionGroupRequest.getKafkaMessages().addAll(partitionRequest.getMessages());
        producePartitionGroupRequest.getKafkaMessageMap().put(Integer.valueOf(partitionRequest.getPartition()), partitionRequest.getMessages());
    }

    protected short checkAndFillMessages(List<KafkaBrokerMessage> list) {
        for (KafkaBrokerMessage kafkaBrokerMessage : list) {
            if (ArrayUtils.getLength(kafkaBrokerMessage.getKey()) <= this.produceConfig.getBusinessIdLength() && ArrayUtils.getLength(kafkaBrokerMessage.getValue()) <= this.produceConfig.getBodyLength()) {
            }
            return KafkaErrorCode.MESSAGE_TOO_LARGE.getCode();
        }
        return KafkaErrorCode.NONE.getCode();
    }

    protected void buildPartitionResponse(int i, long[] jArr, short s, List<KafkaBrokerMessage> list, List<ProduceResponse.PartitionResponse> list2) {
        if (ArrayUtils.isEmpty(jArr)) {
            list2.add(new ProduceResponse.PartitionResponse(i, 0L, s));
        } else {
            list2.add(new ProduceResponse.PartitionResponse(i, jArr[0], s));
        }
    }

    public int type() {
        return KafkaCommandType.PRODUCE.getCode();
    }
}
