package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;

import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

/* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.class */
/**
 * Outbound message handler that publishes Spring {@link Message}s to a RocketMQ topic.
 *
 * <p>The underlying producer is obtained from {@code RocketMQProduceFactory} in {@link #onInit()},
 * started in {@link #start()} and shut down in {@link #stop()}. When the factory returns a
 * {@link TransactionMQProducer}, messages are sent transactionally; otherwise the configured send
 * type (OneWay, Sync or Async) decides how the message is dispatched. Failed sends are routed
 * through {@link #doFail(Message, Throwable)}.
 */
public class RocketMQProducerMessageHandler extends AbstractMessageHandler implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQProducerMessageHandler.class);

    /** Set to {@code true} once {@link #start()} completed without an exception. */
    private volatile boolean running = false;

    /** {@code true} when the producer created in {@link #onInit()} is a {@link TransactionMQProducer}. */
    private volatile boolean isTrans = false;

    private ErrorMessageStrategy errorMessageStrategy;
    private MessageChannel sendFailureChannel;
    private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;

    /** Created in {@link #onInit()}; stays {@code null} when the producer properties are absent or disabled. */
    private DefaultMQProducer defaultMQProducer;

    /** Optional queue selector; {@code null} means the producer's selector-less send methods are used. */
    private MessageQueueSelector messageQueueSelector;

    private final ProducerDestination destination;
    private final ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties;
    private final RocketMQProducerProperties mqProducerProperties;

    /**
     * Creates a handler for one producer destination.
     *
     * @param producerDestination destination whose name is used as the RocketMQ topic
     * @param extendedProducerProperties binder-level producer properties (partitioning settings are read from here)
     * @param rocketMQProducerProperties RocketMQ-specific producer properties; may be {@code null},
     *     in which case {@link #onInit()} does nothing
     */
    public RocketMQProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties, RocketMQProducerProperties rocketMQProducerProperties) {
        this.destination = producerDestination;
        this.extendedProducerProperties = extendedProducerProperties;
        this.mqProducerProperties = rocketMQProducerProperties;
    }

    /**
     * Builds the RocketMQ producer and resolves the message queue selector.
     *
     * <p>Returns without doing anything (including the superclass initialisation) when the
     * producer properties are missing or not enabled.
     */
    protected void onInit() {
        if (null == this.mqProducerProperties || !this.mqProducerProperties.getEnabled()) {
            return;
        }
        super.onInit();
        this.defaultMQProducer = RocketMQProduceFactory.initRocketMQProducer(this.destination.getName(), this.mqProducerProperties);
        this.isTrans = this.defaultMQProducer instanceof TransactionMQProducer;
        // A partitioned binding falls back to the partition-based selector when no custom bean is found.
        MessageQueueSelector fallbackSelector = this.extendedProducerProperties.isPartitioned() ? new PartitionMessageQueueSelector() : null;
        this.messageQueueSelector = (MessageQueueSelector) RocketMQBeanContainerCache.getBean(this.mqProducerProperties.getMessageQueueSelector(), MessageQueueSelector.class, fallbackSelector);
    }

    /**
     * Starts the producer and, for non-transactional partitioned bindings, aligns the configured
     * partition count with the number of publish queues reported for the topic.
     *
     * <p>Start-up failures are logged and recorded on the health instrumentation instead of being
     * rethrown; {@link #isRunning()} then keeps returning {@code false}.
     */
    public void start() {
        Instrumentation instrumentation = new Instrumentation(this.destination.getName(), this);
        try {
            this.defaultMQProducer.start();
            if (!this.isTrans && this.extendedProducerProperties.isPartitioned()) {
                List<?> publishQueues = this.defaultMQProducer.fetchPublishMessageQueues(this.destination.getName());
                int queueCount = publishQueues.size();
                if (this.extendedProducerProperties.getPartitionCount() != queueCount) {
                    this.logger.info(String.format("The partition count of topic '%s' will change from '%s' to '%s'", this.destination.getName(), this.extendedProducerProperties.getPartitionCount(), queueCount));
                    this.extendedProducerProperties.setPartitionCount(queueCount);
                    this.partitioningInterceptor.setPartitionCount(this.extendedProducerProperties.getPartitionCount());
                }
            }
            this.running = true;
            instrumentation.markStartedSuccessfully();
        } catch (MQClientException | NullPointerException e) {
            // NullPointerException is caught on purpose: the producer is null when onInit() bailed out
            // and the partitioning interceptor may not have been set; both are reported as a failed start.
            instrumentation.markStartFailed(e);
            log.error("The defaultMQProducer startup failure !!!", e);
        } finally {
            InstrumentationManager.addHealthInstrumentation(instrumentation);
        }
    }

    /** Shuts the producer down if this handler was started successfully, then marks it as not running. */
    public void stop() {
        if (this.running && null != this.defaultMQProducer) {
            this.defaultMQProducer.shutdown();
        }
        this.running = false;
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop()}
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Converts the Spring message to a RocketMQ message and sends it, transactionally when the
     * producer is a {@link TransactionMQProducer}.
     *
     * <p>Any exception raised while converting or sending, as well as a missing or non-OK send
     * status, is reported exactly once through {@link #doFail(Message, Throwable)}.
     *
     * @param message the message to publish
     */
    protected void handleMessageInternal(Message<?> message) {
        org.apache.rocketmq.common.message.Message mqMessage;
        SendResult sendResult;
        try {
            mqMessage = RocketMQMessageConverterSupport.convertMessage2MQ(this.destination.getName(), message);
            if (this.defaultMQProducer instanceof TransactionMQProducer) {
                TransactionListener transactionListener = (TransactionListener) RocketMQBeanContainerCache.getBean(this.mqProducerProperties.getTransactionListener(), TransactionListener.class);
                if (transactionListener == null) {
                    throw new MessagingException("TransactionMQProducer must have a TransactionListener !!! ");
                }
                // setTransactionListener is declared on TransactionMQProducer, so the cast is required.
                ((TransactionMQProducer) this.defaultMQProducer).setTransactionListener(transactionListener);
                log.debug("send transaction message ->{}", mqMessage);
                sendResult = this.defaultMQProducer.sendMessageInTransaction(mqMessage, message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS));
            } else {
                log.debug("send message ->{}", mqMessage);
                sendResult = send(mqMessage, this.messageQueueSelector, message.getHeaders(), message);
            }
            log.debug("the message has sent,message={},sendResult={}", mqMessage, sendResult);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(), e);
            doFail(message, e);
            return;
        }
        // Checked outside the try block so that an exception thrown by doFail is not caught,
        // logged and wrapped a second time by the handler above.
        if (sendResult == null || !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.error("message send fail.SendStatus is not OK.the message={}", mqMessage);
            doFail(message, new MessagingException("message send fail.SendStatus is not OK."));
        }
    }

    /**
     * Dispatches a non-transactional message according to the configured send type.
     *
     * @param mqMessage the converted RocketMQ message
     * @param selector queue selector to use, or {@code null} to let the producer pick the queue
     * @param selectorArg argument handed to the selector (the caller passes the message headers)
     * @param originalMessage the Spring message, needed by the asynchronous failure callback
     * @return the broker's result for synchronous sends; for one-way and asynchronous sends a
     *     placeholder result with status {@code SEND_OK}, because no result is available yet
     * @throws MessagingException if the configured send type is none of OneWay, Sync or Async
     */
    private SendResult send(org.apache.rocketmq.common.message.Message mqMessage, MessageQueueSelector selector, Object selectorArg, Message<?> originalMessage) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        if (RocketMQProducerProperties.SendType.OneWay.equalsName(this.mqProducerProperties.getSendType())) {
            if (null != selector) {
                this.defaultMQProducer.sendOneway(mqMessage, selector, selectorArg);
            } else {
                this.defaultMQProducer.sendOneway(mqMessage);
            }
            return acceptedResult();
        }
        if (RocketMQProducerProperties.SendType.Sync.equalsName(this.mqProducerProperties.getSendType())) {
            if (null != selector) {
                return this.defaultMQProducer.send(mqMessage, selector, selectorArg);
            }
            return this.defaultMQProducer.send(mqMessage);
        }
        if (!RocketMQProducerProperties.SendType.Async.equalsName(this.mqProducerProperties.getSendType())) {
            throw new MessagingException("message hasn't been sent,cause by : the SendType must be in this values[OneWay, Async, Sync]");
        }
        if (null != selector) {
            this.defaultMQProducer.send(mqMessage, selector, selectorArg, getSendCallback(originalMessage));
        } else {
            this.defaultMQProducer.send(mqMessage, getSendCallback(originalMessage));
        }
        return acceptedResult();
    }

    /** Builds the placeholder result returned for sends that do not wait for a broker response. */
    private static SendResult acceptedResult() {
        SendResult placeholder = new SendResult();
        placeholder.setSendStatus(SendStatus.SEND_OK);
        return placeholder;
    }

    /**
     * Resolves the callback used for asynchronous sends.
     *
     * @param message the Spring message being sent; passed to {@link #doFail(Message, Throwable)} on failure
     * @return the configured callback bean if one is found, otherwise a default callback that ignores
     *     success and forwards exceptions to {@link #doFail(Message, Throwable)}
     */
    private SendCallback getSendCallback(final Message<?> message) {
        SendCallback configuredCallback = (SendCallback) RocketMQBeanContainerCache.getBean(this.mqProducerProperties.getSendCallBack(), SendCallback.class);
        if (null != configuredCallback) {
            return configuredCallback;
        }
        return new SendCallback() {
            public void onSuccess(SendResult sendResult) {
                // Nothing to do on success.
            }

            public void onException(Throwable th) {
                RocketMQProducerMessageHandler.this.doFail(message, th);
            }
        };
    }

    /**
     * Reports a failed send.
     *
     * <p>Without a send-failure channel the failure is thrown as a {@link MessagingException};
     * otherwise an error message built by the error message strategy is sent to that channel.
     * The method is public only because it is invoked from the anonymous send callback.
     *
     * @param message the message that could not be sent
     * @param th the cause of the failure
     * @throws MessagingException if no send-failure channel is configured
     */
    public void doFail(Message<?> message, Throwable th) {
        MessageChannel failureChannel = getSendFailureChannel();
        if (failureChannel == null) {
            throw new MessagingException(message, th);
        }
        failureChannel.send(getErrorMessageStrategy().buildErrorMessage(th, ErrorMessageUtils.getAttributeAccessor(message, message)));
    }

    /**
     * @return the channel that receives error messages for failed sends, or {@code null} if none is set
     */
    public MessageChannel getSendFailureChannel() {
        return this.sendFailureChannel;
    }

    /**
     * @param messageChannel the channel that should receive error messages for failed sends
     */
    public void setSendFailureChannel(MessageChannel messageChannel) {
        this.sendFailureChannel = messageChannel;
    }

    /**
     * @return the strategy used to build error messages for the send-failure channel
     */
    public ErrorMessageStrategy getErrorMessageStrategy() {
        return this.errorMessageStrategy;
    }

    /**
     * @param errorMessageStrategy the strategy used to build error messages for the send-failure channel
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * @return the interceptor whose partition count is updated in {@link #start()}
     */
    public MessageConverterConfigurer.PartitioningInterceptor getPartitioningInterceptor() {
        return this.partitioningInterceptor;
    }

    /**
     * Sets the interceptor whose partition count is updated in {@link #start()}.
     *
     * @param partitioningInterceptor the partitioning interceptor of the binding
     * @return this handler, to allow call chaining
     */
    public RocketMQProducerMessageHandler setPartitioningInterceptor(MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
        this.partitioningInterceptor = partitioningInterceptor;
        return this;
    }
}
