/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.periodic.notification.application;

import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.application.PeriodicApplicationException;
import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.eclipse.rdf4j.query.BindingSet;

/**
 * Factory that assembles a {@link PeriodicNotificationApplication} from a
 * {@link PeriodicNotificationApplicationConfiguration}.
 *
 * <p>The factory creates the three in-memory queues that connect the
 * components (notifications, bins and binding sets), builds the coordinator,
 * exporter, pruner, processor and Kafka notification provider around them,
 * and hands everything to the application builder.
 */
public class PeriodicNotificationApplicationFactory {

    /**
     * Builds a fully wired {@link PeriodicNotificationApplication}.
     *
     * <p>The coordinator is started as part of this call so that the
     * notifications already registered in Fluo can be handed to it.
     *
     * @param conf configuration supplying the Accumulo, Fluo and Kafka settings and the
     *             thread counts for each component
     * @return the assembled application
     * @throws PeriodicApplicationException if connecting to Accumulo fails; the original
     *         Accumulo exception is attached as the cause
     */
    public static PeriodicNotificationApplication getPeriodicApplication(PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException {
        Properties kafkaConsumerProps = getKafkaConsumerProperties(conf);
        Properties kafkaProducerProps = getKafkaProducerProperties(conf);

        // Queues shared between the components created below.
        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();

        FluoClient fluo = null;
        boolean built = false;
        try {
            PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
            fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
            NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);

            // The snapshot is only needed while the registered notifications are read,
            // so release it as soon as that is done instead of leaking it.
            // NOTE(review): assumes processRegisteredNotifications consumes the snapshot
            // synchronously - confirm against PeriodicNotificationProvider.
            try (Snapshot registered = fluo.newSnapshot()) {
                addRegisteredNotices(coordinator, registered);
            }

            KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets);
            PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
            NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
            KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps);

            PeriodicNotificationApplication application = PeriodicNotificationApplication.builder()
                    .setCoordinator(coordinator)
                    .setProvider(provider)
                    .setExporter(exporter)
                    .setProcessor(processor)
                    .setPruner(pruner)
                    .build();
            built = true;
            return application;
        }
        catch (AccumuloException | AccumuloSecurityException e) {
            // Keep the message callers already see, but attach the underlying exception
            // so the stack trace of the real failure is not lost.
            PeriodicApplicationException failure = new PeriodicApplicationException(e.getMessage());
            failure.initCause(e);
            throw failure;
        }
        finally {
            // On success the Fluo client is owned by the pruner; on any failure nobody
            // else holds it, so close it here to avoid leaking the connection.
            if (!built && fluo != null) {
                fluo.close();
            }
        }
    }

    /**
     * Starts the coordinator and passes it, together with the given snapshot, to a
     * {@link PeriodicNotificationProvider} to process the registered notifications.
     *
     * @param coord coordinator to start and hand to the provider
     * @param sx    Fluo snapshot handed to the provider; the caller remains responsible
     *              for closing it
     */
    private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) {
        coord.start();
        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
        provider.processRegisteredNotifications(coord, sx);
    }

    /**
     * Creates the coordinator.
     *
     * @param numThreads    thread count passed to the coordinator
     * @param notifications queue passed to the coordinator
     * @return a new {@link PeriodicNotificationCoordinatorExecutor}
     */
    private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
        return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
    }

    /**
     * Creates the exporter together with the Kafka producer it uses. Keys are serialized
     * with {@link StringSerializer} and values with {@link BindingSetSerDe}.
     *
     * @param numThreads  thread count passed to the exporter
     * @param props       Kafka producer properties
     * @param bindingSets queue passed to the exporter
     * @return a new {@link KafkaExporterExecutor}
     */
    private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
        KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
        return new KafkaExporterExecutor(producer, numThreads, bindingSets);
    }

    /**
     * Creates the pruner.
     *
     * @param storage    periodic query result storage passed to the pruner
     * @param fluo       Fluo client passed to the pruner
     * @param numThreads thread count passed to the pruner
     * @param bins       queue passed to the pruner
     * @return a new {@link PeriodicQueryPrunerExecutor}
     */
    private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads, BlockingQueue<NodeBin> bins) {
        return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
    }

    /**
     * Creates the notification processor.
     *
     * @param periodicStorage periodic query result storage passed to the processor
     * @param notifications   notification queue passed to the processor
     * @param bins            bin queue passed to the processor
     * @param bindingSets     binding set queue passed to the processor
     * @param numThreads      thread count passed to the processor
     * @return a new {@link NotificationProcessorExecutor}
     */
    private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int numThreads) {
        return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
    }

    /**
     * Creates the Kafka notification provider. Keys are deserialized with
     * {@link StringDeserializer} and values with {@link CommandNotificationSerializer}.
     *
     * @param numThreads thread count passed to the provider
     * @param topic      Kafka topic name passed to the provider
     * @param coord      coordinator passed to the provider
     * @param props      Kafka consumer properties
     * @return a new {@link KafkaNotificationProvider}
     */
    private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord, Properties props) {
        return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, numThreads);
    }

    /**
     * Connects to Accumulo through ZooKeeper using the configured instance, user and
     * password, and wraps the connection in an {@link AccumuloPeriodicQueryResultStorage}
     * created with the configured table prefix.
     *
     * @param conf configuration holding the Accumulo connection settings
     * @return the storage backed by the new Accumulo connection
     * @throws AccumuloException         if obtaining the connector fails
     * @throws AccumuloSecurityException if obtaining the connector fails for security reasons
     */
    private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf) throws AccumuloException, AccumuloSecurityException {
        ZooKeeperInstance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
        Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
        String ryaInstance = conf.getTablePrefix();
        return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
    }

    /**
     * Builds the Kafka consumer properties: bootstrap servers, client id and group id come
     * from the configuration; the offset reset policy ("earliest") and the metadata max age
     * (30000 ms) are fixed.
     *
     * @param conf configuration holding the Kafka settings
     * @return the consumer properties
     */
    private static Properties getKafkaConsumerProperties(PeriodicNotificationApplicationConfiguration conf) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", conf.getBootStrapServers());
        kafkaProps.setProperty("client.id", conf.getNotificationClientId());
        kafkaProps.setProperty("group.id", conf.getNotificationGroupId());
        kafkaProps.setProperty("auto.offset.reset", "earliest");
        kafkaProps.setProperty("metadata.max.age.ms", "30000");
        return kafkaProps;
    }

    /**
     * Builds the Kafka producer properties, which contain only the bootstrap servers.
     *
     * @param conf configuration holding the Kafka settings
     * @return the producer properties
     */
    private static Properties getKafkaProducerProperties(PeriodicNotificationApplicationConfiguration conf) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", conf.getBootStrapServers());
        return kafkaProps;
    }
}

