package io.openk9.ingestion.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import io.openk9.osgi.util.AutoCloseables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.SenderOptions;

@Component(immediate = true, service = {RabbitMQActivator.class})
/* loaded from: input_file:io/openk9/ingestion/rabbitmq/RabbitMQActivator.class */
public class RabbitMQActivator {
    private final Collection<AutoCloseables.AutoCloseableSafe> _autoCloseables = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/openk9/ingestion/rabbitmq/RabbitMQActivator$Config.class */
    public @interface Config {
        String uri() default "amqp://projectq:projectq@localhost:5672";
    }

    @Activate
    public void activate(BundleContext bundleContext, Config config) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(config.uri());
        _registerService(bundleContext, Supplier.class, () -> {
            return connectionFactory;
        }, Collections.singletonMap("rabbit", "connection-factory"));
        _registerService(bundleContext, connectionFactory, connectionFactory2 -> {
            return new SenderOptions().connectionFactory(connectionFactory2).resourceManagementScheduler(Schedulers.boundedElastic());
        }, RabbitFlux::createSender, Collections.singletonMap("rabbit", "sender"));
        _registerService(bundleContext, connectionFactory, connectionFactory3 -> {
            return new ReceiverOptions().connectionFactory(connectionFactory3);
        }, RabbitFlux::createReceiver, Collections.singletonMap("rabbit", "receiver"));
    }

    @Modified
    public void modified(BundleContext bundleContext, Config config) throws Exception {
        deactivate();
        activate(bundleContext, config);
    }

    @Deactivate
    public void deactivate() {
        Iterator<AutoCloseables.AutoCloseableSafe> it = this._autoCloseables.iterator();
        while (it.hasNext()) {
            it.next().close();
            it.remove();
        }
    }

    private <OPTIONS, SERVICE extends AutoCloseable> void _registerService(BundleContext bundleContext, ConnectionFactory connectionFactory, Function<ConnectionFactory, OPTIONS> function, Function<OPTIONS, SERVICE> function2, Map<String, Object> map) {
        SERVICE apply = function2.apply(function.apply(connectionFactory));
        ServiceRegistration registerService = bundleContext.registerService(Supplier.class, () -> {
            return apply;
        }, new Hashtable(map));
        Collection<AutoCloseables.AutoCloseableSafe> collection = this._autoCloseables;
        Objects.requireNonNull(registerService);
        collection.add(AutoCloseables.mergeAutoCloseableToSafe(new AutoCloseable[]{apply, registerService::unregister}));
    }

    private <T> void _registerService(BundleContext bundleContext, Class<T> cls, T t, Map<String, Object> map) {
        ServiceRegistration registerService = bundleContext.registerService(cls, t, new Hashtable(map));
        Collection<AutoCloseables.AutoCloseableSafe> collection = this._autoCloseables;
        Objects.requireNonNull(registerService);
        collection.add(AutoCloseables.mergeAutoCloseableToSafe(registerService::unregister));
    }
}
