/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.amqp;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpanContext;
import io.opentracing.noop.NoopTracerFactory;
import io.opentracing.propagation.Format;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.amqp.AbstractAmqpEndpoint;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.tracing.MessageAnnotationsExtractAdapter;
import org.eclipse.hono.tracing.MultiMapInjectAdapter;
import org.eclipse.hono.util.AmqpErrorException;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.EventBusMessage;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Base class for AMQP endpoints that process requests received over a receiver link and
 * deliver the corresponding responses over a sender link that the client has opened on a
 * reply-to address.
 * <p>
 * Requests are handed to {@link #processRequest(Message, ResourceIdentifier, HonoUser)}.
 * Responses are consumed from the Vert.x event bus, using the client's reply-to address as
 * the event bus address, converted by {@link #getAmqpReply(EventBusMessage)} and sent to the client.
 * <p>
 * Flow control: the receiver link is opened with prefetch disabled and an initial amount of
 * credit taken from the configuration. One credit is given back for every request once its
 * response has been forwarded or once the request has been rejected.
 *
 * @param <T> The type of configuration properties this endpoint uses.
 */
public abstract class RequestResponseEndpoint<T extends ServiceConfigProperties>
extends AbstractAmqpEndpoint<T> {

    /** Service used to decide whether a client may invoke an operation on a resource. */
    private AuthorizationService authorizationService = new ClaimsBasedAuthorizationService();
    /** Reply-to address -> receiver link that most recently delivered a request for that address. */
    private final Map<String, ProtonReceiver> replyToReceiverMap = new HashMap<>();
    /** Event bus consumers registered for response forwarding, grouped by client connection. */
    private final Multimap<ProtonConnection, MessageConsumer<?>> replyConsumerMap = HashMultimap.create();
    /** Subscribed reply-to addresses, grouped by client connection. */
    private final Multimap<ProtonConnection, String> replyAddressMap = HashMultimap.create();
    /** All reply-to addresses for which a sender link is currently registered. */
    private final Set<String> replyAddresses = new HashSet<>();
    /** Tracer used for injecting/extracting span contexts; a no-op tracer unless one is set. */
    protected Tracer tracer = NoopTracerFactory.create();

    /**
     * Creates an endpoint for a Vert.x instance.
     *
     * @param vertx The Vert.x instance to use.
     * @throws NullPointerException if vertx is {@code null}.
     */
    protected RequestResponseEndpoint(final Vertx vertx) {
        super(Objects.requireNonNull(vertx));
    }

    /**
     * Processes a request message that has passed formal verification and authorization.
     *
     * @param message The request message.
     * @param targetAddress The address the message has been sent to.
     * @param clientPrincipal The principal representing the client that sent the request.
     */
    public abstract void processRequest(Message message, ResourceIdentifier targetAddress, HonoUser clientPrincipal);

    /**
     * Creates the AMQP message to send to the client for a response received via the event bus.
     *
     * @param message The response received via the event bus.
     * @return The AMQP message to send to the client.
     */
    protected abstract Message getAmqpReply(EventBusMessage message);

    /**
     * Gets the service used for authorizing requests.
     *
     * @return The authorization service.
     */
    public final AuthorizationService getAuthorizationService() {
        return authorizationService;
    }

    /**
     * Sets the service to use for authorizing requests.
     * <p>
     * If not set, a {@link ClaimsBasedAuthorizationService} is used.
     *
     * @param authService The authorization service.
     * @throws NullPointerException if authService is {@code null}.
     */
    @Autowired(required = false)
    public final void setAuthorizationService(final AuthorizationService authService) {
        // fail fast: a null service would otherwise only surface later as an error while authorizing a request
        this.authorizationService = Objects.requireNonNull(authService, "authorization service must not be null");
    }

    /**
     * Sets the OpenTracing {@code Tracer} to use.
     * <p>
     * If not set, a no-op tracer is used.
     *
     * @param opentracingTracer The tracer.
     * @throws NullPointerException if opentracingTracer is {@code null}.
     */
    @Autowired(required = false)
    public final void setTracer(final Tracer opentracingTracer) {
        // validate before the argument is dereferenced for logging
        this.tracer = Objects.requireNonNull(opentracingTracer, "tracer must not be null");
        logger.info("using OpenTracing Tracer implementation [{}]", opentracingTracer.getClass().getName());
    }

    /**
     * Handles a client's attempt to open a link for sending requests to this endpoint.
     * <p>
     * The link is closed with a <em>precondition-failed</em> condition if the client requests
     * AT_MOST_ONCE delivery. Otherwise the link is opened with AT_LEAST_ONCE QoS, prefetch
     * disabled, and the configured amount of receiver link credit is flowed to the client.
     *
     * @param con The connection the link belongs to.
     * @param receiver The receiver link over which requests will arrive.
     * @param targetAddress The address the client wants to send requests to.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonReceiver receiver,
            final ResourceIdentifier targetAddress) {

        if (ProtonQoS.AT_MOST_ONCE.equals(receiver.getRemoteQoS())) {
            logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...", getName());
            receiver.setCondition(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            receiver.close();
            return;
        }

        logger.debug("establishing link for receiving messages from client [{}]", receiver.getName());
        receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
        receiver.setAutoAccept(true);
        // credit is managed manually, see handleMessage and flowCreditToRequestor
        receiver.setPrefetch(0);
        receiver.handler((delivery, message) -> handleMessage(con, receiver, targetAddress, delivery, message));
        HonoProtonHelper.setCloseHandler(receiver, clientDetached -> onLinkDetach(receiver));
        receiver.open();

        final int initialCredit = config.getReceiverLinkCredit();
        receiver.flow(initialCredit);
        logger.debug("Flowing {} credits to the sender", initialCredit);
    }

    /**
     * Handles a request message received from a client.
     * <p>
     * The message is checked in this order:
     * <ol>
     * <li>formal verification of the message,</li>
     * <li>the message's reply-to address must be one a sender link is registered for,</li>
     * <li>the client must be authorized to invoke the operation.</li>
     * </ol>
     * If all checks pass, the request is passed to
     * {@link #processRequest(Message, ResourceIdentifier, HonoUser)} and the delivery is accepted.
     * Otherwise the delivery is rejected and one credit is given back to the link the message
     * arrived on.
     *
     * @param con The connection the message was received on.
     * @param receiver The link the message was received on.
     * @param targetAddress The address the message was sent to.
     * @param delivery The delivery of the message.
     * @param message The request message.
     */
    protected final void handleMessage(final ProtonConnection con, final ProtonReceiver receiver,
            final ResourceIdentifier targetAddress, final ProtonDelivery delivery, final Message message) {

        final Future<Void> formalCheck = Future.future();
        if (passesFormalVerification(targetAddress, message)) {
            formalCheck.complete();
        } else {
            formalCheck.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
        }
        final HonoUser clientPrincipal = Constants.getClientPrincipal(con);
        final String replyTo = message.getReplyTo();

        formalCheck.compose(ok -> {
            if (!replyAddresses.contains(replyTo)) {
                return Future.failedFuture(new AmqpErrorException(AmqpError.ILLEGAL_STATE, "unsubscribed reply-to address"));
            }
            // remember the link so that credit can be replenished once the response is forwarded
            allocateReceiverForReplyTo(replyTo, receiver);
            return Future.succeededFuture();
        }).compose(ok -> isAuthorized(clientPrincipal, targetAddress, message)).compose(authorized -> {
            logger.debug("client [{}] is {}authorized to {}:{}",
                    clientPrincipal.getName(), authorized ? "" : "not ", targetAddress, message.getSubject());
            if (!authorized) {
                return Future.failedFuture(new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, "unauthorized"));
            }
            try {
                processRequest(message, targetAddress, clientPrincipal);
                ProtonHelper.accepted(delivery, true);
                return Future.succeededFuture();
            } catch (final DecodeException e) {
                return Future.failedFuture(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
            }
        }).otherwise(t -> {
            // Replenish the credit on the link the message actually arrived on. Looking the receiver
            // up by reply-to address would find nothing (or a receiver of another link) whenever the
            // request failed before a receiver was allocated for the address, e.g. for a missing or
            // unsubscribed reply-to address, and the link would eventually run out of credit.
            replenishCredit(receiver, replyTo);
            if (t instanceof AmqpErrorException) {
                final AmqpErrorException cause = (AmqpErrorException) t;
                MessageHelper.rejected(delivery, cause.asErrorCondition());
            } else {
                logger.debug("error processing request [resource: {}, op: {}]: {}",
                        targetAddress, message.getSubject(), t.getMessage());
                MessageHelper.rejected(delivery, ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "internal error"));
            }
            return null;
        });
    }

    /**
     * Checks whether a client is authorized to invoke an operation.
     * <p>
     * This default implementation delegates to the configured authorization service, using
     * the message's subject as the operation name.
     *
     * @param clientPrincipal The client.
     * @param resource The resource the operation belongs to.
     * @param message The message for which the authorization shall be checked.
     * @return The outcome of the check.
     * @throws NullPointerException if message is {@code null}.
     */
    protected Future<Boolean> isAuthorized(final HonoUser clientPrincipal, final ResourceIdentifier resource,
            final Message message) {

        Objects.requireNonNull(message);
        return getAuthorizationService().isAuthorized(clientPrincipal, resource, message.getSubject());
    }

    /**
     * Filters a response before it is forwarded to the client.
     * <p>
     * This default implementation returns a succeeded future containing the unmodified response.
     * If the returned future is failed, the status code extracted from the failure cause is
     * used to create the response that is sent to the client instead.
     *
     * @param clientPrincipal The client that the response is going to be sent to.
     * @param response The response to filter.
     * @return A future containing the response to forward.
     * @throws NullPointerException if response is {@code null}.
     */
    protected Future<EventBusMessage> filterResponse(final HonoUser clientPrincipal, final EventBusMessage response) {
        return Future.succeededFuture(Objects.requireNonNull(response));
    }

    /**
     * Handles a client's attempt to open a link for receiving responses.
     * <p>
     * The link is closed with an error condition if the reply-to address is invalid or is
     * already subscribed. Otherwise an event bus consumer is registered on the reply-to
     * address which forwards every response received to the client and then gives one credit
     * back to the receiver link allocated for the address.
     *
     * @param con The connection the link belongs to.
     * @param sender The sender link over which responses will be sent.
     * @param replyToAddress The reply-to address the client wants to receive responses on.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonSender sender,
            final ResourceIdentifier replyToAddress) {

        if (!isValidReplyToAddress(replyToAddress)) {
            logger.debug("client [{}] provided invalid reply-to address", sender.getName());
            sender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD,
                    String.format("reply-to address must have the following format %s/<tenant>/<reply-address>", getName())));
            sender.close();
            return;
        }

        final String replyTo = replyToAddress.toString();
        if (replyAddresses.contains(replyTo)) {
            logger.debug("client [{}] wanted to subscribe to already subscribed reply-to address [{}]", sender.getName(), replyTo);
            sender.setCondition(ProtonHelper.condition(AmqpError.ILLEGAL_STATE,
                    String.format("reply-to address [%s] is already subscribed", replyTo)));
            sender.close();
            return;
        }

        logger.debug("establishing response sender link with client [{}]", sender.getName());
        final MessageConsumer<JsonObject> replyConsumer = vertx.eventBus().consumer(replyTo, busMessage -> {
            if (logger.isTraceEnabled()) {
                logger.trace("forwarding reply to client [{}]: {}", sender.getName(), busMessage.body().encodePrettily());
            }
            final EventBusMessage response = EventBusMessage.fromJson(busMessage.body());
            filterResponse(Constants.getClientPrincipal(con), response).recover(t -> {
                // a rejected response is replaced with one carrying only the failure's status code
                final int status = ServiceInvocationException.extractStatusCode(t);
                return Future.succeededFuture(response.getResponse(status));
            }).map(filteredResponse -> {
                try {
                    final Message amqpReply = getAmqpReply(filteredResponse);
                    sender.send(amqpReply);
                } finally {
                    // the request is done, whether or not the reply could be sent
                    flowCreditToRequestor(replyTo);
                }
                return null;
            });
        });
        registerConsumerForConnection(con, replyTo, replyConsumer);

        sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
        HonoProtonHelper.setCloseHandler(sender, senderClosed -> {
            logger.debug("client [{}] closed sender link, removing associated event bus consumer [{}]",
                    sender.getName(), replyConsumer.address());
            deallocateReceiverForReplyTo(replyTo);
            unregisterConsumerForConnection(con, replyTo, replyConsumer);
            sender.close();
        });
        sender.open();
    }

    /**
     * Releases all state kept for a connection: the event bus consumers and reply-to addresses
     * registered for it and the receivers allocated on it.
     *
     * @param connection The connection that has been closed.
     * @throws NullPointerException if connection is {@code null}.
     */
    @Override
    public void onConnectionClosed(final ProtonConnection connection) {
        Objects.requireNonNull(connection);
        unregisterAllConsumersForConnection(connection);
        deallocateAllReceiversForConnection(connection);
    }

    /**
     * Records the receiver link that delivered a request for a reply-to address, replacing
     * any receiver previously recorded for that address.
     */
    private void allocateReceiverForReplyTo(final String replyTo, final ProtonReceiver receiver) {
        final ProtonReceiver oldReceiver = replyToReceiverMap.put(replyTo, receiver);
        if (oldReceiver == null || oldReceiver == receiver) {
            logger.debug("Allocated receiver [{}] for replies to [{}]", receiver, replyTo);
        } else {
            logger.info("Allocated receiver [{}] for replies to [{}] - Had existing receiver: [{}]",
                    receiver, replyTo, oldReceiver);
        }
    }

    /**
     * Gives one credit back to the receiver link recorded for a reply-to address.
     * Logs a warning and does nothing else if no receiver is recorded for the address.
     */
    private void flowCreditToRequestor(final String replyTo) {
        final ProtonReceiver receiver = replyToReceiverMap.get(replyTo);
        if (receiver == null) {
            logger.warn("No receiver found for reply-to address [{}]", replyTo);
            return;
        }
        replenishCredit(receiver, replyTo);
    }

    /**
     * Gives one credit back to the given receiver link.
     */
    private void replenishCredit(final ProtonReceiver receiver, final String replyTo) {
        receiver.flow(1);
        if (logger.isTraceEnabled()) {
            logger.trace("Flowing credit back to sender - replyTo: [{}], currentCredits: {}", replyTo, receiver.getCredit());
        }
    }

    /**
     * Removes the receiver recorded for a reply-to address, logging a warning if there is none.
     */
    private void deallocateReceiverForReplyTo(final String replyTo) {
        final ProtonReceiver receiver = replyToReceiverMap.remove(replyTo);
        if (receiver == null) {
            logger.warn("Receiver was not allocated to replyTo address [{}]", replyTo);
        } else {
            logger.debug("Deallocated receiver [{}] for replies to [{}]", receiver, replyTo);
        }
    }

    /**
     * Removes all recorded receivers whose session belongs to the given connection.
     */
    private void deallocateAllReceiversForConnection(final ProtonConnection connection) {
        replyToReceiverMap.entrySet()
                .removeIf(entry -> entry.getValue().getSession().getConnection() == connection);
    }

    /**
     * Records an event bus consumer and its reply-to address for a connection.
     */
    private void registerConsumerForConnection(final ProtonConnection connection, final String replyTo,
            final MessageConsumer<?> replyConsumer) {

        replyConsumerMap.put(connection, replyConsumer);
        replyAddressMap.put(connection, replyTo);
        replyAddresses.add(replyTo);
    }

    /**
     * Unregisters an event bus consumer and removes it and its reply-to address from the
     * state kept for a connection.
     */
    private void unregisterConsumerForConnection(final ProtonConnection connection, final String replyTo,
            final MessageConsumer<?> replyConsumer) {

        replyConsumer.unregister();
        replyConsumerMap.remove(connection, replyConsumer);
        replyAddressMap.remove(connection, replyTo);
        replyAddresses.remove(replyTo);
    }

    /**
     * Unregisters all event bus consumers recorded for a connection and forgets the
     * connection's reply-to addresses.
     */
    private void unregisterAllConsumersForConnection(final ProtonConnection connection) {
        replyConsumerMap.removeAll(connection).forEach(MessageConsumer::unregister);
        replyAddresses.removeAll(replyAddressMap.removeAll(connection));
    }

    /**
     * Checks whether a reply-to address is acceptable for this endpoint.
     * <p>
     * This default implementation requires the address to be non-{@code null} and to have a
     * resource path consisting of at least three segments.
     *
     * @param replyToAddress The address to check.
     * @return {@code true} if the address is valid.
     */
    protected boolean isValidReplyToAddress(final ResourceIdentifier replyToAddress) {
        return replyToAddress != null && replyToAddress.getResourcePath().length >= 3;
    }

    /**
     * Creates delivery options for sending a message over the event bus.
     * <p>
     * If the given span context is neither {@code null} nor a no-op context, it is injected
     * into the headers of the returned options using the configured tracer.
     *
     * @param spanContext The span context to propagate (may be {@code null}).
     * @return The delivery options.
     */
    protected final DeliveryOptions createEventBusMessageDeliveryOptions(final SpanContext spanContext) {
        final DeliveryOptions deliveryOptions = new DeliveryOptions();
        if (spanContext != null && !(spanContext instanceof NoopSpanContext)) {
            final MultiMap headers = new CaseInsensitiveHeaders();
            tracer.inject(spanContext, Format.Builtin.TEXT_MAP, new MultiMapInjectAdapter(headers));
            deliveryOptions.setHeaders(headers);
        }
        return deliveryOptions;
    }

    /**
     * Extracts a span context from an AMQP message using the configured tracer.
     *
     * @param message The message to extract the context from.
     * @return The result of the tracer's extraction.
     */
    protected final SpanContext extractSpanContext(final Message message) {
        return tracer.extract(Format.Builtin.TEXT_MAP, new MessageAnnotationsExtractAdapter(message));
    }
}

