/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.jms.trident;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.Serializable;
import java.util.List;
import org.apache.storm.jms.JmsMessageProducer;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.ITuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsState
implements State {
    private static final Logger LOG = LoggerFactory.getLogger(JmsState.class);
    private Options options;
    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;

    protected JmsState(Options options) {
        this.options = options;
    }

    protected void prepare() {
        if (this.options.jmsProvider == null || this.options.msgProducer == null) {
            throw new IllegalStateException("JMS Provider and MessageProducer not set.");
        }
        LOG.debug("Connecting JMS..");
        try {
            ConnectionFactory cf = this.options.jmsProvider.connectionFactory();
            Destination dest = this.options.jmsProvider.destination();
            this.connection = cf.createConnection();
            this.session = this.connection.createSession(this.options.jmsTransactional, this.options.jmsAcknowledgeMode);
            this.messageProducer = this.session.createProducer(dest);
            this.connection.start();
        }
        catch (Exception e) {
            LOG.warn("Error creating JMS connection.", (Throwable)e);
        }
    }

    public void beginCommit(Long someLong) {
    }

    public void commit(Long someLong) {
        LOG.debug("Committing JMS transaction.");
        if (this.options.jmsTransactional) {
            try {
                this.session.commit();
            }
            catch (JMSException e) {
                LOG.error("JMS Session commit failed.", (Throwable)e);
            }
        }
    }

    public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException {
        try {
            for (TridentTuple tuple : tuples) {
                Message msg = this.options.msgProducer.toMessage(this.session, (ITuple)tuple);
                if (msg == null) continue;
                if (msg.getJMSDestination() != null) {
                    this.messageProducer.send(msg.getJMSDestination(), msg);
                    continue;
                }
                this.messageProducer.send(msg);
            }
        }
        catch (JMSException e) {
            LOG.warn("Failed to send jmd message for a trident batch ", (Throwable)e);
            if (this.options.jmsTransactional) {
                this.session.rollback();
            }
            throw new FailedException("Failed to write tuples", (Throwable)e);
        }
    }

    public static class Options
    implements Serializable {
        private JmsProvider jmsProvider;
        private JmsMessageProducer msgProducer;
        private int jmsAcknowledgeMode = 1;
        private boolean jmsTransactional = true;

        public Options withJmsProvider(JmsProvider provider) {
            this.jmsProvider = provider;
            return this;
        }

        public Options withMessageProducer(JmsMessageProducer msgProducer) {
            this.msgProducer = msgProducer;
            return this;
        }

        public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
            this.jmsAcknowledgeMode = jmsAcknowledgeMode;
            return this;
        }

        public Options withJmsTransactional(boolean jmsTransactional) {
            this.jmsTransactional = jmsTransactional;
            return this;
        }
    }
}

