/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.periodic.notification.registration.kafka;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Runnable} that subscribes a {@link KafkaConsumer} to a single topic,
 * polls it in a loop, and passes every received {@link CommandNotification}
 * to a {@link NotificationCoordinatorExecutor}.
 *
 * <p>The loop runs until {@link #shutdown()} is called. The consumer handed to
 * the constructor is closed when {@link #run()} exits, whether it exits
 * normally or because of an exception.
 */
public class PeriodicNotificationConsumer
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationConsumer.class);

    /** Maximum time, in milliseconds, that a single {@code poll} call may block. */
    private static final long POLL_TIMEOUT_MS = 10000L;

    private final KafkaConsumer<String, CommandNotification> consumer;
    private final int threadNumber;
    private final String topic;
    /** Set by {@link #shutdown()}; checked by the polling loop and the wakeup handler. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final NotificationCoordinatorExecutor coord;

    /**
     * Creates a consumer task. Nothing is subscribed or polled until {@link #run()} is invoked.
     *
     * @param topic        name of the Kafka topic to subscribe to
     * @param consumer     Kafka consumer used for polling; it is closed when {@link #run()} exits
     * @param threadNumber identifier of this task, used only in log messages
     * @param coord        executor that receives every polled {@link CommandNotification}
     */
    public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int threadNumber, NotificationCoordinatorExecutor coord) {
        this.topic = topic;
        this.threadNumber = threadNumber;
        this.consumer = consumer;
        this.coord = coord;
    }

    /**
     * Subscribes to the topic and polls until {@link #shutdown()} has been called,
     * forwarding each record's value to the coordinator.
     *
     * @throws WakeupException if the consumer is woken up while this task has not
     *                         been shut down via {@link #shutdown()}
     */
    @Override
    public void run() {
        try {
            LOG.info("Configuring KafkaConsumer on thread: {} to subscribe to topic: {}", this.threadNumber, this.topic);
            this.consumer.subscribe(Arrays.asList(this.topic));
            while (!this.closed.get()) {
                ConsumerRecords<String, CommandNotification> records = this.consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, CommandNotification> record : records) {
                    CommandNotification notification = record.value();
                    LOG.info("Thread {} is adding notification: {}", this.threadNumber, notification);
                    this.coord.processNextCommandNotification(notification);
                }
            }
            LOG.info("Finished polling.");
        }
        catch (WakeupException e) {
            // A wakeup after shutdown() is the expected way out of a blocking poll;
            // any other wakeup is unexpected and is propagated to the caller.
            if (!this.closed.get()) {
                throw e;
            }
        }
        finally {
            this.consumer.close();
        }
    }

    /**
     * Requests termination of the polling loop: marks this task as closed and then
     * wakes up the consumer so that a blocking {@code poll} in {@link #run()} is aborted.
     */
    public void shutdown() {
        // Order matters: the flag must be set before the wakeup so that run()
        // recognises the resulting WakeupException as an intentional shutdown.
        this.closed.set(true);
        this.consumer.wakeup();
    }
}

