package org.springframework.messaging.simp.stomp;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.spec.DeferredPromiseSpec;
import reactor.function.Consumer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.DelimitedCodec;
import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;

/* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.class */
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
    private final MessageChannel messageChannel;
    private String relayHost;
    private int relayPort;
    private String systemLogin;
    private String systemPasscode;
    private final StompMessageConverter stompMessageConverter;
    private Environment environment;
    private TcpClient<String, String> tcpClient;
    private final Map<String, RelaySession> relaySessions;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$RelaySession.class */
    public class RelaySession {
        private final String sessionId;
        private final BlockingQueue<Message<?>> messageQueue;
        private volatile StompConnection stompConnection;
        private final Object monitor;

        private RelaySession(String str) {
            this.messageQueue = new LinkedBlockingQueue(50);
            this.stompConnection = new StompConnection();
            this.monitor = new Object();
            Assert.notNull(str, "sessionId is required");
            this.sessionId = str;
        }

        public String getId() {
            return this.sessionId;
        }

        public void connect(final Message<?> message) {
            Assert.notNull(message, "connectMessage is required");
            Composable<TcpConnection<String, String>> openTcpConnection = openTcpConnection();
            openTcpConnection.consume(new Consumer<TcpConnection<String, String>>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.1
                public void accept(TcpConnection<String, String> tcpConnection) {
                    RelaySession.this.handleTcpConnection(tcpConnection, message);
                }
            });
            openTcpConnection.when(Throwable.class, new Consumer<Throwable>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.2
                public void accept(Throwable th) {
                    StompBrokerRelayMessageHandler.this.relaySessions.remove(RelaySession.this.sessionId);
                    RelaySession.this.handleTcpClientFailure("Failed to connect to message broker", th);
                }
            });
        }

        protected Composable<TcpConnection<String, String>> openTcpConnection() {
            return StompBrokerRelayMessageHandler.this.tcpClient.open();
        }

        protected void handleTcpConnection(TcpConnection<String, String> tcpConnection, Message<?> message) {
            this.stompConnection.setTcpConnection(tcpConnection);
            tcpConnection.in().consume(new Consumer<String>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.3
                public void accept(String str) {
                    RelaySession.this.readStompFrame(str);
                }
            });
            forwardInternal(tcpConnection, message);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void readStompFrame(String str) {
            if (StringUtils.isEmpty(str)) {
                return;
            }
            Message<?> message = StompBrokerRelayMessageHandler.this.stompMessageConverter.toMessage(str);
            if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace("Reading message " + message);
            }
            StompHeaderAccessor wrap = StompHeaderAccessor.wrap(message);
            if (StompCommand.CONNECTED != wrap.getCommand()) {
                wrap.setSessionId(this.sessionId);
                sendMessageToClient(MessageBuilder.withPayloadAndHeaders(message.getPayload(), wrap).build());
                return;
            }
            synchronized (this.monitor) {
                this.stompConnection.setReady();
                StompBrokerRelayMessageHandler.this.publishBrokerAvailableEvent();
                flushMessages();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleTcpClientFailure(String str, Throwable th) {
            if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error(str + ", sessionId=" + this.sessionId, th);
            }
            this.stompConnection.setDisconnected();
            sendError(str);
            StompBrokerRelayMessageHandler.this.publishBrokerUnavailableEvent();
        }

        private void sendError(String str) {
            StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.ERROR);
            create.setSessionId(this.sessionId);
            create.setMessage(str);
            sendMessageToClient(MessageBuilder.withPayloadAndHeaders(new byte[0], create).build());
        }

        protected void sendMessageToClient(Message<?> message) {
            StompBrokerRelayMessageHandler.this.messageChannel.send(message);
        }

        public void forward(Message<?> message) {
            if (!this.stompConnection.isReady()) {
                synchronized (this.monitor) {
                    if (!this.stompConnection.isReady()) {
                        this.messageQueue.add(message);
                        if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                            StompBrokerRelayMessageHandler.this.logger.trace("Not connected, message queued. Queue size=" + this.messageQueue.size());
                        }
                        return;
                    }
                }
            }
            if (this.messageQueue.isEmpty()) {
                forwardInternal(message);
            } else {
                this.messageQueue.add(message);
                flushMessages();
            }
        }

        private boolean forwardInternal(Message<?> message) {
            TcpConnection<String, String> readyConnection = this.stompConnection.getReadyConnection();
            if (readyConnection == null) {
                return false;
            }
            return forwardInternal(readyConnection, message);
        }

        private boolean forwardInternal(TcpConnection<String, String> tcpConnection, Message<?> message) {
            if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace("Forwarding to STOMP broker, message: " + message);
            }
            String str = new String(StompBrokerRelayMessageHandler.this.stompMessageConverter.fromMessage(message), Charset.forName("UTF-8"));
            final Deferred deferred = (Deferred) new DeferredPromiseSpec().get();
            tcpConnection.send(str, new Consumer<Boolean>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession.4
                public void accept(Boolean bool) {
                    deferred.accept(bool);
                }
            });
            Boolean bool = null;
            try {
                bool = (Boolean) deferred.compose().await();
                if (bool == null) {
                    handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null);
                } else if (!bool.booleanValue() && StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) {
                    handleTcpClientFailure("Failed to forward message to the broker", null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                handleTcpClientFailure("Interrupted while forwarding message to the broker", e);
            }
            if (bool != null) {
                return bool.booleanValue();
            }
            return false;
        }

        private void flushMessages() {
            ArrayList arrayList = new ArrayList();
            this.messageQueue.drainTo(arrayList);
            Iterator it = arrayList.iterator();
            while (it.hasNext() && forwardInternal((Message) it.next())) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$StompConnection.class */
    public static class StompConnection {
        private volatile TcpConnection<String, String> connection;
        private AtomicReference<TcpConnection<String, String>> readyConnection;

        private StompConnection() {
            this.readyConnection = new AtomicReference<>();
        }

        public void setTcpConnection(TcpConnection<String, String> tcpConnection) {
            Assert.notNull(tcpConnection, "connection must not be null");
            this.connection = tcpConnection;
        }

        public TcpConnection<String, String> getReadyConnection() {
            return this.readyConnection.get();
        }

        public void setReady() {
            this.readyConnection.set(this.connection);
        }

        public boolean isReady() {
            return this.readyConnection.get() != null;
        }

        public void setDisconnected() {
            this.readyConnection.set(null);
            this.connection = null;
        }

        public String toString() {
            return "StompConnection [ready=" + isReady() + "]";
        }
    }

    /* loaded from: input_file:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$SystemRelaySession.class */
    private class SystemRelaySession extends RelaySession {
        public static final String ID = "stompRelaySystemSessionId";

        public SystemRelaySession() {
            super(ID);
        }

        public void connect() {
            StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.CONNECT);
            create.setAcceptVersion("1.1,1.2");
            create.setLogin(StompBrokerRelayMessageHandler.this.systemLogin);
            create.setPasscode(StompBrokerRelayMessageHandler.this.systemPasscode);
            create.setHeartbeat(0L, 0L);
            super.connect(MessageBuilder.withPayloadAndHeaders(new byte[0], create).build());
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession
        protected Composable<TcpConnection<String, String>> openTcpConnection() {
            return StompBrokerRelayMessageHandler.this.tcpClient.open(new Reconnect() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.SystemRelaySession.1
                public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress inetSocketAddress, int i) {
                    return Tuple.of(inetSocketAddress, 5000L);
                }
            });
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.RelaySession
        protected void sendMessageToClient(Message<?> message) {
            if (StompCommand.ERROR.equals(StompHeaderAccessor.wrap(message).getCommand()) && StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error("System session received ERROR frame from broker: " + message);
            }
        }
    }

    public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection<String> collection) {
        super(collection);
        this.relayHost = "127.0.0.1";
        this.relayPort = 61613;
        this.systemLogin = "guest";
        this.systemPasscode = "guest";
        this.stompMessageConverter = new StompMessageConverter();
        this.relaySessions = new ConcurrentHashMap();
        Assert.notNull(messageChannel, "messageChannel is required");
        this.messageChannel = messageChannel;
    }

    public void setRelayHost(String str) {
        Assert.hasText(str, "relayHost must not be empty");
        this.relayHost = str;
    }

    public String getRelayHost() {
        return this.relayHost;
    }

    public void setRelayPort(int i) {
        this.relayPort = i;
    }

    public int getRelayPort() {
        return this.relayPort;
    }

    public void setSystemLogin(String str) {
        Assert.hasText(str, "systemLogin must not be empty");
        this.systemLogin = str;
    }

    public String getSystemLogin() {
        return this.systemLogin;
    }

    public void setSystemPasscode(String str) {
        this.systemPasscode = str;
    }

    public String getSystemPasscode() {
        return this.systemPasscode;
    }

    @Override // org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler
    protected void startInternal() {
        this.environment = new Environment();
        this.tcpClient = (TcpClient) new TcpClientSpec(NettyTcpClient.class).env(this.environment).codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)).connect(this.relayHost, this.relayPort).get();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Initializing \"system\" TCP connection");
        }
        SystemRelaySession systemRelaySession = new SystemRelaySession();
        this.relaySessions.put(systemRelaySession.getId(), systemRelaySession);
        systemRelaySession.connect();
    }

    @Override // org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler
    protected void stopInternal() {
        try {
            this.tcpClient.close().await();
        } catch (Throwable th) {
            this.logger.error("Failed to close reactor TCP client", th);
        }
        try {
            this.environment.shutdown();
        } catch (Throwable th2) {
            this.logger.error("Failed to shut down reactor Environment", th2);
        }
    }

    @Override // org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        StompHeaderAccessor wrap = StompHeaderAccessor.wrap(message);
        String sessionId = wrap.getSessionId();
        String destination = wrap.getDestination();
        StompCommand command = wrap.getCommand();
        SimpMessageType messageType = wrap.getMessageType();
        if (SimpMessageType.MESSAGE.equals(messageType)) {
            sessionId = sessionId == null ? SystemRelaySession.ID : sessionId;
            wrap.setSessionId(sessionId);
            command = command == null ? StompCommand.SEND : command;
            wrap.setCommandIfNotSet(command);
            message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), wrap).build();
        }
        if (wrap.getCommand() == null) {
            this.logger.error("No STOMP command, ignoring message: " + message);
            return;
        }
        if (sessionId == null) {
            this.logger.error("No sessionId, ignoring message: " + message);
            return;
        }
        if (!command.requiresDestination() || checkDestinationPrefix(destination)) {
            try {
                if (SimpMessageType.CONNECT.equals(messageType)) {
                    wrap.setHeartbeat(0L, 0L);
                    Message<?> build = MessageBuilder.withPayloadAndHeaders(message.getPayload(), wrap).build();
                    RelaySession relaySession = new RelaySession(sessionId);
                    this.relaySessions.put(sessionId, relaySession);
                    relaySession.connect(build);
                } else if (SimpMessageType.DISCONNECT.equals(messageType)) {
                    RelaySession remove = this.relaySessions.remove(sessionId);
                    if (remove == null) {
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace("Session already removed, sessionId=" + sessionId);
                            return;
                        }
                        return;
                    }
                    remove.forward(message);
                } else {
                    RelaySession relaySession2 = this.relaySessions.get(sessionId);
                    if (relaySession2 == null) {
                        this.logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
                        return;
                    }
                    relaySession2.forward(message);
                }
            } catch (Throwable th) {
                this.logger.error("Failed to handle message " + message, th);
            }
        }
    }
}
