/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.reallive.impl.FilterSpore;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.interfaces.ChangeMessage;
import org.nustaq.reallive.interfaces.FilterProcessor;
import org.nustaq.reallive.interfaces.Record;
import org.nustaq.reallive.interfaces.RecordIterable;
import org.nustaq.reallive.interfaces.Subscriber;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.ChangeUtils;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.messages.UpdateMessage;
import org.nustaq.reallive.records.RecordWrapper;

public class FilterProcessorImpl<K>
implements FilterProcessor<K> {
    List<Subscriber<K>> filterList = new ArrayList<Subscriber<K>>();
    RecordIterable<K> provider;

    public FilterProcessorImpl(RecordIterable<K> provider) {
        this.provider = provider;
    }

    @Override
    public void subscribe(Subscriber<K> subs) {
        this.filterList.add(subs);
        FilterSpore<K> spore = new FilterSpore<K>(subs.getFilter());
        spore.onFinish(() -> subs.getReceiver().receive(RLUtil.get().done()));
        spore.setForEach((Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                subs.getReceiver().receive(new AddMessage((Record)r));
            } else {
                subs.getReceiver().receive(RLUtil.get().done());
            }
        });
        this.provider.forEach(spore);
    }

    @Override
    public void unsubscribe(Subscriber<K> subs) {
        this.filterList.remove(subs);
    }

    @Override
    public void receive(ChangeMessage<K> change) {
        switch (change.getType()) {
            case 3: {
                break;
            }
            case 4: {
                this.processPut((PutMessage)change);
                break;
            }
            case 0: {
                this.processAdd((AddMessage)change);
                break;
            }
            case 2: {
                this.processUpdate((UpdateMessage)change);
                break;
            }
            case 1: {
                this.processRemove((RemoveMessage)change);
            }
        }
    }

    protected void processPut(PutMessage<K> change) {
        Record<K> record = change.getRecord();
        for (Subscriber<K> subscriber : this.filterList) {
            if (!subscriber.getFilter().test(record)) continue;
            subscriber.getReceiver().receive(change);
        }
    }

    protected void processUpdate(UpdateMessage<K> change) {
        Record<K> newRecord = change.getNewRecord();
        final String[] changedFields = change.getDiff().getChangedFields();
        final Object[] oldValues = change.getDiff().getOldValues();
        RecordWrapper oldRec = new RecordWrapper(newRecord){

            @Override
            public Object get(String field) {
                int index = ChangeUtils.indexOf(field, changedFields);
                if (index >= 0) {
                    return oldValues[index];
                }
                return super.get(field);
            }
        };
        for (Subscriber<K> subscriber : this.filterList) {
            boolean matchesOld = subscriber.getFilter().test(oldRec);
            boolean matchesNew = subscriber.getFilter().test(newRecord);
            if (matchesOld && matchesNew) {
                subscriber.getReceiver().receive(change);
                continue;
            }
            if (matchesOld) {
                subscriber.getReceiver().receive(new RemoveMessage<K>(change.getNewRecord()));
                continue;
            }
            if (!matchesNew) continue;
            subscriber.getReceiver().receive(new AddMessage<K>(change.getNewRecord()));
        }
    }

    protected void processAdd(AddMessage<K> add) {
        Record<K> record = add.getRecord();
        for (Subscriber<K> subscriber : this.filterList) {
            if (!subscriber.getFilter().test(record)) continue;
            subscriber.getReceiver().receive(add);
        }
    }

    protected void processRemove(RemoveMessage remove) {
        Record record = remove.getRecord();
        for (Subscriber<K> subscriber : this.filterList) {
            if (!subscriber.getFilter().test(record)) continue;
            subscriber.getReceiver().receive(remove);
        }
    }
}

