/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;

import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtonTransactionHandler
implements ProtonDeliveryHandler {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final int amqpCredit;
    private final int amqpLowMark;
    private Transaction currentTx;
    final AMQPSessionCallback sessionSPI;
    final AMQPConnectionContext connection;
    private final ByteBuffer DECODE_BUFFER = ByteBuffer.allocate(64);

    public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
        this.sessionSPI = sessionSPI;
        this.connection = connection;
        this.amqpCredit = connection.getAmqpCredits();
        this.amqpLowMark = connection.getAmqpLowCredits();
        this.sessionSPI.setTransactionHandler(this);
    }

    @Override
    public void onMessage(final Delivery delivery) throws ActiveMQAMQPException {
        try {
            Receiver receiver = (Receiver)delivery.getLink();
            if (!delivery.isReadable()) {
                return;
            }
            if (receiver.getCredit() < this.amqpLowMark) {
                receiver.flow(this.amqpCredit);
            }
            ByteBuffer buffer = delivery.available() > this.DECODE_BUFFER.capacity() ? ByteBuffer.allocate(delivery.available()) : this.DECODE_BUFFER.clear();
            buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));
            receiver.advance();
            MessageImpl msg = this.decodeMessage(buffer);
            Object action = ((AmqpValue)msg.getBody()).getValue();
            if (action instanceof Declare) {
                Binary txID = this.sessionSPI.newTransaction();
                final Declared declared = new Declared();
                declared.setTxnId(txID);
                this.currentTx = this.sessionSPI.getTransaction(txID, false);
                IOCallback ioAction = new IOCallback(){

                    public void done() {
                        ProtonTransactionHandler.this.connection.runLater(() -> {
                            delivery.settle();
                            delivery.disposition((DeliveryState)declared);
                            ProtonTransactionHandler.this.connection.flush();
                        });
                    }

                    public void onError(int errorCode, String errorMessage) {
                        ProtonTransactionHandler.this.currentTx = null;
                    }
                };
                this.sessionSPI.afterIO(ioAction);
            } else if (action instanceof Discharge) {
                Discharge discharge = (Discharge)action;
                Binary txID = discharge.getTxnId();
                ProtonTransactionImpl tx = (ProtonTransactionImpl)this.sessionSPI.getTransaction(txID, true);
                tx.discharge();
                IOCallback ioAction = new IOCallback(){

                    public void done() {
                        ProtonTransactionHandler.this.connection.runLater(() -> {
                            delivery.settle();
                            delivery.disposition((DeliveryState)new Accepted());
                            ProtonTransactionHandler.this.currentTx = null;
                            ProtonTransactionHandler.this.connection.flush();
                        });
                    }

                    public void onError(int errorCode, String errorMessage) {
                    }
                };
                if (discharge.getFail().booleanValue()) {
                    this.sessionSPI.withinSessionExecutor(() -> {
                        try {
                            tx.rollback();
                            this.sessionSPI.afterIO(ioAction);
                        }
                        catch (Throwable e) {
                            this.txError(delivery, e);
                        }
                    });
                } else {
                    this.sessionSPI.withinSessionExecutor(() -> {
                        try {
                            tx.commit();
                            this.sessionSPI.afterIO(ioAction);
                        }
                        catch (Throwable e) {
                            this.txError(delivery, e);
                        }
                    });
                }
            }
        }
        catch (ActiveMQAMQPException amqpE) {
            this.txError(delivery, (Throwable)((Object)amqpE));
        }
        catch (Throwable e) {
            this.txError(delivery, e);
        }
    }

    private void txError(Delivery delivery, Throwable e) {
        logger.warn(e.getMessage(), e);
        this.connection.runNow(() -> {
            delivery.settle();
            if (e instanceof ActiveMQAMQPException) {
                ActiveMQAMQPException activeMQAMQPException = (ActiveMQAMQPException)((Object)((Object)e));
                delivery.disposition((DeliveryState)this.createRejected(activeMQAMQPException.getAmqpError(), e.getMessage()));
            } else {
                delivery.disposition((DeliveryState)this.createRejected(Symbol.getSymbol((String)"failed"), e.getMessage()));
            }
            this.connection.flush();
        });
    }

    @Override
    public void onFlow(int credits, boolean drain) {
    }

    @Override
    public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
    }

    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
    }

    private Rejected createRejected(Symbol amqpError, String message) {
        Rejected rejected = new Rejected();
        ErrorCondition condition = new ErrorCondition();
        condition.setCondition(amqpError);
        condition.setDescription(message);
        rejected.setError(condition);
        return rejected;
    }

    private MessageImpl decodeMessage(ByteBuffer encoded) {
        MessageImpl message = (MessageImpl)Message.Factory.create();
        message.decode(encoded);
        return message;
    }

    public Transaction getCurrentTransaction() {
        return this.currentTx;
    }
}

