/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.amqp;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.amqp.AbstractAmqpEndpoint;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.util.AmqpErrorException;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Base class for AMQP endpoints that accept request messages from clients over a
 * receiver link and forward replies, published on the Vert.x event bus, back to the
 * client over a sender link.
 * <p>
 * Each incoming request is formally verified and checked against the configured
 * {@link AuthorizationService} before it is handed to
 * {@link #processRequest(org.apache.qpid.proton.message.Message, ResourceIdentifier, HonoUser)}.
 * The delivery is accepted or rejected depending on the outcome of these steps.
 *
 * @param <T> The type of configuration properties this endpoint uses.
 */
public abstract class RequestResponseEndpoint<T extends ServiceConfigProperties>
extends AbstractAmqpEndpoint<T> {
    /** Number of credits granted to a client's receiver link unless configured otherwise. */
    private static final int REQUEST_RESPONSE_ENDPOINT_DEFAULT_CREDITS = 20;

    private int receiverLinkCredit = REQUEST_RESPONSE_ENDPOINT_DEFAULT_CREDITS;
    private AuthorizationService authorizationService = new ClaimsBasedAuthorizationService();

    /**
     * Creates an endpoint for a Vert.x instance.
     *
     * @param vertx The Vert.x instance to use.
     * @throws NullPointerException if vertx is {@code null}.
     */
    protected RequestResponseEndpoint(Vertx vertx) {
        super(Objects.requireNonNull(vertx));
    }

    /**
     * Processes a request message that has passed formal verification and for which
     * the client has been authorized.
     * <p>
     * If this method throws a {@link DecodeException}, the request is rejected with
     * an {@code AmqpError.DECODE_ERROR} condition.
     *
     * @param message The request message.
     * @param targetAddress The address the message has been sent to.
     * @param clientPrincipal The principal representing the client that sent the request.
     */
    public abstract void processRequest(org.apache.qpid.proton.message.Message message, ResourceIdentifier targetAddress, HonoUser clientPrincipal);

    /**
     * Creates the AMQP message to send to the client for a reply received over the
     * Vert.x event bus.
     *
     * @param reply The event bus message containing the reply.
     * @return The AMQP message to send to the client.
     */
    protected abstract org.apache.qpid.proton.message.Message getAmqpReply(Message<JsonObject> reply);

    /**
     * Gets the number of message credits used as the prefetch size for receiver links.
     *
     * @return The number of credits.
     */
    public final int getReceiverLinkCredit() {
        return this.receiverLinkCredit;
    }

    /**
     * Sets the number of message credits used as the prefetch size for receiver links.
     *
     * @param receiverLinkCredit The number of credits.
     * @throws IllegalArgumentException if the given value is less than 1.
     */
    public final void setReceiverLinkCredit(int receiverLinkCredit) {
        if (receiverLinkCredit <= 0) {
            throw new IllegalArgumentException("receiver link credit must be at least 1");
        }
        this.receiverLinkCredit = receiverLinkCredit;
    }

    /**
     * Gets the service used for authorizing requests.
     *
     * @return The authorization service.
     */
    public final AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    /**
     * Sets the service used for authorizing requests.
     * <p>
     * If not set explicitly, a {@link ClaimsBasedAuthorizationService} is used.
     *
     * @param authService The authorization service.
     * @throws NullPointerException if authService is {@code null}.
     */
    @Autowired(required=false)
    public final void setAuthorizationService(AuthorizationService authService) {
        // Fail fast here instead of with an NPE on the first request being authorized.
        this.authorizationService = Objects.requireNonNull(authService, "authorization service must not be null");
    }

    /**
     * Handles a client's request to establish a link for sending request messages
     * to this endpoint.
     * <p>
     * The link is closed with a precondition-failed condition if the client asks
     * for AT_MOST_ONCE delivery. Otherwise the link is opened with AT_LEAST_ONCE
     * QoS and every message received is passed to
     * {@link #handleMessage(ProtonConnection, ProtonReceiver, ResourceIdentifier, ProtonDelivery, org.apache.qpid.proton.message.Message)}.
     *
     * @param con The connection the link is part of.
     * @param receiver The link to establish.
     * @param targetAddress The address the client wants to send messages to.
     */
    @Override
    public final void onLinkAttach(ProtonConnection con, ProtonReceiver receiver, ResourceIdentifier targetAddress) {
        if (receiver.getRemoteQoS() == ProtonQoS.AT_MOST_ONCE) {
            this.logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...", this.getName());
            receiver.setCondition(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, "endpoint requires AT_LEAST_ONCE QoS"));
            receiver.close();
            return;
        }

        this.logger.debug("establishing link for receiving messages from client [{}]", receiver.getName());
        receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
        // handleMessage settles every delivery explicitly (accepted or rejected).
        // Auto-accept must be off: otherwise a delivery whose authorization check
        // completes asynchronously would be accepted as soon as the message handler
        // returns, i.e. before the outcome of the check is known.
        receiver.setAutoAccept(false);
        receiver.setPrefetch(this.receiverLinkCredit);
        receiver.handler((delivery, message) -> this.handleMessage(con, receiver, targetAddress, delivery, message));
        receiver.closeHandler(clientDetached -> this.onLinkDetach(receiver));
        receiver.open();
    }

    /**
     * Verifies, authorizes and processes a request message and settles its delivery.
     * <p>
     * The delivery is
     * <ul>
     * <li>accepted if the request has been processed,</li>
     * <li>rejected with {@code AmqpError.DECODE_ERROR} if the message does not pass
     * formal verification or processing fails with a {@link DecodeException},</li>
     * <li>rejected with {@code AmqpError.UNAUTHORIZED_ACCESS} if the client is not
     * authorized,</li>
     * <li>rejected with {@code AmqpError.INTERNAL_ERROR} for any other failure.</li>
     * </ul>
     *
     * @param con The connection the message has been received on.
     * @param receiver The link the message has been received on.
     * @param targetAddress The address the message has been sent to.
     * @param delivery The delivery to settle.
     * @param message The request message.
     */
    protected final void handleMessage(ProtonConnection con, ProtonReceiver receiver, ResourceIdentifier targetAddress, ProtonDelivery delivery, org.apache.qpid.proton.message.Message message) {
        // Single place where the delivery gets settled, driven by the request's outcome.
        Future<Void> requestTracker = Future.future();
        requestTracker.setHandler(s -> {
            if (s.succeeded()) {
                ProtonHelper.accepted(delivery, true);
            } else if (s.cause() instanceof AmqpErrorException) {
                AmqpErrorException cause = (AmqpErrorException) s.cause();
                MessageHelper.rejected(delivery, cause.asErrorCondition());
            } else {
                this.logger.debug("error processing request [resource: {}, op: {}]: {}", targetAddress, message.getSubject(), s.cause().getMessage());
                MessageHelper.rejected(delivery, ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "internal error"));
            }
        });

        if (!this.passesFormalVerification(targetAddress, message)) {
            requestTracker.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
            return;
        }

        HonoUser clientPrincipal = Constants.getClientPrincipal(con);
        // A failed authorization future, or an exception thrown by the handler below,
        // is propagated to requestTracker by compose().
        this.isAuthorized(clientPrincipal, targetAddress, message.getSubject()).compose(authorized -> {
            this.logger.debug("client [{}] is {}authorized to {}:{}", clientPrincipal.getName(), authorized ? "" : "not ", targetAddress, message.getSubject());
            if (authorized) {
                try {
                    // Process on behalf of the very principal that has been authorized.
                    this.processRequest(message, targetAddress, clientPrincipal);
                    requestTracker.complete();
                } catch (DecodeException e) {
                    requestTracker.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
                }
            } else {
                requestTracker.fail(new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, "unauthorized"));
            }
        }, requestTracker);
    }

    /**
     * Checks if a client is authorized to invoke an operation on a resource.
     * <p>
     * This default implementation delegates to the configured authorization service.
     *
     * @param clientPrincipal The client.
     * @param resource The resource the operation is to be invoked on.
     * @param operation The operation to invoke.
     * @return A future indicating whether the client is authorized.
     */
    protected Future<Boolean> isAuthorized(HonoUser clientPrincipal, ResourceIdentifier resource, String operation) {
        return this.getAuthorizationService().isAuthorized(clientPrincipal, resource, operation);
    }

    /**
     * Handles a client's request to establish a link for receiving replies.
     * <p>
     * The link is closed with an invalid-field condition if the reply-to address
     * has no resource ID. Otherwise an event bus consumer is registered on the
     * reply-to address which forwards each reply to the client over the link.
     * The consumer is unregistered when the client closes the link.
     *
     * @param con The connection the link is part of.
     * @param sender The link to establish.
     * @param replyToAddress The address the client wants to receive replies on.
     */
    @Override
    public final void onLinkAttach(ProtonConnection con, ProtonSender sender, ResourceIdentifier replyToAddress) {
        if (replyToAddress.getResourceId() == null) {
            this.logger.debug("client [{}] provided invalid reply-to address", sender.getName());
            sender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD, String.format("reply-to address must have the following format %s/<tenant>/<reply-address>", this.getName())));
            sender.close();
            return;
        }

        this.logger.debug("establishing sender link with client [{}]", sender.getName());
        MessageConsumer<JsonObject> replyConsumer = this.vertx.eventBus().consumer(replyToAddress.toString(), message -> {
            this.logger.trace("forwarding reply to client [{}]: {}", sender.getName(), message.body());
            org.apache.qpid.proton.message.Message amqpReply = this.getAmqpReply(message);
            sender.send(amqpReply);
        });
        sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
        sender.closeHandler(senderClosed -> {
            this.logger.debug("client [{}] closed sender link, removing associated event bus consumer [{}]", sender.getName(), replyConsumer.address());
            replyConsumer.unregister();
            if (senderClosed.succeeded()) {
                senderClosed.result().close();
            }
        });
        sender.open();
    }
}

