/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.periodic.notification.processor;

import com.google.common.base.Preconditions;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.api.NotificationProcessor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.eclipse.rdf4j.query.BindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that drains {@link TimestampedNotification}s from a queue and, for each one, reads the
 * periodic query results stored for the notification's bin.
 *
 * <p>Every result read from storage is added to the binding-set queue as a
 * {@link BindingSetRecord}. Once all results for the bin have been queued, a {@link NodeBin}
 * holding the query id and bin is added to the bin queue.
 *
 * <p>The processing loop lives in {@link #run()}; call {@link #shutdown()} to ask it to stop.
 */
public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {
    private static final Logger log = LoggerFactory.getLogger(TimestampedNotificationProcessor.class);

    /**
     * How long {@link #run()} waits for a notification before re-checking the shutdown flag.
     * Bounds the delay between {@link #shutdown()} and the loop actually exiting when idle.
     */
    private static final long POLL_TIMEOUT_MS = 1000L;

    private final PeriodicQueryResultStorage periodicStorage;
    private final BlockingQueue<TimestampedNotification> notifications;
    private final BlockingQueue<NodeBin> bins;
    private final BlockingQueue<BindingSetRecord> bindingSets;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final int threadNumber;

    /**
     * Creates a processor.
     *
     * @param periodicStorage storage queried for the results of a given query id and bin
     * @param notifications queue the processor consumes notifications from
     * @param bins queue that receives one {@link NodeBin} per successfully processed notification
     * @param bindingSets queue that receives one {@link BindingSetRecord} per result read
     * @param threadNumber identifier used only in log messages
     * @throws NullPointerException if any of the reference arguments is {@code null}
     */
    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int threadNumber) {
        this.notifications = Preconditions.checkNotNull(notifications);
        this.bins = Preconditions.checkNotNull(bins);
        this.bindingSets = Preconditions.checkNotNull(bindingSets);
        // Fail fast here instead of surfacing a logged NullPointerException on every notification.
        this.periodicStorage = Preconditions.checkNotNull(periodicStorage);
        this.threadNumber = threadNumber;
    }

    /**
     * Looks up the stored results for the bin that the notification's timestamp falls into and
     * forwards them to the output queues.
     *
     * <p>Any exception raised while listing results, adding to either queue, or closing the
     * result iterator is logged at WARN level and not rethrown; in that case the {@link NodeBin}
     * may not have been added to the bin queue.
     *
     * @param notification notification supplying the query id, timestamp and period
     * @throws IllegalArgumentException if the notification's period is not positive
     */
    public void processNotification(TimestampedNotification notification) {
        final String id = notification.getId();
        final long ts = notification.getTimestamp().getTime();
        final long period = notification.getPeriod();
        final long bin = getBinFromTimestamp(ts, period);
        final NodeBin nodeBin = new NodeBin(id, bin);

        try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin))) {
            while (iter.hasNext()) {
                bindingSets.add(new BindingSetRecord(iter.next(), id));
            }
            // Only announce the bin after every one of its results has been queued.
            bins.add(nodeBin);
        } catch (Exception e) {
            log.warn("Encountered exception while accessing periodic results for bin: {} for query: {}", bin, id, e);
        }
    }

    /**
     * Rounds a timestamp to the start of its bin, i.e. to a multiple of {@code period}.
     * Integer division truncates toward zero, so a non-negative {@code ts} is rounded down.
     *
     * @param ts timestamp to bin
     * @param period bin width; must be positive
     * @return {@code ts / period * period}
     * @throws IllegalArgumentException if {@code period} is not positive
     */
    private long getBinFromTimestamp(long ts, long period) {
        Preconditions.checkArgument(period > 0L);
        return ts / period * period;
    }

    /**
     * Processes notifications until {@link #shutdown()} is called.
     *
     * <p>If the thread is interrupted, or an exception escapes
     * {@link #processNotification(TimestampedNotification)}, the failure is logged and rethrown
     * wrapped in a {@link RuntimeException}, which ends the loop.
     */
    @Override
    public void run() {
        try {
            while (!closed.get()) {
                // Timed poll instead of take(): an idle worker blocked in take() would never
                // re-check the flag and so would never observe shutdown().
                final TimestampedNotification notification = notifications.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (notification != null) {
                    processNotification(notification);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so whoever owns this thread can still see the interrupt.
            Thread.currentThread().interrupt();
            log.warn("Thread {} is unable to process next notification.", threadNumber, e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            log.warn("Thread {} is unable to process next notification.", threadNumber, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Requests that {@link #run()} stop. The loop exits the next time it checks the flag, which
     * happens after the current notification (if any) is processed or the poll times out.
     */
    public void shutdown() {
        closed.set(true);
    }

    /**
     * @return a new, empty {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link TimestampedNotificationProcessor}. Each setter stores its
     * argument and returns this builder; validation happens in the processor's constructor
     * when {@link #build()} is called.
     */
    public static class Builder {
        private PeriodicQueryResultStorage periodicStorage;
        private BlockingQueue<TimestampedNotification> notifications;
        private BlockingQueue<NodeBin> bins;
        private BlockingQueue<BindingSetRecord> bindingSets;
        private int threadNumber;

        /**
         * @param notifications queue the processor will consume notifications from
         * @return this builder
         */
        public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) {
            this.notifications = notifications;
            return this;
        }

        /**
         * @param bins queue that will receive the processed {@link NodeBin}s
         * @return this builder
         */
        public Builder setBins(BlockingQueue<NodeBin> bins) {
            this.bins = bins;
            return this;
        }

        /**
         * @param bindingSets queue that will receive the {@link BindingSetRecord}s
         * @return this builder
         */
        public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) {
            this.bindingSets = bindingSets;
            return this;
        }

        /**
         * @param threadNumber identifier the processor uses in its log messages
         * @return this builder
         */
        public Builder setThreadNumber(int threadNumber) {
            this.threadNumber = threadNumber;
            return this;
        }

        /**
         * @param periodicStorage storage the processor will query for periodic results
         * @return this builder
         */
        public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) {
            this.periodicStorage = periodicStorage;
            return this;
        }

        /**
         * Builds the processor from the values set so far.
         *
         * @return a new {@link TimestampedNotificationProcessor}
         * @throws NullPointerException if the storage or any of the queues was never set
         */
        public TimestampedNotificationProcessor build() {
            return new TimestampedNotificationProcessor(periodicStorage, notifications, bins, bindingSets, threadNumber);
        }
    }
}

