package org.springframework.cloud.stream.binder.rabbit;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.Lifecycle;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-rabbit-3.2.1.jar:org/springframework/cloud/stream/binder/rabbit/RabbitStreamMessageHandler.class */
public class RabbitStreamMessageHandler extends AbstractMessageHandler implements Lifecycle {
    private static final int DEFAULT_CONFIRM_TIMEOUT = 10000;
    private final RabbitStreamOperations streamOperations;
    private boolean sync;
    private long confirmTimeout = AbstractComponentTracker.LINGERING_TIMEOUT;
    private SuccessCallback<Message<?>> successCallback = message -> {
    };
    private FailureCallback failureCallback = (message, th) -> {
    };
    private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();
    private boolean headersMappedLast;

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-rabbit-3.2.1.jar:org/springframework/cloud/stream/binder/rabbit/RabbitStreamMessageHandler$FailureCallback.class */
    public interface FailureCallback {
        void failure(Message<?> message, Throwable th);
    }

    public RabbitStreamMessageHandler(RabbitStreamOperations rabbitStreamOperations) {
        Assert.notNull(rabbitStreamOperations, "'streamOperations' cannot be null");
        this.streamOperations = rabbitStreamOperations;
    }

    public void setSuccessCallback(SuccessCallback<Message<?>> successCallback) {
        Assert.notNull(successCallback, "'successCallback' cannot be null");
        this.successCallback = successCallback;
    }

    public void setFailureCallback(FailureCallback failureCallback) {
        Assert.notNull(failureCallback, "'failureCallback' cannot be null");
        this.failureCallback = failureCallback;
    }

    public void setSync(boolean z) {
        this.sync = z;
    }

    public void setConfirmTimeout(long j) {
        this.confirmTimeout = j;
    }

    public void setHeaderMapper(AmqpHeaderMapper amqpHeaderMapper) {
        Assert.notNull(amqpHeaderMapper, "headerMapper must not be null");
        this.headerMapper = amqpHeaderMapper;
    }

    public void setHeadersMappedLast(boolean z) {
        this.headersMappedLast = z;
    }

    public RabbitStreamOperations getStreamOperations() {
        return this.streamOperations;
    }

    @Override // org.springframework.integration.handler.AbstractMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        com.rabbitmq.stream.Message fromMessage;
        if (message.getPayload() instanceof com.rabbitmq.stream.Message) {
            fromMessage = (com.rabbitmq.stream.Message) message.getPayload();
        } else {
            fromMessage = this.streamOperations.streamMessageConverter().fromMessage(mapMessage(message, this.streamOperations.messageConverter(), this.headerMapper, this.headersMappedLast));
        }
        handleConfirms(message, this.streamOperations.send(fromMessage));
    }

    private void handleConfirms(Message<?> message, ListenableFuture<Boolean> listenableFuture) {
        listenableFuture.addCallback(bool -> {
            this.successCallback.onSuccess(message);
        }, th -> {
            this.failureCallback.failure(message, th);
        });
        if (this.sync) {
            try {
                listenableFuture.get(this.confirmTimeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new MessageHandlingException(message, e);
            } catch (ExecutionException | TimeoutException e2) {
                throw new MessageHandlingException(message, e2);
            }
        }
    }

    private static org.springframework.amqp.core.Message mapMessage(Message<?> message, MessageConverter messageConverter, AmqpHeaderMapper amqpHeaderMapper, boolean z) {
        String contentTypeAsString;
        StreamMessageProperties streamMessageProperties = new StreamMessageProperties();
        if (!z) {
            mapHeaders(message.getHeaders(), streamMessageProperties, amqpHeaderMapper);
        }
        if ((messageConverter instanceof ContentTypeDelegatingMessageConverter) && z && (contentTypeAsString = contentTypeAsString(message.getHeaders())) != null) {
            streamMessageProperties.setContentType(contentTypeAsString);
        }
        org.springframework.amqp.core.Message message2 = messageConverter.toMessage(message.getPayload(), streamMessageProperties);
        if (z) {
            mapHeaders(message.getHeaders(), streamMessageProperties, amqpHeaderMapper);
        }
        return message2;
    }

    private static void mapHeaders(MessageHeaders messageHeaders, MessageProperties messageProperties, AmqpHeaderMapper amqpHeaderMapper) {
        amqpHeaderMapper.fromHeadersToRequest(messageHeaders, messageProperties);
    }

    private static String contentTypeAsString(MessageHeaders messageHeaders) {
        Object obj = messageHeaders.get("contentType");
        if (obj instanceof MimeType) {
            obj = obj.toString();
        }
        if (obj instanceof String) {
            return (String) obj;
        }
        if (obj != null) {
            throw new IllegalArgumentException("contentType header must be a MimeType or String, found: " + obj.getClass().getName());
        }
        return null;
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        this.streamOperations.close();
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return true;
    }
}
