/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.connect.bridge;

import java.lang.invoke.MethodHandles;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeManagementSupport;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeManager;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeMetrics;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgePolicy;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeSenderConfiguration;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeSenderInfo;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeToPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotImplementedException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AMQPBridgeToSenderController
implements SenderController {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected final AMQPBridgeToPolicyManager policyManager;
    protected final AMQPBridgeSenderConfiguration configuration;
    protected final AMQPBridgePolicy policy;
    protected final AMQPBridgeSenderInfo senderInfo;
    protected final AMQPSessionContext session;
    protected final AMQPSessionCallback sessionSPI;
    protected final AMQPBridgeManager bridgeManager;
    protected final AMQPBridgeMetrics.SenderMetrics metrics;
    protected final String controllerId = UUID.randomUUID().toString();
    protected MessageWriter standardMessageWriter;
    protected MessageWriter largeMessageWriter;
    protected MessageWriter coreMessageWriter;
    protected MessageWriter coreLargeMessageWriter;
    protected ProtonServerSenderContext senderContext;
    protected ServerConsumer serverConsumer;
    protected boolean tunnelCoreMessages;

    public AMQPBridgeToSenderController(AMQPBridgeSenderInfo senderInfo, AMQPBridgeSenderConfiguration configuration, AMQPBridgeToPolicyManager policyManager, AMQPSessionContext session, AMQPBridgeMetrics.SenderMetrics metrics) throws ActiveMQAMQPException {
        this.senderInfo = senderInfo;
        this.policyManager = policyManager;
        this.configuration = configuration;
        this.policy = policyManager.getPolicy();
        this.bridgeManager = policyManager.getBridgeManager();
        this.metrics = metrics;
        this.session = session;
        this.sessionSPI = session.getSessionSPI();
    }

    public abstract SenderRole getRole();

    public final long getMessagesSent() {
        return this.metrics.getMessagesSent();
    }

    public final ActiveMQServer getServer() {
        return this.session.getServer();
    }

    public final AMQPBridgeSenderInfo getSenderInfo() {
        return this.senderInfo;
    }

    public final ProtonServerSenderContext getSenderContext() {
        return this.senderContext;
    }

    public final ServerConsumer getServerConsumer() {
        return this.serverConsumer;
    }

    public final AMQPSessionContext getSessionContext() {
        return this.session;
    }

    public final AMQPSessionCallback getSessionCallback() {
        return this.sessionSPI;
    }

    public final AMQPBridgeManager getBridgeManager() {
        return this.bridgeManager;
    }

    public final AMQPBridgeToPolicyManager getPolicyManager() {
        return this.policyManager;
    }

    public final ServerConsumer init(ProtonServerSenderContext senderContext) throws Exception {
        Sender sender = senderContext.getSender();
        Source source = (Source)sender.getRemoteSource();
        if (this.bridgeManager == null) {
            throw new ActiveMQAMQPIllegalStateException("Cannot create a bridge link from non-bridge connection");
        }
        if (source == null) {
            throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on bridge links.");
        }
        this.senderContext = senderContext;
        this.tunnelCoreMessages = AmqpSupport.verifyOfferedCapabilities((Link)sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT) && AmqpSupport.verifyCapabilities(sender.getDesiredCapabilities(), AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
        this.serverConsumer = this.createServerConsumer(senderContext);
        this.registerSenderManagement();
        return this.serverConsumer;
    }

    protected abstract ServerConsumer createServerConsumer(ProtonServerSenderContext var1) throws Exception;

    @Override
    public final void close(boolean remoteClose) throws Exception {
        try {
            if (this.bridgeManager != null) {
                this.bridgeManager.removeLinkClosedInterceptor(this.controllerId);
            }
        }
        finally {
            this.unregisterSenderManagement();
            if (remoteClose) {
                this.handleLinkRemotelyClosed();
            } else {
                this.handleLinkLocallyClosed(null);
            }
        }
    }

    protected void handleLinkRemotelyClosed() {
    }

    @Override
    public final void close(ErrorCondition error) {
        try {
            if (this.bridgeManager != null) {
                this.bridgeManager.removeLinkClosedInterceptor(this.controllerId);
            }
        }
        finally {
            this.unregisterSenderManagement();
            this.handleLinkLocallyClosed(error);
        }
    }

    protected void handleLinkLocallyClosed(ErrorCondition error) {
    }

    private void registerSenderManagement() {
        try {
            AMQPBridgeManagementSupport.registerBridgeSender(this);
        }
        catch (Exception e) {
            logger.trace("Ignored exception while adding sender to management: ", (Throwable)e);
        }
    }

    private void unregisterSenderManagement() {
        try {
            AMQPBridgeManagementSupport.unregisterBridgeSender(this);
        }
        catch (Exception e) {
            logger.trace("Ignored exception while removing sender from management: ", (Throwable)e);
        }
    }

    @Override
    public final MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext sender, MessageReference reference) {
        Message message = reference.getMessage();
        MessageWriter selected = message instanceof AMQPMessage ? (message.isLargeMessage() ? (this.largeMessageWriter != null ? this.largeMessageWriter : (this.largeMessageWriter = new CountedMessageWrites(new AMQPLargeMessageWriter(sender), this.metrics))) : (this.standardMessageWriter != null ? this.standardMessageWriter : (this.standardMessageWriter = new CountedMessageWrites(new AMQPMessageWriter(sender), this.metrics)))) : (this.tunnelCoreMessages ? (message.isLargeMessage() ? (this.coreLargeMessageWriter != null ? this.coreLargeMessageWriter : (this.coreLargeMessageWriter = new CountedMessageWrites(new AMQPTunneledCoreLargeMessageWriter(sender), this.metrics))) : (this.coreMessageWriter != null ? this.coreMessageWriter : (this.coreMessageWriter = new CountedMessageWrites(new AMQPTunneledCoreMessageWriter(sender), this.metrics)))) : (this.standardMessageWriter != null ? this.standardMessageWriter : (this.standardMessageWriter = new CountedMessageWrites(new AMQPMessageWriter(sender), this.metrics))));
        return selected;
    }

    protected static RoutingType getRoutingType(Source source) {
        if (source != null && source.getCapabilities() != null) {
            for (Symbol capability : source.getCapabilities()) {
                if (AmqpSupport.TOPIC_CAPABILITY.equals(capability)) {
                    return RoutingType.MULTICAST;
                }
                if (!AmqpSupport.QUEUE_CAPABILITY.equals(capability)) continue;
                return RoutingType.ANYCAST;
            }
        }
        return ActiveMQDefaultConfiguration.getDefaultRoutingType();
    }

    private static class CountedMessageWrites
    implements MessageWriter {
        private final MessageWriter wrapped;
        private final AMQPBridgeMetrics.SenderMetrics metrics;

        CountedMessageWrites(MessageWriter wrapped, AMQPBridgeMetrics.SenderMetrics metrics) {
            this.wrapped = wrapped;
            this.metrics = metrics;
        }

        @Override
        public void close() {
            this.wrapped.close();
        }

        @Override
        public MessageWriter open(MessageReference reference) {
            this.wrapped.open(reference);
            return this;
        }

        @Override
        public boolean isWriting() {
            return this.wrapped.isWriting();
        }

        @Override
        public void writeBytes(MessageReference messageReference) {
            try {
                this.wrapped.writeBytes(messageReference);
            }
            finally {
                this.metrics.incrementMessagesSent();
            }
        }
    }

    public static enum SenderRole {
        ADDRESS_SENDER,
        QUEUE_SENDER;

    }
}

