/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl.actors;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.ChangeReceiver;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.actors.ShardFunc;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.records.RecordWrapper;

/**
 * A {@link RealLiveTable} facade that distributes work across an array of shard tables.
 *
 * <p>Operations addressed to a single key are routed to the shard at index
 * {@code func.apply(key)}. Table-wide operations are forwarded to every shard and, where a
 * result is returned, the per-shard results are combined into a single promise.
 */
public class TableSharding
implements RealLiveTable {
    /**
     * Change type that is not routed by {@link #receive} and that is forwarded to a subscriber
     * only once, after every shard has delivered it (see {@link #subscribe}).
     * NOTE(review): presumably the "initial query done" marker of ChangeMessage - confirm against
     * the ChangeMessage type constants.
     */
    private static final int QUERY_DONE_TYPE = 3;

    /** Maps a record key to the index of the shard responsible for it. */
    ShardFunc func;
    /** The underlying tables; indexed by the result of {@link #func}. */
    RealLiveTable[] shards;
    /**
     * For each subscriber registered on this facade, the per-shard subscribers created for it.
     * List position i corresponds to {@code shards[i]}.
     */
    final ConcurrentHashMap<Subscriber, List<Subscriber>> subsMap = new ConcurrentHashMap<>();
    private final TableDescription description;

    /**
     * @param func   selects the shard index for a key
     * @param shards the tables to distribute over
     * @param desc   the description handed out by {@link #getDescription()}
     */
    public TableSharding(ShardFunc func, RealLiveTable[] shards, TableDescription desc) {
        this.func = func;
        this.shards = shards;
        this.description = desc;
    }

    /**
     * Routes a change to the shard owning its key. Messages of type {@link #QUERY_DONE_TYPE}
     * are dropped.
     */
    @Override
    public void receive(ChangeMessage change) {
        if (change.getType() == QUERY_DONE_TYPE) {
            return;
        }
        this.shards[this.func.apply(change.getKey())].receive(change);
    }

    /**
     * Forwards the resize request to every shard.
     *
     * @return the promise produced by {@code Actors.all} over all per-shard promises
     */
    @Override
    public IPromise resizeIfLoadFactorLarger(double loadFactor, long maxGrowBytes) {
        List<IPromise> futs = new ArrayList<>(this.shards.length);
        for (RealLiveTable shard : this.shards) {
            futs.add(shard.resizeIfLoadFactorLarger(loadFactor, maxGrowBytes));
        }
        return Actors.all(futs);
    }

    /**
     * Subscribes on every shard using the subscriber's filter. Ordinary changes are passed
     * straight to the subscriber's receiver; a {@link #QUERY_DONE_TYPE} message is passed on only
     * when the last shard has delivered its own, so the subscriber sees it exactly once.
     */
    @Override
    public void subscribe(Subscriber subs) {
        final int shardCount = this.shards.length;
        // Counts down the shards that still owe their QUERY_DONE_TYPE message.
        final AtomicInteger pendingDone = new AtomicInteger(shardCount);
        final ChangeReceiver receiver = subs.getReceiver();
        final List<Subscriber> shardSubscribers = new ArrayList<>(shardCount);
        for (RealLiveTable shard : this.shards) {
            Subscriber shardSubs = new Subscriber(subs.getFilter(), change -> {
                // Short-circuit keeps the counter untouched for ordinary changes.
                if (change.getType() != QUERY_DONE_TYPE || pendingDone.decrementAndGet() == 0) {
                    receiver.receive(change);
                }
            });
            shardSubscribers.add(shardSubs);
            shard.subscribe(shardSubs);
        }
        this.subsMap.put(subs, shardSubscribers);
    }

    /**
     * Removes the per-shard subscriptions created by {@link #subscribe} for {@code subs}.
     * A {@code null} or unknown subscriber only produces a warning.
     */
    @Override
    public void unsubscribe(Subscriber subs) {
        // ConcurrentHashMap rejects null keys, so this must be checked before the lookup.
        if (subs == null) {
            Log.Warn(this, "unsubscribed is null");
            return;
        }
        // A single remove() claims the entry atomically, so concurrent calls for the same
        // subscriber cannot both unsubscribe the same shard subscribers.
        List<Subscriber> shardSubscribers = this.subsMap.remove(subs);
        if (shardSubscribers == null) {
            Log.Warn(this, "unknown subscriber to unsubscribe " + subs);
            return;
        }
        for (int i = 0; i < shardSubscribers.size(); ++i) {
            this.shards[i].unsubscribe(shardSubscribers.get(i));
        }
    }

    /** Runs {@code action} on the shard owning {@code key}. */
    @Override
    public IPromise atomic(String key, RLFunction<Record, Object> action) {
        return this.shards[this.func.apply(key)].atomic(key, action);
    }

    /** Forwards the filtered update to every shard. */
    @Override
    public void atomicUpdate(RLPredicate<Record> filter, RLFunction<Record, Boolean> action) {
        for (RealLiveTable shard : this.shards) {
            shard.atomicUpdate(filter, action);
        }
    }

    /** Sends a {@link PutMessage} built from the key/value pairs to the shard owning {@code key}. */
    @Override
    public void put(String key, Object ... keyVals) {
        this.shards[this.func.apply(key)].receive(new PutMessage(RLUtil.get().record(key, keyVals)));
    }

    /** Sends an add-or-update message for {@code key} to the owning shard. */
    @Override
    public void merge(String key, Object ... keyVals) {
        this.shards[this.func.apply(key)].receive(RLUtil.get().addOrUpdate(key, keyVals));
    }

    /** Delegates to {@code add} on the shard owning {@code key}. */
    @Override
    public IPromise<Boolean> add(String key, Object ... keyVals) {
        return this.shards[this.func.apply(key)].add(key, keyVals);
    }

    /** Delegates to {@code addRecord} on the shard owning the (unwrapped) record's key. */
    @Override
    public IPromise<Boolean> addRecord(Record rec) {
        Record plain = unwrap(rec);
        return this.shards[this.func.apply(plain.getKey())].addRecord(plain);
    }

    /** Sends {@code new AddMessage(true, rec)} for the unwrapped record to the owning shard. */
    @Override
    public void mergeRecord(Record rec) {
        Record plain = unwrap(rec);
        this.shards[this.func.apply(plain.getKey())].receive(new AddMessage(true, plain));
    }

    /** Sends a {@link PutMessage} for the unwrapped record to the owning shard. */
    @Override
    public void setRecord(Record rec) {
        Record plain = unwrap(rec);
        this.shards[this.func.apply(plain.getKey())].receive(new PutMessage(plain));
    }

    /** Sends an update message for {@code key} to the owning shard. */
    @Override
    public void update(String key, Object ... keyVals) {
        this.shards[this.func.apply(key)].receive(RLUtil.get().update(key, keyVals));
    }

    /** Delegates to {@code take} on the shard owning {@code key}. */
    @Override
    public IPromise<Record> take(String key) {
        return this.shards[this.func.apply(key)].take(key);
    }

    /** Sends a remove message for {@code key} to the owning shard. */
    @Override
    public void remove(String key) {
        RemoveMessage removal = RLUtil.get().remove(key);
        this.shards[this.func.apply(key)].receive(removal);
    }

    /**
     * Hands the spore to every shard after telling it to expect one finish signal per shard.
     */
    @Override
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        spore.setExpectedFinishCount(this.shards.length);
        for (RealLiveTable shard : this.shards) {
            shard.forEachWithSpore(spore);
        }
    }

    /**
     * Pings every shard.
     *
     * @return the promise produced by {@code Actors.all} over all per-shard ping promises
     */
    @Override
    public IPromise ping() {
        List<IPromise> futs = new ArrayList<>(this.shards.length);
        for (RealLiveTable shard : this.shards) {
            futs.add(shard.ping());
        }
        return Actors.all(futs);
    }

    /** @return an already-resolved promise holding the description passed to the constructor */
    @Override
    public IPromise<TableDescription> getDescription() {
        return new Promise<>(this.description);
    }

    /** Stops every shard. */
    @Override
    public void stop() {
        for (RealLiveTable shard : this.shards) {
            shard.stop();
        }
    }

    /**
     * Collects the statistics of all shards and accumulates them into one {@link StorageStats}.
     *
     * @return a promise resolved with the accumulated stats, or rejected with the error reported
     *         by the combined shard request or with an exception thrown while issuing it
     */
    @Override
    public IPromise<StorageStats> getStats() {
        Promise<StorageStats> res = new Promise<>();
        try {
            Actors.all(this.shards.length, i -> this.shards[i].getStats())
                .then((Callback & Serializable) (shardStats, err) -> {
                    if (shardStats == null) {
                        res.reject(err);
                        return;
                    }
                    // The callback is used in raw form, so the result arrives untyped; cast it
                    // once instead of relying on per-access casts.
                    IPromise[] perShard = (IPromise[]) shardStats;
                    StorageStats total = new StorageStats();
                    for (IPromise shardResult : perShard) {
                        ((StorageStats) shardResult.get()).addTo(total);
                    }
                    res.resolve(total);
                });
        }
        catch (Exception e) {
            Log.Warn(this, e);
            res.reject(e);
        }
        return res;
    }

    /**
     * Looks up a record on the shard owning {@code key}.
     *
     * @return the owning shard's promise; for a {@code null} key, a promise already resolved with
     *         {@code null} (never a {@code null} reference, so callers can always chain on it)
     */
    @Override
    public IPromise<Record> get(String key) {
        if (key == null) {
            return new Promise<>((Record) null);
        }
        return this.shards[this.func.apply(key)].get(key);
    }

    /**
     * Sums the sizes reported by all shards.
     *
     * @return a promise resolved with the total; if summing the shard results throws (for example
     *         because a shard delivered no value), the promise is rejected with that exception
     *         rather than being left unsettled
     */
    @Override
    public IPromise<Long> size() {
        Promise<Long> result = new Promise<>();
        List<IPromise<Long>> futs = new ArrayList<>(this.shards.length);
        for (RealLiveTable shard : this.shards) {
            futs.add(shard.size());
        }
        Actors.all(futs).then(shardSizes -> {
            try {
                long sum = shardSizes.stream().mapToLong(prom -> (Long) prom.get()).sum();
                result.resolve(sum);
            }
            catch (Exception e) {
                Log.Warn(this, e);
                result.reject(e);
            }
        });
        return result;
    }

    /** Returns the record held by a {@link RecordWrapper}, or {@code rec} itself otherwise. */
    private static Record unwrap(Record rec) {
        if (rec instanceof RecordWrapper) {
            return ((RecordWrapper) rec).getRecord();
        }
        return rec;
    }
}

