/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.map.impl.mapstore.writebehind;

import com.hazelcast.cluster.ClusterService;
import com.hazelcast.instance.GroupProperties;
import com.hazelcast.instance.GroupProperty;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.mapstore.MapDataStore;
import com.hazelcast.map.impl.mapstore.MapStoreContext;
import com.hazelcast.map.impl.mapstore.writebehind.WriteBehindProcessor;
import com.hazelcast.map.impl.mapstore.writebehind.WriteBehindQueue;
import com.hazelcast.map.impl.mapstore.writebehind.WriteBehindStore;
import com.hazelcast.map.impl.mapstore.writebehind.entry.DelayedEntry;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.InternalPartitionService;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.Clock;
import com.hazelcast.util.CollectionUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StoreWorker
implements Runnable {
    private final String mapName;
    private final MapServiceContext mapServiceContext;
    private final WriteBehindProcessor writeBehindProcessor;
    private final ExecutionService executionService;
    private final long backupDelayMillis;
    private final long writeDelayMillis;
    private final int partitionCount;
    private long lastHighestStoreTime;
    private volatile boolean running;

    public StoreWorker(MapStoreContext mapStoreContext, WriteBehindProcessor writeBehindProcessor) {
        this.mapName = mapStoreContext.getMapName();
        this.mapServiceContext = mapStoreContext.getMapServiceContext();
        this.writeBehindProcessor = writeBehindProcessor;
        this.backupDelayMillis = this.getReplicaWaitTimeMillis();
        this.lastHighestStoreTime = Clock.currentTimeMillis();
        this.writeDelayMillis = TimeUnit.SECONDS.toMillis(mapStoreContext.getMapStoreConfig().getWriteDelaySeconds());
        NodeEngine nodeEngine = this.mapServiceContext.getNodeEngine();
        InternalPartitionService partitionService = nodeEngine.getPartitionService();
        this.partitionCount = partitionService.getPartitionCount();
        this.executionService = nodeEngine.getExecutionService();
    }

    public synchronized void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        this.schedule();
    }

    public synchronized void stop() {
        this.running = false;
    }

    @Override
    public void run() {
        try {
            this.runInternal();
        }
        finally {
            if (this.running) {
                this.schedule();
            }
        }
    }

    private void schedule() {
        this.executionService.schedule(this, 1L, TimeUnit.SECONDS);
    }

    private void runInternal() {
        long now = Clock.currentTimeMillis();
        long ownerHighestStoreTime = this.calculateHighestStoreTime(this.lastHighestStoreTime, now);
        long backupHighestStoreTime = ownerHighestStoreTime - this.backupDelayMillis;
        this.lastHighestStoreTime = ownerHighestStoreTime;
        List<DelayedEntry> ownersList = null;
        List<DelayedEntry> backupsList = null;
        for (int partitionId = 0; partitionId < this.partitionCount && !Thread.currentThread().isInterrupted(); ++partitionId) {
            RecordStore recordStore = this.getRecordStoreOrNull(this.mapName, partitionId);
            if (!this.hasEntryInWriteBehindQueue(recordStore)) continue;
            boolean localPartition = this.isPartitionLocal(partitionId);
            if (!localPartition) {
                backupsList = StoreWorker.initListIfNull(backupsList, this.partitionCount);
                this.selectEntriesToStore(recordStore, backupsList, backupHighestStoreTime);
                continue;
            }
            ownersList = StoreWorker.initListIfNull(ownersList, this.partitionCount);
            this.selectEntriesToStore(recordStore, ownersList, ownerHighestStoreTime);
        }
        if (!CollectionUtil.isEmpty(ownersList)) {
            Map<Integer, List<DelayedEntry>> failuresPerPartition = this.writeBehindProcessor.process(ownersList);
            this.removeFinishedStoreOperationsFromQueues(this.mapName, ownersList);
            this.reAddFailedStoreOperationsToQueues(this.mapName, failuresPerPartition);
        }
        if (!CollectionUtil.isEmpty(backupsList)) {
            this.doInBackup(backupsList);
        }
    }

    private boolean isPartitionLocal(int partitionId) {
        NodeEngine nodeEngine = this.mapServiceContext.getNodeEngine();
        ClusterService clusterService = nodeEngine.getClusterService();
        InternalPartitionService partitionService = nodeEngine.getPartitionService();
        Address thisAddress = clusterService.getThisAddress();
        InternalPartition partition = partitionService.getPartition(partitionId, false);
        Address owner = partition.getOwnerOrNull();
        return owner != null && owner.equals(thisAddress);
    }

    private static List<DelayedEntry> initListIfNull(List<DelayedEntry> list, int capacity) {
        if (list == null) {
            list = new ArrayList<DelayedEntry>(capacity);
        }
        return list;
    }

    private long calculateHighestStoreTime(long lastHighestStoreTime, long now) {
        return now >= lastHighestStoreTime + this.writeDelayMillis ? now : lastHighestStoreTime;
    }

    private boolean hasEntryInWriteBehindQueue(RecordStore recordStore) {
        if (recordStore == null) {
            return false;
        }
        MapDataStore<Data, Object> mapDataStore = recordStore.getMapDataStore();
        WriteBehindStore dataStore = (WriteBehindStore)mapDataStore;
        WriteBehindQueue<DelayedEntry> writeBehindQueue = dataStore.getWriteBehindQueue();
        return writeBehindQueue.size() != 0;
    }

    private void selectEntriesToStore(RecordStore recordStore, List<DelayedEntry> entries, long now) {
        int flushCount = StoreWorker.getNumberOfFlushedEntries(recordStore);
        WriteBehindQueue<DelayedEntry> queue = StoreWorker.getWriteBehindQueue(recordStore);
        this.filterWriteBehindQueue(now, flushCount, entries, queue);
    }

    private void filterWriteBehindQueue(long now, int count, Collection<DelayedEntry> collection, WriteBehindQueue<DelayedEntry> queue) {
        if (count > 0) {
            queue.getFrontByNumber(count, collection);
        } else {
            queue.getFrontByTime(now, collection);
        }
    }

    private void removeFinishedStoreOperationsFromQueues(String mapName, List<DelayedEntry> entries) {
        for (DelayedEntry entry : entries) {
            int partitionId = entry.getPartitionId();
            RecordStore recordStore = this.getRecordStoreOrNull(mapName, partitionId);
            if (recordStore == null) continue;
            WriteBehindQueue<DelayedEntry> queue = StoreWorker.getWriteBehindQueue(recordStore);
            queue.removeFirstOccurrence(entry);
            AtomicInteger flushCounter = StoreWorker.getFlushCounter(recordStore);
            int flushCount = flushCounter.get();
            if (flushCount <= 0) continue;
            flushCounter.addAndGet(-1);
        }
    }

    private void reAddFailedStoreOperationsToQueues(String mapName, Map<Integer, List<DelayedEntry>> failuresPerPartition) {
        if (failuresPerPartition.isEmpty()) {
            return;
        }
        for (Map.Entry<Integer, List<DelayedEntry>> entry : failuresPerPartition.entrySet()) {
            RecordStore recordStore;
            Integer partitionId = entry.getKey();
            List<DelayedEntry> failures = failuresPerPartition.get(partitionId);
            if (CollectionUtil.isEmpty(failures) || (recordStore = this.getRecordStoreOrNull(mapName, partitionId)) == null) continue;
            WriteBehindQueue<DelayedEntry> queue = StoreWorker.getWriteBehindQueue(recordStore);
            queue.addFirst(failures);
        }
    }

    private void doInBackup(List<DelayedEntry> delayedEntries) {
        this.writeBehindProcessor.callBeforeStoreListeners(delayedEntries);
        this.removeFinishedStoreOperationsFromQueues(this.mapName, delayedEntries);
        this.writeBehindProcessor.callAfterStoreListeners(delayedEntries);
    }

    private long getReplicaWaitTimeMillis() {
        GroupProperties groupProperties = this.mapServiceContext.getNodeEngine().getGroupProperties();
        return groupProperties.getMillis(GroupProperty.MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS);
    }

    private RecordStore getRecordStoreOrNull(String mapName, int partitionId) {
        PartitionContainer partitionContainer = this.mapServiceContext.getPartitionContainer(partitionId);
        return partitionContainer.getExistingRecordStore(mapName);
    }

    private static WriteBehindQueue<DelayedEntry> getWriteBehindQueue(RecordStore recordStore) {
        WriteBehindStore writeBehindStore = (WriteBehindStore)recordStore.getMapDataStore();
        return writeBehindStore.getWriteBehindQueue();
    }

    private static AtomicInteger getFlushCounter(RecordStore recordStore) {
        WriteBehindStore writeBehindStore = (WriteBehindStore)recordStore.getMapDataStore();
        return writeBehindStore.getFlushCounter();
    }

    private static int getNumberOfFlushedEntries(RecordStore recordStore) {
        AtomicInteger flushCounter = StoreWorker.getFlushCounter(recordStore);
        return flushCounter.get();
    }

    public String toString() {
        return "StoreWorker{mapName='" + this.mapName + "'}";
    }
}

