/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServerOptions;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Source;
import org.eclipse.hono.auth.Activity;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.AbstractServiceBase;
import org.eclipse.hono.service.amqp.AmqpEndpoint;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public abstract class AmqpServiceBase<T extends ServiceConfigProperties>
extends AbstractServiceBase<T> {
    private final Map<String, AmqpEndpoint> endpoints = new HashMap<String, AmqpEndpoint>();
    private ProtonServer server;
    private ProtonServer insecureServer;
    private ProtonSaslAuthenticatorFactory saslAuthenticatorFactory;
    private AuthorizationService authorizationService;

    protected abstract String getServiceName();

    @Autowired
    @Qualifier(value="amqp")
    public void setConfig(T configuration) {
        this.setSpecificConfig(configuration);
    }

    @Override
    public int getPortDefaultValue() {
        return 5671;
    }

    @Override
    public int getInsecurePortDefaultValue() {
        return 5672;
    }

    @Autowired(required=false)
    public final void addEndpoints(List<AmqpEndpoint> definedEndpoints) {
        Objects.requireNonNull(definedEndpoints);
        for (AmqpEndpoint ep : definedEndpoints) {
            this.addEndpoint(ep);
        }
    }

    public final void addEndpoint(AmqpEndpoint ep) {
        if (this.endpoints.putIfAbsent(ep.getName(), ep) != null) {
            this.LOG.warn("multiple endpoints defined with name [{}]", (Object)ep.getName());
        } else {
            this.LOG.debug("registering endpoint [{}]", (Object)ep.getName());
        }
    }

    protected final AmqpEndpoint getEndpoint(ResourceIdentifier targetAddress) {
        return this.getEndpoint(targetAddress.getEndpoint());
    }

    protected final AmqpEndpoint getEndpoint(String endpointName) {
        return this.endpoints.get(endpointName);
    }

    protected final Iterable<AmqpEndpoint> endpoints() {
        return this.endpoints.values();
    }

    @Autowired(required=false)
    public void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory factory) {
        this.saslAuthenticatorFactory = Objects.requireNonNull(factory);
    }

    public final void setAuthorizationService(AuthorizationService authService) {
        this.authorizationService = authService;
    }

    protected final AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    @Override
    public Future<Void> startInternal() {
        if (this.authorizationService == null) {
            this.authorizationService = new ClaimsBasedAuthorizationService();
        }
        return this.preStartServers().compose(s -> this.checkPortConfiguration()).compose(s -> this.startEndpoints()).compose(s -> this.startSecureServer()).compose(s -> this.startInsecureServer());
    }

    protected final void closeExpiredConnection(ProtonConnection con) {
        HonoUser clientPrincipal;
        if (!con.isDisconnected() && (clientPrincipal = Constants.getClientPrincipal((ProtonConnection)con)) != null) {
            this.LOG.debug("client's [{}] access token has expired, closing connection", (Object)clientPrincipal.getName());
            con.disconnectHandler(null);
            con.closeHandler(null);
            con.setCondition(ProtonHelper.condition((Symbol)AmqpError.UNAUTHORIZED_ACCESS, (String)"access token expired"));
            con.close();
            con.disconnect();
            this.publishConnectionClosedEvent(con);
        }
    }

    protected Future<Void> preStartServers() {
        return Future.succeededFuture();
    }

    private Future<Void> startEndpoints() {
        ArrayList<Future<Void>> endpointFutures = new ArrayList<Future<Void>>(this.endpoints.size());
        for (AmqpEndpoint ep : this.endpoints.values()) {
            this.LOG.info("starting endpoint [name: {}, class: {}]", (Object)ep.getName(), (Object)ep.getClass().getName());
            endpointFutures.add(ep.start());
        }
        Future startFuture = Future.future();
        CompositeFuture.all(endpointFutures).setHandler(startup -> {
            if (startup.succeeded()) {
                startFuture.complete();
            } else {
                startFuture.fail(startup.cause());
            }
        });
        return startFuture;
    }

    private Future<Void> stopEndpoints() {
        ArrayList<Future<Void>> endpointFutures = new ArrayList<Future<Void>>(this.endpoints.size());
        for (AmqpEndpoint ep : this.endpoints.values()) {
            this.LOG.info("stopping endpoint [name: {}, class: {}]", (Object)ep.getName(), (Object)ep.getClass().getName());
            endpointFutures.add(ep.stop());
        }
        Future stopFuture = Future.future();
        CompositeFuture.all(endpointFutures).setHandler(shutdown -> {
            if (shutdown.succeeded()) {
                stopFuture.complete();
            } else {
                stopFuture.fail(shutdown.cause());
            }
        });
        return stopFuture;
    }

    private Future<Void> startInsecureServer() {
        if (this.isInsecurePortEnabled()) {
            int insecurePort = this.determineInsecurePort();
            Future result = Future.future();
            ProtonServerOptions options = this.createInsecureServerOptions();
            this.insecureServer = this.createProtonServer(options).connectHandler(this::onRemoteConnectionOpenInsecurePort).listen(insecurePort, ((ServiceConfigProperties)this.getConfig()).getInsecurePortBindAddress(), bindAttempt -> {
                if (bindAttempt.succeeded()) {
                    if (this.getInsecurePort() == this.getInsecurePortDefaultValue()) {
                        this.LOG.info("server listens on standard insecure port [{}:{}]", (Object)this.getInsecurePortBindAddress(), (Object)this.getInsecurePort());
                    } else {
                        this.LOG.warn("server listens on non-standard insecure port [{}:{}], default is {}", new Object[]{this.getInsecurePortBindAddress(), this.getInsecurePort(), this.getInsecurePortDefaultValue()});
                    }
                    result.complete();
                } else {
                    this.LOG.error("cannot bind to insecure port", bindAttempt.cause());
                    result.fail(bindAttempt.cause());
                }
            });
            return result;
        }
        this.LOG.info("insecure port is not enabled");
        return Future.succeededFuture();
    }

    private Future<Void> startSecureServer() {
        if (this.isSecurePortEnabled()) {
            int securePort = this.determineSecurePort();
            Future result = Future.future();
            ProtonServerOptions options = this.createServerOptions();
            this.server = this.createProtonServer(options).connectHandler(this::onRemoteConnectionOpen).listen(securePort, ((ServiceConfigProperties)this.getConfig()).getBindAddress(), bindAttempt -> {
                if (bindAttempt.succeeded()) {
                    if (this.getPort() == this.getPortDefaultValue()) {
                        this.LOG.info("server listens on standard secure port [{}:{}]", (Object)this.getBindAddress(), (Object)this.getPort());
                    } else {
                        this.LOG.warn("server listens on non-standard secure port [{}:{}], default is {}", new Object[]{this.getBindAddress(), this.getPort(), this.getPortDefaultValue()});
                    }
                    result.complete();
                } else {
                    this.LOG.error("cannot bind to secure port", bindAttempt.cause());
                    result.fail(bindAttempt.cause());
                }
            });
            return result;
        }
        this.LOG.info("secure port is not enabled");
        return Future.succeededFuture();
    }

    private ProtonServer createProtonServer(ProtonServerOptions options) {
        return ProtonServer.create((Vertx)this.vertx, (ProtonServerOptions)options).saslAuthenticatorFactory(this.saslAuthenticatorFactory);
    }

    protected ProtonServerOptions createServerOptions() {
        ProtonServerOptions options = this.createInsecureServerOptions();
        this.addTlsKeyCertOptions((NetServerOptions)options);
        this.addTlsTrustOptions((NetServerOptions)options);
        return options;
    }

    protected ProtonServerOptions createInsecureServerOptions() {
        ProtonServerOptions options = new ProtonServerOptions();
        options.setHeartbeat(60000);
        options.setReceiveBufferSize(16384);
        options.setSendBufferSize(16384);
        options.setLogActivity(((ServiceConfigProperties)this.getConfig()).isNetworkDebugLoggingEnabled());
        return options;
    }

    @Override
    public final Future<Void> stopInternal() {
        return CompositeFuture.all(this.stopServer(), this.stopInsecureServer()).compose(s -> this.stopEndpoints());
    }

    private Future<Void> stopServer() {
        Future secureTracker = Future.future();
        if (this.server != null) {
            this.LOG.info("stopping secure AMQP server [{}:{}]", (Object)this.getBindAddress(), (Object)this.getActualPort());
            this.server.close(secureTracker.completer());
        } else {
            secureTracker.complete();
        }
        return secureTracker;
    }

    private Future<Void> stopInsecureServer() {
        Future insecureTracker = Future.future();
        if (this.insecureServer != null) {
            this.LOG.info("stopping insecure AMQP server [{}:{}]", (Object)this.getInsecurePortBindAddress(), (Object)this.getActualInsecurePort());
            this.insecureServer.close(insecureTracker.completer());
        } else {
            insecureTracker.complete();
        }
        return insecureTracker;
    }

    @Override
    protected final int getActualPort() {
        return this.server != null ? this.server.actualPort() : -1;
    }

    @Override
    protected final int getActualInsecurePort() {
        return this.insecureServer != null ? this.insecureServer.actualPort() : -1;
    }

    protected void onRemoteConnectionOpen(ProtonConnection connection) {
        connection.setContainer(String.format("%s-%s:%d", this.getServiceName(), this.getBindAddress(), this.getPort()));
        this.setRemoteConnectionOpenHandler(connection);
    }

    protected void onRemoteConnectionOpenInsecurePort(ProtonConnection connection) {
        connection.setContainer(String.format("%s-%s:%d", this.getServiceName(), this.getInsecurePortBindAddress(), this.getInsecurePort()));
        this.setRemoteConnectionOpenHandler(connection);
    }

    protected final void handleUnknownEndpoint(ProtonConnection con, ProtonLink<?> link, ResourceIdentifier address) {
        this.LOG.info("client [container: {}] wants to establish link for unknown endpoint [address: {}]", (Object)con.getRemoteContainer(), (Object)address);
        link.setCondition(ProtonHelper.condition((Symbol)AmqpError.NOT_FOUND, (String)String.format("no endpoint registered for address %s", address)));
        link.close();
    }

    protected final ResourceIdentifier getResourceIdentifier(String address) {
        if (((ServiceConfigProperties)this.getConfig()).isSingleTenant()) {
            return ResourceIdentifier.fromStringAssumingDefaultTenant((String)address);
        }
        return ResourceIdentifier.fromString((String)address);
    }

    protected void handleReceiverOpen(ProtonConnection con, ProtonReceiver receiver) {
        if (receiver.getRemoteTarget().getAddress() == null) {
            this.LOG.debug("client [container: {}] wants to open an anonymous link for sending messages to arbitrary addresses, closing link ...", (Object)con.getRemoteContainer());
            receiver.setCondition(ProtonHelper.condition((Symbol)AmqpError.NOT_ALLOWED, (String)"anonymous relay not supported"));
            receiver.close();
        } else {
            this.LOG.debug("client [container: {}] wants to open a link [address: {}] for sending messages", (Object)con.getRemoteContainer(), (Object)receiver.getRemoteTarget());
            try {
                ResourceIdentifier targetResource = this.getResourceIdentifier(receiver.getRemoteTarget().getAddress());
                AmqpEndpoint endpoint = this.getEndpoint(targetResource);
                if (endpoint == null) {
                    this.handleUnknownEndpoint(con, (ProtonLink<?>)receiver, targetResource);
                } else {
                    HonoUser user = Constants.getClientPrincipal((ProtonConnection)con);
                    this.getAuthorizationService().isAuthorized(user, targetResource, Activity.WRITE).setHandler(authAttempt -> {
                        if (authAttempt.succeeded() && ((Boolean)authAttempt.result()).booleanValue()) {
                            Constants.copyProperties((ProtonConnection)con, (ProtonLink)receiver);
                            receiver.setTarget(receiver.getRemoteTarget());
                            endpoint.onLinkAttach(con, receiver, targetResource);
                        } else {
                            this.LOG.debug("subject [{}] is not authorized to WRITE to [{}]", (Object)user.getName(), (Object)targetResource);
                            receiver.setCondition(ProtonHelper.condition((String)AmqpError.UNAUTHORIZED_ACCESS.toString(), (String)"unauthorized"));
                            receiver.close();
                        }
                    });
                }
            }
            catch (IllegalArgumentException e) {
                this.LOG.debug("client has provided invalid resource identifier as target address", (Throwable)e);
                receiver.setCondition(ProtonHelper.condition((Symbol)AmqpError.NOT_FOUND, (String)"no such address"));
                receiver.close();
            }
        }
    }

    protected void handleSenderOpen(ProtonConnection con, ProtonSender sender) {
        Source remoteSource = sender.getRemoteSource();
        this.LOG.debug("client [container: {}] wants to open a link [address: {}] for receiving messages", (Object)con.getRemoteContainer(), (Object)remoteSource);
        try {
            ResourceIdentifier targetResource = this.getResourceIdentifier(remoteSource.getAddress());
            AmqpEndpoint endpoint = this.getEndpoint(targetResource);
            if (endpoint == null) {
                this.handleUnknownEndpoint(con, (ProtonLink<?>)sender, targetResource);
            } else {
                HonoUser user = Constants.getClientPrincipal((ProtonConnection)con);
                this.getAuthorizationService().isAuthorized(user, targetResource, Activity.READ).setHandler(authAttempt -> {
                    if (authAttempt.succeeded() && ((Boolean)authAttempt.result()).booleanValue()) {
                        Constants.copyProperties((ProtonConnection)con, (ProtonLink)sender);
                        sender.setSource(sender.getRemoteSource());
                        endpoint.onLinkAttach(con, sender, targetResource);
                    } else {
                        this.LOG.debug("subject [{}] is not authorized to READ from [{}]", (Object)user.getName(), (Object)targetResource);
                        sender.setCondition(ProtonHelper.condition((String)AmqpError.UNAUTHORIZED_ACCESS.toString(), (String)"unauthorized"));
                        sender.close();
                    }
                });
            }
        }
        catch (IllegalArgumentException e) {
            this.LOG.debug("client has provided invalid resource identifier as target address", (Throwable)e);
            sender.setCondition(ProtonHelper.condition((Symbol)AmqpError.NOT_FOUND, (String)"no such address"));
            sender.close();
        }
    }

    private void setRemoteConnectionOpenHandler(ProtonConnection connection) {
        connection.sessionOpenHandler(remoteOpenSession -> this.handleSessionOpen(connection, (ProtonSession)remoteOpenSession));
        connection.receiverOpenHandler(remoteOpenReceiver -> this.handleReceiverOpen(connection, (ProtonReceiver)remoteOpenReceiver));
        connection.senderOpenHandler(remoteOpenSender -> this.handleSenderOpen(connection, (ProtonSender)remoteOpenSender));
        connection.disconnectHandler(this::handleRemoteDisconnect);
        connection.closeHandler(remoteClose -> this.handleRemoteConnectionClose(connection, (AsyncResult<ProtonConnection>)remoteClose));
        connection.openHandler(remoteOpen -> {
            HonoUser clientPrincipal = Constants.getClientPrincipal((ProtonConnection)connection);
            this.LOG.debug("client [container: {}, user: {}] connected", (Object)connection.getRemoteContainer(), (Object)clientPrincipal.getName());
            connection.attachments().set((Object)"CONNECTION_ID", String.class, (Object)UUID.randomUUID().toString());
            Duration delay = Duration.between(Instant.now(), clientPrincipal.getExpirationTime());
            WeakReference<ProtonConnection> conRef = new WeakReference<ProtonConnection>(connection);
            this.vertx.setTimer(delay.toMillis(), timerId -> {
                if (conRef.get() != null) {
                    this.closeExpiredConnection((ProtonConnection)conRef.get());
                }
            });
            connection.open();
        });
    }

    protected void handleSessionOpen(ProtonConnection con, ProtonSession session) {
        this.LOG.debug("opening new session with client [container: {}]", (Object)con.getRemoteContainer());
        session.closeHandler(sessionResult -> {
            if (sessionResult.succeeded()) {
                ((ProtonSession)sessionResult.result()).close();
            }
        }).open();
    }

    protected void publishConnectionClosedEvent(ProtonConnection con) {
    }

    protected void handleRemoteConnectionClose(ProtonConnection con, AsyncResult<ProtonConnection> res) {
        if (res.succeeded()) {
            this.LOG.debug("client [container: {}] closed connection", (Object)con.getRemoteContainer());
        } else {
            this.LOG.debug("client [container: {}] closed connection with error", (Object)con.getRemoteContainer(), (Object)res.cause());
        }
        con.close();
        con.disconnect();
        this.publishConnectionClosedEvent(con);
    }

    protected void handleRemoteDisconnect(ProtonConnection con) {
        this.LOG.debug("client [container: {}] disconnected", (Object)con.getRemoteContainer());
        con.disconnect();
        this.publishConnectionClosedEvent(con);
    }

    @Override
    public void registerReadinessChecks(HealthCheckHandler handler) {
        for (AmqpEndpoint ep : this.endpoints()) {
            ep.registerReadinessChecks(handler);
        }
    }

    @Override
    public void registerLivenessChecks(HealthCheckHandler handler) {
        for (AmqpEndpoint ep : this.endpoints()) {
            ep.registerLivenessChecks(handler);
        }
    }
}

