/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.amqp.AbstractAmqpEndpoint;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseApiConstants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractRequestResponseEndpoint<T extends ServiceConfigProperties>
extends AbstractAmqpEndpoint<T> {
    /** Response sender links, keyed by the string form of the reply-to address they were opened for. */
    private final Map<String, ProtonSender> replyToSenderMap = new HashMap<>();
    /** Service used for authorizing requests; a claims based implementation unless replaced via the setter. */
    private AuthorizationService authorizationService = new ClaimsBasedAuthorizationService();

    /**
     * Creates a new endpoint for a Vert.x instance.
     *
     * @param vertx The Vert.x instance that is handed to the super class constructor.
     * @throws NullPointerException if vertx is {@code null}.
     */
    protected AbstractRequestResponseEndpoint(final Vertx vertx) {
        super(Objects.requireNonNull(vertx));
    }

    /**
     * Checks whether a request message is formally acceptable for this endpoint.
     * <p>
     * The check is applied to every received request before the request is accepted.
     * A request that fails the check is rejected as a malformed request message and is
     * not processed any further.
     *
     * @param var1 The target address of the request.
     * @param var2 The request message to check.
     * @return {@code true} if the message may be processed.
     */
    protected abstract boolean passesFormalVerification(ResourceIdentifier var1, Message var2);

    /**
     * Processes a request message and produces the response for it.
     * <p>
     * This method is invoked only after the request has passed formal verification and
     * the client has been authorized. The message contained in the returned future is
     * handed to {@code filterResponse} and then sent to the client. If the returned
     * future fails, an error response derived from the failure is sent instead.
     *
     * @param var1 The request message.
     * @param var2 The target address of the request.
     * @param var3 The context of the tracing span that has been created for processing the request.
     * @return A future indicating the outcome; if it succeeds, it contains the response message.
     */
    protected abstract Future<Message> handleRequestMessage(Message var1, ResourceIdentifier var2, SpanContext var3);

    /**
     * Gets the service that this endpoint consults when authorizing requests.
     *
     * @return The currently configured authorization service.
     */
    public final AuthorizationService getAuthorizationService() {
        return authorizationService;
    }

    /**
     * Replaces the service that this endpoint consults when authorizing requests.
     * <p>
     * The value is stored as given; no validation is performed here.
     *
     * @param authService The authorization service to use from now on.
     */
    @Autowired(required=false)
    public final void setAuthorizationService(final AuthorizationService authService) {
        authorizationService = authService;
    }

    /**
     * Configures and opens a receiver link over which a client sends its requests.
     * <p>
     * A link for which the client asks for AT MOST ONCE delivery is closed with a
     * <em>precondition failed</em> error condition. Otherwise the link is opened with
     * AT LEAST ONCE QoS, every received message is passed to
     * {@code handleRequestMessage}, and the configured number of receiver link credits
     * is flowed to the client.
     *
     * @param con The connection that the link belongs to.
     * @param receiver The receiver link to configure.
     * @param targetAddress The address that is passed on with every request received on the link.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonReceiver receiver, final ResourceIdentifier targetAddress) {
        if (receiver.getRemoteQoS() == ProtonQoS.AT_MOST_ONCE) {
            logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...", getName());
            final ErrorCondition unsupportedQos = ProtonHelper.condition(
                    AmqpError.PRECONDITION_FAILED.toString(),
                    "endpoint requires AT_LEAST_ONCE QoS");
            receiver.setCondition(unsupportedQos);
            receiver.close();
            return;
        }

        logger.debug("establishing link for receiving request messages from client [{}]", receiver.getName());
        receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
        receiver.setAutoAccept(true);
        receiver.setTarget(receiver.getRemoteTarget());
        receiver.setSource(receiver.getRemoteSource());
        // no automatic prefetch: credit is flowed explicitly once the link has been opened
        receiver.setPrefetch(0);
        receiver.handler((requestDelivery, request) -> {
            HonoProtonHelper.onReceivedMessageDeliveryUpdatedFromRemote(
                    requestDelivery,
                    updated -> logger.debug(
                            "got unexpected disposition update for received message [remote state: {}]",
                            requestDelivery.getRemoteState()));
            try {
                handleRequestMessage(con, receiver, targetAddress, requestDelivery, request);
            } catch (final Exception e) {
                logger.warn("error handling message", e);
                ProtonHelper.released(requestDelivery, true);
            }
        });
        // closing and detaching of the link by the peer are handled the same way
        HonoProtonHelper.setCloseHandler(receiver, remoteClose -> onLinkDetach(receiver));
        HonoProtonHelper.setDetachHandler(receiver, remoteDetach -> onLinkDetach(receiver));
        receiver.open();

        final int initialCredit = ((ServiceConfigProperties) this.config).getReceiverLinkCredit();
        logger.debug("flowing {} credits to client", initialCredit);
        receiver.flow(initialCredit);
    }

    /**
     * Handles a request message that has been received from a client.
     * <p>
     * The message is first checked using {@code passesFormalVerification}. A message
     * failing that check is rejected and one credit is given back to the client.
     * Otherwise the message is accepted and then
     * <ol>
     * <li>the sender link registered for the message's reply-to address is looked up,</li>
     * <li>the client is authorized to invoke the operation given in the message's subject,</li>
     * <li>the request is processed and the response is passed through {@code filterResponse},</li>
     * <li>any failure of the two preceding steps is turned into an error response,</li>
     * <li>the response is sent if the sender link is still open and connected.</li>
     * </ol>
     * In any case, one credit is flowed back to the client and the tracing span is
     * finished once processing has completed.
     *
     * @param con The connection that the request has been received on.
     * @param receiver The link that the request has been received on.
     * @param targetAddress The target address of the request.
     * @param delivery The delivery of the request message.
     * @param requestMessage The request message.
     */
    protected final void handleRequestMessage(final ProtonConnection con, final ProtonReceiver receiver, final ResourceIdentifier targetAddress, final ProtonDelivery delivery, final Message requestMessage) {
        final HonoUser clientPrincipal = Constants.getClientPrincipal(con);
        final String replyTo = requestMessage.getReplyTo();
        final SpanContext parentContext = TracingHelper.extractSpanContext(tracer, requestMessage);
        final Span span = TracingHelper.buildServerChildSpan(tracer, parentContext, "process request message", getName())
                .withTag(Tags.HTTP_METHOD.getKey(), requestMessage.getSubject())
                .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), targetAddress.toString())
                .start();

        if (!passesFormalVerification(targetAddress, requestMessage)) {
            final ErrorCondition badRequest = new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed request message");
            MessageHelper.rejected(delivery, badRequest);
            flowCreditToRequestor(receiver, replyTo);
            TracingHelper.logError(span, "malformed request message");
            span.finish();
            return;
        }

        ProtonHelper.accepted(delivery, true);
        span.log("request message accepted");

        getSenderForConnection(con, replyTo)
            .compose(sender -> {
                // step 1: make sure that the client may invoke the requested operation
                final Future<Boolean> authorization = isAuthorized(clientPrincipal, targetAddress, requestMessage)
                    .map(authorized -> {
                        logger.debug("client [{}] is {}authorized to {}:{}",
                                clientPrincipal.getName(), authorized ? "" : "not ", targetAddress, requestMessage.getSubject());
                        if (!authorized) {
                            throw new ClientErrorException(403, "not authorized to invoke operation");
                        }
                        return authorized;
                    });

                // step 2: process the request, any failure up to here becomes an error response
                final Future<Message> response = authorization
                    .compose(authorized -> handleRequestMessage(requestMessage, targetAddress, span.context()))
                    .compose(amqpMessage -> filterResponse(clientPrincipal, requestMessage, amqpMessage))
                    .otherwise(t -> {
                        logger.debug("error processing request [resource: {}, op: {}]: {}",
                                targetAddress, requestMessage.getSubject(), t.getMessage());
                        span.log("error processing request");
                        TracingHelper.logError(span, t);
                        final ServiceInvocationException ex = getServiceInvocationException(t);
                        Tags.HTTP_STATUS.set(span, ex.getErrorCode());
                        return RequestResponseApiConstants.getErrorMessage(ex.getErrorCode(), ex.getMessage(), requestMessage);
                    });

                // step 3: deliver the response, provided that the reply-to link is still usable
                return response.map(amqpMessage -> {
                    Tags.HTTP_STATUS.set(span, MessageHelper.getStatus(amqpMessage));
                    if (!HonoProtonHelper.isLinkOpenAndConnected(sender)) {
                        TracingHelper.logError(span, "cannot send response, reply-to link is closed");
                        return null;
                    }
                    final ProtonDelivery responseDelivery = sender.send(amqpMessage);
                    logger.debug("sent response message to client  [correlation-id: {}, content-type: {}]",
                            amqpMessage.getCorrelationId(), amqpMessage.getContentType());
                    span.log("sent response message to client");
                    return responseDelivery;
                });
            })
            .onComplete(outcome -> {
                // give the credit used for this request back to the client
                flowCreditToRequestor(receiver, replyTo);
                span.finish();
            });
    }

    /**
     * Applies filtering to a response before it is sent to the client.
     * <p>
     * This default implementation returns the response unchanged. Subclasses may
     * override it in order to adapt the response to the given client and request.
     *
     * @param clientPrincipal The client that has sent the request (not used by this implementation).
     * @param request The request message (not used by this implementation).
     * @param response The response message to filter.
     * @return A succeeded future containing the response.
     * @throws NullPointerException if response is {@code null}.
     */
    protected Future<Message> filterResponse(final HonoUser clientPrincipal, final Message request, final Message response) {
        // no widening cast here: the future's type parameter must be inferred as Message
        return Future.succeededFuture(Objects.requireNonNull(response));
    }

    /**
     * Checks whether a client is authorized to invoke an operation on a resource.
     * <p>
     * This implementation delegates to the configured authorization service, using the
     * message's subject as the name of the operation.
     *
     * @param clientPrincipal The client to authorize.
     * @param resource The resource that the operation is to be invoked on.
     * @param message The message whose subject names the operation.
     * @return The outcome of the check as reported by the authorization service.
     * @throws NullPointerException if message is {@code null}.
     */
    protected Future<Boolean> isAuthorized(final HonoUser clientPrincipal, final ResourceIdentifier resource, final Message message) {
        final String operation = Objects.requireNonNull(message).getSubject();
        return getAuthorizationService().isAuthorized(clientPrincipal, resource, operation);
    }

    /**
     * Configures and opens a sender link over which responses are sent to a client.
     * <p>
     * The link is closed with an error condition if the reply-to address is not
     * considered valid by {@code isValidReplyToAddress} or if a sender has already been
     * registered for the address. Otherwise the sender is registered for the address
     * and the link is opened. The registration is removed again when the client closes
     * or detaches the link.
     *
     * @param con The connection that the link belongs to.
     * @param sender The sender link to configure.
     * @param replyToAddress The reply-to address that the client wants to receive responses on.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonSender sender, final ResourceIdentifier replyToAddress) {
        if (!isValidReplyToAddress(replyToAddress)) {
            logger.debug("client [{}] provided invalid reply-to address", sender.getName());
            final String description = String.format(
                    "reply-to address must have the following format %s/<tenant>/<reply-address>", getName());
            sender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD, description));
            sender.close();
            return;
        }

        final String replyTo = replyToAddress.toString();
        if (replyToSenderMap.containsKey(replyTo)) {
            logger.debug("client [{}] wanted to subscribe to already subscribed reply-to address [{}]", sender.getName(), replyTo);
            final String description = String.format("reply-to address [%s] is already subscribed", replyTo);
            sender.setCondition(ProtonHelper.condition(AmqpError.ILLEGAL_STATE, description));
            sender.close();
            return;
        }

        logger.debug("establishing response sender link with client [{}]", sender.getName());
        sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
        sender.setSource(sender.getRemoteSource());
        sender.setTarget(sender.getRemoteTarget());
        registerSenderForReplyTo(replyTo, sender);

        // shared by both handlers below: drop the registration, then close the link locally
        final Runnable release = () -> {
            unregisterSenderForReplyTo(replyTo);
            sender.close();
        };
        HonoProtonHelper.setCloseHandler(sender, remoteClose -> {
            logger.debug("client [{}] closed sender link", sender.getName());
            release.run();
        });
        HonoProtonHelper.setDetachHandler(sender, remoteDetach -> {
            logger.debug("client [{}] detached sender link", sender.getName());
            release.run();
        });
        sender.open();
    }

    /**
     * Removes all response senders that have been registered for a connection.
     *
     * @param connection The connection that has been closed.
     * @throws NullPointerException if connection is {@code null}.
     */
    @Override
    public void onConnectionClosed(final ProtonConnection connection) {
        deallocateAllSendersForConnection(Objects.requireNonNull(connection));
    }

    /**
     * Looks up the sender link to use for sending a response to a reply-to address.
     * <p>
     * A sender qualifies only if it has been registered for the address, is open and
     * belongs to the given connection.
     *
     * @param con The connection that the request has been received on.
     * @param replytoAddress The reply-to address of the request.
     * @return A succeeded future containing the sender, or a future failed with a
     *         {@link ClientErrorException} with status 412 if no qualifying sender exists.
     */
    private Future<ProtonSender> getSenderForConnection(final ProtonConnection con, final String replytoAddress) {
        final ProtonSender sender = replyToSenderMap.get(replytoAddress);
        final boolean usable = sender != null
                && sender.isOpen()
                // identity check: the response must go back over the connection the request came in on
                && sender.getSession().getConnection() == con;
        if (usable) {
            return Future.succeededFuture(sender);
        }
        return Future.failedFuture(new ClientErrorException(412, "must open receiver link for reply-to address first"));
    }

    /**
     * Registers a sender for a reply-to address, replacing any sender registered before.
     *
     * @param replyTo The reply-to address.
     * @param sender The sender to use for responses to the address.
     */
    private void registerSenderForReplyTo(final String replyTo, final ProtonSender sender) {
        final ProtonSender previous = replyToSenderMap.put(replyTo, sender);
        final boolean replacedOtherSender = previous != null && previous != sender;
        if (replacedOtherSender) {
            logger.info("replaced existing sender [{}] for replies to [{}] with sender [{}]", previous, replyTo, sender);
        } else {
            logger.debug("registered sender [{}] for replies to [{}]", sender, replyTo);
        }
    }

    /**
     * Removes the sender registered for a reply-to address, if any.
     *
     * @param replyTo The reply-to address.
     */
    private void unregisterSenderForReplyTo(final String replyTo) {
        final ProtonSender removed = replyToSenderMap.remove(replyTo);
        if (removed != null) {
            logger.debug("deallocated sender [{}] for replies to [{}]", removed.getName(), replyTo);
        } else {
            logger.warn("sender was not allocated for replyTo address [{}]", replyTo);
        }
    }

    /**
     * Removes the registrations of all senders that belong to a connection.
     *
     * @param connection The connection whose senders are to be removed.
     */
    private void deallocateAllSendersForConnection(final ProtonConnection connection) {
        // removing from the values view removes the corresponding map entries
        replyToSenderMap.values().removeIf(sender -> sender.getSession().getConnection() == connection);
    }

    /**
     * Flows a single credit to the client of a receiver link.
     *
     * @param receiver The link to flow the credit on.
     * @param replyTo The reply-to address of the processed request (used for logging only).
     */
    private void flowCreditToRequestor(final ProtonReceiver receiver, final String replyTo) {
        receiver.flow(1);
        final int creditAfterFlow = receiver.getCredit();
        logger.trace("replenished client [reply-to: {}, current credit: {}]", replyTo, creditAfterFlow);
    }

    /**
     * Checks whether an address may be used as a reply-to address.
     * <p>
     * This implementation accepts any non-{@code null} address whose resource path
     * consists of at least three elements.
     *
     * @param replyToAddress The address to check.
     * @return {@code true} if the address is acceptable.
     */
    protected boolean isValidReplyToAddress(final ResourceIdentifier replyToAddress) {
        return replyToAddress != null && replyToAddress.getResourcePath().length >= 3;
    }

    /**
     * Reads the value of a JSON object's field, provided that it is of an expected type.
     *
     * @param clazz The expected type of the value.
     * @param payload The JSON object to read the value from.
     * @param field The name of the field.
     * @param <T> The expected type of the value.
     * @return The value, or {@code null} if the field's value is not an instance of the expected type.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected static final <T> T getTypesafeValueForField(final Class<T> clazz, final JsonObject payload, final String field) {
        Objects.requireNonNull(clazz);
        Objects.requireNonNull(payload);
        Objects.requireNonNull(field);

        final Object value = payload.getValue(field);
        return clazz.isInstance(value) ? clazz.cast(value) : null;
    }

    /**
     * Removes a field from a JSON object and returns its value, provided that the value
     * is of an expected type.
     * <p>
     * The field is removed regardless of the type of its value.
     *
     * @param clazz The expected type of the value.
     * @param payload The JSON object to remove the field from.
     * @param field The name of the field.
     * @param <T> The expected type of the value.
     * @return The removed value, or {@code null} if it is not an instance of the expected type.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected static final <T> T removeTypesafeValueForField(final Class<T> clazz, final JsonObject payload, final String field) {
        Objects.requireNonNull(clazz);
        Objects.requireNonNull(payload);
        Objects.requireNonNull(field);

        final Object removed = payload.remove(field);
        return clazz.isInstance(removed) ? clazz.cast(removed) : null;
    }

    /**
     * Finishes a tracing span once a future producing a response message has completed.
     * <p>
     * If the future succeeds, the message's status is set as the span's HTTP status tag
     * and the span is marked as erroneous if the status lies outside of the range
     * [100, 600). If the future fails, the status code extracted from the failure is set
     * as the HTTP status tag and the failure is logged to the span. The span is finished
     * in both cases.
     *
     * @param span The span to finish.
     * @param resultFuture The future to wait for.
     * @return A future with the same outcome as the given future.
     */
    protected Future<Message> finishSpanOnFutureCompletion(final Span span, final Future<Message> resultFuture) {
        return resultFuture.compose(message -> {
            // the status is read once and used for both the tag and the error check
            final Integer status = MessageHelper.getStatus(message);
            Tags.HTTP_STATUS.set(span, status);
            if (status != null && (status < 100 || status >= 600)) {
                Tags.ERROR.set(span, Boolean.TRUE);
            }
            span.finish();
            // no widening cast here: the composed future must remain a Future<Message>
            return Future.succeededFuture(message);
        }).recover(t -> {
            Tags.HTTP_STATUS.set(span, ServiceInvocationException.extractStatusCode(t));
            TracingHelper.logError(span, t);
            span.finish();
            // the original failure is passed on to the caller
            return Future.failedFuture(t);
        });
    }

    private ServiceInvocationException getServiceInvocationException(Throwable error) {
        if (error instanceof ServiceInvocationException) {
            return (ServiceInvocationException)error;
        }
        if (error instanceof ReplyException) {
            ReplyException ex = (ReplyException)error;
            switch (ex.failureType()) {
                case TIMEOUT: {
                    return new ServerErrorException(503, "request could not be processed at the moment");
                }
            }
            return new ServerErrorException(500);
        }
        return new ServerErrorException(500);
    }
}

