/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot;
import org.apache.flink.runtime.state.heap.StateMap;
import org.apache.flink.runtime.state.heap.StateMapSnapshot;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyOnWriteStateMap<K, N, S>
extends StateMap<K, N, S> {
    /** Logger; used by {@code makeTable} to warn when the maximum table capacity is reached. */
    private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteStateMap.class);
    /** Smallest capacity a non-empty table is created with; smaller positive requests are raised to it. */
    private static final int MINIMUM_CAPACITY = 4;
    /** Largest table capacity (2^30); {@code doubleCapacity} does not grow a table beyond it. */
    private static final int MAXIMUM_CAPACITY = 0x40000000;
    /** Capacity requested by the single-argument constructor. */
    public static final int DEFAULT_CAPACITY = 128;
    /** Minimum number of entries one {@code incrementalRehash} step moves unless the rehash finishes first. */
    private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
    /** Serializer used to copy state objects whose state version is below the highest required snapshot version. */
    protected TypeSerializer<S> stateSerializer;
    /** Shared sentinel table; {@code isRehashing} compares {@code incrementalRehashTable} against it by identity. */
    private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE = new StateMapEntry[2];
    /** Placeholder entry (no successor) handed out by {@code getBootstrapEntry} to start a {@code StateEntryIterator}. */
    private static final StateMapEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateMapEntry<Object, Object, Object>(new Object(), new Object(), new Object(), 0, null, 0, 0);
    /** Versions registered by {@code snapshotMapArrays} and not yet released; its monitor guards the version bookkeeping. */
    private final TreeSet<Integer> snapshotVersions;
    /** Main hash table; buckets at or above {@code rehashIndex} are served from here. */
    private StateMapEntry<K, N, S>[] primaryTable;
    /** Target table of a running rehash, or {@code EMPTY_TABLE} when no rehash is in progress. */
    private StateMapEntry<K, N, S>[] incrementalRehashTable;
    /** Number of entries accounted to {@code primaryTable}. */
    private int primaryTableSize;
    /** Number of entries accounted to {@code incrementalRehashTable}. */
    private int incrementalRehashTableSize;
    /** Buckets of {@code primaryTable} below this index have already been moved to {@code incrementalRehashTable}. */
    private int rehashIndex;
    /** Current map version; incremented by {@code snapshotMapArrays} and stamped on new or copied entries and updated states. */
    private int stateMapVersion;
    /** Largest registered snapshot version, or 0 if none; entries/states with a lower version are copied before being changed. */
    private int highestRequiredSnapshotVersion;
    /** Namespace object of the most recently added entry; reused by {@code addNewStateMapEntry} for equal namespaces. */
    private N lastNamespace;
    /** When {@code size()} exceeds this value on insert, {@code putEntry} calls {@code doubleCapacity}. */
    private int threshold;
    /** Incremented on every insert and removal; {@code StateEntryIterator} uses it to detect concurrent modification. */
    private int modCount;

    /**
     * Creates a map with the default initial capacity.
     *
     * @param stateSerializer serializer used to copy state objects; passed on to the capacity constructor
     */
    CopyOnWriteStateMap(TypeSerializer<S> stateSerializer) {
        this(DEFAULT_CAPACITY, stateSerializer);
    }

    private CopyOnWriteStateMap(int capacity, TypeSerializer<S> stateSerializer) {
        this.stateSerializer = (TypeSerializer)Preconditions.checkNotNull(stateSerializer);
        this.primaryTable = EMPTY_TABLE;
        this.incrementalRehashTable = EMPTY_TABLE;
        this.primaryTableSize = 0;
        this.incrementalRehashTableSize = 0;
        this.rehashIndex = 0;
        this.stateMapVersion = 0;
        this.highestRequiredSnapshotVersion = 0;
        this.snapshotVersions = new TreeSet();
        if (capacity < 0) {
            throw new IllegalArgumentException("Capacity: " + capacity);
        }
        if (capacity == 0) {
            this.threshold = -1;
            return;
        }
        capacity = capacity < 4 ? 4 : (capacity > 0x40000000 ? 0x40000000 : MathUtils.roundUpToPowerOfTwo((int)capacity));
        this.primaryTable = this.makeTable(capacity);
    }

    /**
     * Returns the number of entries in this map.
     *
     * @return the sum of the entry counts of the primary table and the incremental rehash table
     */
    @Override
    public int size() {
        final int inPrimaryTable = primaryTableSize;
        final int inRehashTable = incrementalRehashTableSize;
        return inPrimaryTable + inRehashTable;
    }

    /**
     * Looks up the state stored for the given key and namespace.
     *
     * <p>Not a pure read: it may advance a running incremental rehash, and if the found state is
     * older than the highest required snapshot version, the state (and, if needed, the entry chain)
     * is copied before it is returned.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return the stored state, or {@code null} if no entry matches
     */
    @Override
    public S get(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final int requiredVersion = highestRequiredSnapshotVersion;
        final StateMapEntry<K, N, S>[] table = selectActiveTable(hash);
        final int bucket = hash & (table.length - 1);

        for (StateMapEntry<K, N, S> entry = table[bucket]; entry != null; entry = entry.next) {
            if (entry.hash != hash || !key.equals(entry.key) || !namespace.equals(entry.namespace)) {
                continue;
            }

            // Copy-on-write check for the state object.
            if (entry.stateVersion < requiredVersion) {
                // Copy-on-write check for the entry itself, so the state is replaced on a private copy.
                if (entry.entryVersion < requiredVersion) {
                    entry = handleChainedEntryCopyOnWrite(table, bucket, entry);
                }
                entry.stateVersion = stateMapVersion;
                entry.state = getStateSerializer().copy(entry.state);
            }
            return entry.state;
        }
        return null;
    }

    /**
     * Checks whether an entry exists for the given key and namespace.
     *
     * <p>May advance a running incremental rehash as a side effect of computing the hash.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return {@code true} if a matching entry is present
     */
    @Override
    public boolean containsKey(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final StateMapEntry<K, N, S>[] table = selectActiveTable(hash);
        final int bucket = hash & (table.length - 1);

        for (StateMapEntry<K, N, S> entry = table[bucket]; entry != null; entry = entry.next) {
            if (entry.hash == hash && key.equals(entry.key) && namespace.equals(entry.namespace)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores {@code value} for the given key and namespace, creating the entry if necessary.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @param value the new state
     */
    @Override
    public void put(K key, N namespace, S value) {
        final StateMapEntry<K, N, S> target = putEntry(key, namespace);
        target.state = value;
        // The state version is stamped unconditionally, even if the same object is stored again.
        target.stateVersion = stateMapVersion;
    }

    /**
     * Stores {@code state} for the given key and namespace and returns the previously stored state.
     *
     * <p>If the previous state is older than the highest required snapshot version, a copy made by
     * the state serializer is returned instead of the stored object.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @param state the new state
     * @return the previous state (or a copy of it); {@code null} for a newly created entry
     */
    @Override
    public S putAndGetOld(K key, N namespace, S state) {
        final StateMapEntry<K, N, S> e = putEntry(key, namespace);

        // Copy-on-write check for the state. The local must be typed S (not Object) to be returnable.
        final S oldState;
        if (e.stateVersion < highestRequiredSnapshotVersion) {
            oldState = getStateSerializer().copy(e.state);
        } else {
            oldState = e.state;
        }

        e.state = state;
        e.stateVersion = stateMapVersion;
        return oldState;
    }

    /**
     * Removes the entry for the given key and namespace, if present; the removed entry is discarded.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     */
    @Override
    public void remove(K key, N namespace) {
        removeEntry(key, namespace);
    }

    /**
     * Removes the entry for the given key and namespace and returns its state.
     *
     * <p>If the removed state is older than the highest required snapshot version, a copy made by
     * the state serializer is returned instead of the stored object.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return the removed state (or a copy of it), or {@code null} if no entry matched
     */
    @Override
    public S removeAndGetOld(K key, N namespace) {
        final StateMapEntry<K, N, S> removed = removeEntry(key, namespace);
        if (removed == null) {
            return null;
        }
        // Copy-on-write check for the state.
        if (removed.stateVersion < highestRequiredSnapshotVersion) {
            return getStateSerializer().copy(removed.state);
        }
        return removed.state;
    }

    /**
     * Streams the keys of all entries whose namespace equals the given namespace.
     *
     * <p>The stream is sequential and backed by this map's iterator.
     *
     * @param namespace the namespace to filter by
     * @return a stream of matching keys
     */
    @Override
    public Stream<K> getKeys(N namespace) {
        final Stream<StateEntry<K, N, S>> allEntries = StreamSupport.stream(spliterator(), false);
        final Stream<StateEntry<K, N, S>> inNamespace =
                allEntries.filter(entry -> entry.getNamespace().equals(namespace));
        return inNamespace.map(StateEntry::getKey);
    }

    /**
     * Replaces the state for the given key and namespace with the result of applying
     * {@code transformation} to the previous state and {@code value}.
     *
     * <p>The entry is created if it does not exist. If the previous state is older than the highest
     * required snapshot version, the transformation receives a copy of it.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @param value the second argument passed to the transformation
     * @param transformation the function producing the new state
     * @throws Exception if the transformation throws
     */
    @Override
    public <T> void transform(K key, N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        final StateMapEntry<K, N, S> target = putEntry(key, namespace);

        // Copy-on-write check for the state.
        final S previous;
        if (target.stateVersion < highestRequiredSnapshotVersion) {
            previous = getStateSerializer().copy(target.state);
        } else {
            previous = target.state;
        }

        target.state = transformation.apply(previous, value);
        target.stateVersion = stateMapVersion;
    }

    /**
     * Returns the entry for the given key and namespace that is safe to modify, creating it if absent.
     *
     * <p>An existing entry older than the highest required snapshot version is replaced in its
     * chain by a copy, and the copy is returned. A new entry counts as a structural modification and
     * may start a table resize.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return the existing (possibly copied) or newly created entry
     */
    private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final StateMapEntry<K, N, S>[] table = selectActiveTable(hash);
        final int bucket = hash & (table.length - 1);

        for (StateMapEntry<K, N, S> entry = table[bucket]; entry != null; entry = entry.next) {
            if (entry.hash != hash || !key.equals(entry.key) || !namespace.equals(entry.namespace)) {
                continue;
            }
            // Copy-on-write check for the entry.
            if (entry.entryVersion < highestRequiredSnapshotVersion) {
                entry = handleChainedEntryCopyOnWrite(table, bucket, entry);
            }
            return entry;
        }

        // No match: insert a new entry into the table selected above.
        ++modCount;
        if (size() > threshold) {
            doubleCapacity();
        }
        return addNewStateMapEntry(table, key, namespace, hash);
    }

    /**
     * Unlinks and returns the entry for the given key and namespace.
     *
     * <p>If the predecessor in the chain is older than the highest required snapshot version, the
     * chain up to the predecessor is copied first so that the unlink happens on the copy.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return the removed entry, or {@code null} if no entry matched
     */
    private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final StateMapEntry<K, N, S>[] table = selectActiveTable(hash);
        final int bucket = hash & (table.length - 1);

        StateMapEntry<K, N, S> predecessor = null;
        for (StateMapEntry<K, N, S> entry = table[bucket];
                entry != null;
                predecessor = entry, entry = entry.next) {

            if (entry.hash != hash || !key.equals(entry.key) || !namespace.equals(entry.namespace)) {
                continue;
            }

            if (predecessor == null) {
                // The match is the bucket head: only the table slot changes.
                table[bucket] = entry.next;
            } else {
                // Copy-on-write check for the predecessor, whose next pointer is about to change.
                if (predecessor.entryVersion < highestRequiredSnapshotVersion) {
                    predecessor = handleChainedEntryCopyOnWrite(table, bucket, predecessor);
                }
                predecessor.next = entry.next;
            }

            ++modCount;
            if (table == primaryTable) {
                --primaryTableSize;
            } else {
                --incrementalRehashTableSize;
            }
            return entry;
        }
        return null;
    }

    /**
     * Creates an iterator over all entries of this map.
     *
     * @return a new {@code StateEntryIterator}; its {@code next()} throws
     *     {@link java.util.ConcurrentModificationException} after an insert or removal on this map
     */
    @Override
    @Nonnull
    public Iterator<StateEntry<K, N, S>> iterator() {
        final Iterator<StateEntry<K, N, S>> entries = new StateEntryIterator();
        return entries;
    }

    /**
     * Unregisters a snapshot version and recomputes the highest version that is still required.
     *
     * <p>Runs under the monitor of {@code snapshotVersions}, the same lock that
     * {@code snapshotMapArrays} takes when it registers a version.
     *
     * @param snapshotVersion the version to release
     * @throws IllegalStateException if the version is not registered (raised by
     *     {@code Preconditions.checkState})
     */
    @VisibleForTesting
    void releaseSnapshot(int snapshotVersion) {
        synchronized (snapshotVersions) {
            final boolean wasRegistered = snapshotVersions.remove(snapshotVersion);
            Preconditions.checkState(wasRegistered, "Attempt to release unknown snapshot version");

            if (snapshotVersions.isEmpty()) {
                highestRequiredSnapshotVersion = 0;
            } else {
                highestRequiredSnapshotVersion = snapshotVersions.last();
            }
        }
    }

    /**
     * Registers a new snapshot version and returns a shallow copy of the bucket heads currently
     * reachable through the primary table and, during a rehash, the incremental rehash table.
     *
     * <p>Only the array slots are copied; the entries themselves are shared with the live map.
     *
     * @return a new array containing the bucket heads; trailing slots may be {@code null}
     * @throws IllegalStateException if incrementing the map version overflows to a negative value
     */
    @VisibleForTesting
    StateMapEntry<K, N, S>[] snapshotMapArrays() {
        TreeSet<Integer> treeSet = this.snapshotVersions;
        // Version bookkeeping is guarded by the monitor of snapshotVersions (also taken by releaseSnapshot(int)).
        synchronized (treeSet) {
            // Bump the map version; everything stamped with an older version now needs copy-on-write.
            if (++this.stateMapVersion < 0) {
                throw new IllegalStateException("Version count overflow in CopyOnWriteStateMap. Enforcing restart.");
            }
            this.highestRequiredSnapshotVersion = this.stateMapVersion;
            this.snapshotVersions.add(this.highestRequiredSnapshotVersion);
        }
        StateMapEntry<K, N, S>[] table = this.primaryTable;
        // Slots needed: primary slots not yet rehashed (length - rehashIndex) plus two regions of
        // rehashIndex slots each from the rehash table.
        int totalMapIndexSize = this.rehashIndex + table.length;
        // The copy is made at least size() long.
        // NOTE(review): presumably so a consumer can reuse the array to hold all entries flattened - confirm against the snapshot class.
        int copiedArraySize = Math.max(totalMapIndexSize, this.size());
        StateMapEntry[] copy = new StateMapEntry[copiedArraySize];
        if (this.isRehashing()) {
            int localRehashIndex = this.rehashIndex;
            int localCopyLength = table.length - localRehashIndex;
            // Primary table: every bucket at or above the rehash index is still live there.
            System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
            table = this.incrementalRehashTable;
            // Rehash table: entries from primary bucket i end up in bucket i or i + (rehash table length / 2),
            // so only [0, rehashIndex) and [length / 2, length / 2 + rehashIndex) can be populated.
            System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
            System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
        } else {
            // No rehash in progress: the primary table holds everything.
            System.arraycopy(table, 0, copy, 0, table.length);
        }
        return copy;
    }

    /**
     * Returns the current version counter of this map.
     *
     * @return the value of {@code stateMapVersion}, which {@code snapshotMapArrays} increments
     */
    int getStateMapVersion() {
        return stateMapVersion;
    }

    /**
     * Allocates a new, empty table and updates the resize threshold for it.
     *
     * <p>Below the maximum capacity the threshold is three quarters of the capacity. At the maximum
     * capacity a warning is logged and the threshold is set to {@code Integer.MAX_VALUE - 8}.
     *
     * @param newCapacity length of the table to allocate
     * @return the newly allocated table
     * @throws IllegalStateException if the maximum capacity is requested while the map already holds
     *     more than {@code Integer.MAX_VALUE - 8} entries
     */
    @SuppressWarnings("unchecked")
    private StateMapEntry<K, N, S>[] makeTable(int newCapacity) {
        if (newCapacity < MAXIMUM_CAPACITY) {
            // 1/2 + 1/4 = 3/4 of the capacity.
            threshold = (newCapacity >> 1) + (newCapacity >> 2);
            return (StateMapEntry<K, N, S>[]) new StateMapEntry[newCapacity];
        }

        final int maxArraySize = Integer.MAX_VALUE - 8;
        if (size() > maxArraySize) {
            throw new IllegalStateException("Maximum capacity of CopyOnWriteStateMap is reached and the job cannot continue. Please consider scaling-out your job or using a different keyed state backend implementation!");
        }
        LOG.warn("Maximum capacity of 2^30 in StateMap reached. Cannot increase hash map size. This can lead to more collisions and lower performance. Please consider scaling-out your job or using a different keyed state backend implementation!");
        threshold = maxArraySize;
        return (StateMapEntry<K, N, S>[]) new StateMapEntry[newCapacity];
    }

    /**
     * Creates a new entry with {@code null} state and links it in as the head of its bucket.
     *
     * <p>The entry is stamped with the current map version for both its entry and state version, and
     * the size counter of the table it was added to is incremented.
     *
     * @param table the table to insert into (primary or incremental rehash table)
     * @param key the key of the new entry
     * @param namespace the namespace of the new entry; must not be null
     * @param hash the composite hash of key and namespace
     * @return the newly created entry
     */
    private StateMapEntry<K, N, S> addNewStateMapEntry(StateMapEntry<K, N, S>[] table, K key, N namespace, int hash) {
        // Reuse the previously seen namespace instance when it is equal, so consecutive entries of
        // the same namespace share one object.
        if (namespace.equals(lastNamespace)) {
            namespace = lastNamespace;
        } else {
            lastNamespace = namespace;
        }

        final int index = hash & (table.length - 1);
        // The entry must be typed with S (not Object) to be storable in the table and returnable.
        final StateMapEntry<K, N, S> newEntry =
                new StateMapEntry<>(key, namespace, null, hash, table[index], stateMapVersion, stateMapVersion);
        table[index] = newEntry;

        if (table == primaryTable) {
            ++primaryTableSize;
        } else {
            ++incrementalRehashTableSize;
        }
        return newEntry;
    }

    /**
     * Chooses the table that currently holds the bucket for the given hash.
     *
     * @param hashCode composite hash of key and namespace
     * @return the primary table if the hash's primary bucket index is at or above the rehash index,
     *     otherwise the incremental rehash table
     */
    private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
        final int primaryBucket = hashCode & (primaryTable.length - 1);
        if (primaryBucket >= rehashIndex) {
            return primaryTable;
        }
        return incrementalRehashTable;
    }

    /**
     * Starts a rehash into a table of twice the current primary capacity.
     *
     * <p>Does nothing if the primary table already has the maximum capacity. Entries are not moved
     * here; only the target table is allocated.
     *
     * @throws IllegalStateException if a rehash is already in progress (raised by
     *     {@code Preconditions.checkState})
     */
    private void doubleCapacity() {
        Preconditions.checkState(!isRehashing(), "There is already a rehash in progress.");

        final int currentCapacity = primaryTable.length;
        if (currentCapacity != MAXIMUM_CAPACITY) {
            incrementalRehashTable = makeTable(currentCapacity * 2);
        }
    }

    /**
     * Tells whether an incremental rehash is in progress.
     *
     * @return {@code true} if the incremental rehash table is not the shared empty sentinel
     */
    @VisibleForTesting
    boolean isRehashing() {
        return incrementalRehashTable != EMPTY_TABLE;
    }

    /**
     * Computes the composite hash for an operation, first running one incremental rehash step if a
     * rehash is in progress.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return the composite hash of key and namespace
     */
    private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
        // The rehash step runs before the hash is used, so table selection sees the updated state.
        if (isRehashing()) {
            incrementalRehash();
        }
        return compositeHash(key, namespace);
    }

    /**
     * Runs one step of the incremental rehash: moves whole buckets from the primary table to the
     * incremental rehash table, starting at {@code rehashIndex}, until at least 4 entries were
     * moved or the primary table is exhausted.
     *
     * <p>When the last bucket has been moved, the rehash table becomes the primary table and the
     * rehash state is reset.
     */
    private void incrementalRehash() {
        StateMapEntry<K, N, S>[] oldMap = this.primaryTable;
        StateMapEntry<K, N, S>[] newMap = this.incrementalRehashTable;
        int oldCapacity = oldMap.length;
        int newMask = newMap.length - 1;
        int requiredVersion = this.highestRequiredSnapshotVersion;
        int rhIdx = this.rehashIndex;
        int transferred = 0;
        // Buckets are always moved completely, so more than 4 entries may be transferred in one step.
        while (transferred < 4) {
            StateMapEntry<K, N, S> e = oldMap[rhIdx];
            while (e != null) {
                // Copy-on-write: relinking changes e.next, so an entry older than the required
                // snapshot version is replaced by a copy stamped with the current map version.
                if (e.entryVersion < requiredVersion) {
                    e = new StateMapEntry<K, N, S>(e, this.stateMapVersion);
                }
                // Remember the successor before e is pushed onto the head of its new bucket.
                StateMapEntry n = e.next;
                int pos = e.hash & newMask;
                e.next = newMap[pos];
                newMap[pos] = e;
                e = n;
                ++transferred;
            }
            oldMap[rhIdx] = null;
            if (++rhIdx != oldCapacity) continue;
            // Rehash complete: promote the new table and reset the rehash state. The two size
            // counters are merged, so the local transferred count does not need to be applied.
            this.primaryTable = newMap;
            this.incrementalRehashTable = EMPTY_TABLE;
            this.primaryTableSize += this.incrementalRehashTableSize;
            this.incrementalRehashTableSize = 0;
            this.rehashIndex = 0;
            return;
        }
        // Rehash still running: publish the local bookkeeping to the fields.
        this.primaryTableSize -= transferred;
        this.incrementalRehashTableSize += transferred;
        this.rehashIndex = rhIdx;
    }

    /**
     * Performs copy-on-write on the chain in bucket {@code mapIdx}, from the bucket head up to and
     * including {@code untilEntry}.
     *
     * <p>Every entry on that path whose entry version is below the highest required snapshot
     * version is replaced by a copy stamped with the current map version; the copies are linked to
     * each other and the bucket head slot is updated if the head was copied.
     *
     * @param tab the table containing the chain
     * @param mapIdx index of the bucket in {@code tab}
     * @param untilEntry the entry at which to stop; the loop dereferences {@code next} until it
     *     reaches this instance, so it has to be part of the chain in that bucket
     * @return the entry that now stands for {@code untilEntry}: its copy if it was copied, otherwise
     *     {@code untilEntry} itself
     */
    private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(StateMapEntry<K, N, S>[] tab, int mapIdx, StateMapEntry<K, N, S> untilEntry) {
        // "copy" tracks the entry in the updated chain that corresponds to "current" in the old chain.
        StateMapEntry<K, N, S> copy;
        int required = this.highestRequiredSnapshotVersion;
        StateMapEntry<K, N, S> current = tab[mapIdx];
        if (current.entryVersion < required) {
            // The head is too old: replace it in the table slot by a copy.
            tab[mapIdx] = copy = new StateMapEntry<K, N, S>(current, this.stateMapVersion);
        } else {
            copy = current;
        }
        while (current != untilEntry) {
            current = current.next;
            if (current.entryVersion < required) {
                // Too old: copy it and hang the copy behind the previous element of the updated chain.
                copy.next = new StateMapEntry<K, N, S>(current, this.stateMapVersion);
                copy = copy.next;
                continue;
            }
            // Recent enough: keep the entry; the previous element already points to it.
            copy = current;
        }
        return copy;
    }

    /**
     * Returns the shared placeholder entry from which a {@code StateEntryIterator} starts.
     *
     * <p>The placeholder is declared with wildcard type arguments, so an unchecked cast is needed to
     * hand it out under the caller's type arguments.
     *
     * @return the shared bootstrap entry
     */
    @SuppressWarnings("unchecked")
    private static <K, N, S> StateMapEntry<K, N, S> getBootstrapEntry() {
        return (StateMapEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
    }

    /**
     * Combines the hash codes of key and namespace into one hash.
     *
     * @param key the key; must not be null
     * @param namespace the namespace; must not be null
     * @return the XOR of both hash codes, passed through {@code MathUtils.bitMix}
     */
    private static int compositeHash(Object key, Object namespace) {
        final int combined = key.hashCode() ^ namespace.hashCode();
        return MathUtils.bitMix(combined);
    }

    /**
     * Creates a snapshot object for this map.
     *
     * @return a new {@code CopyOnWriteStateMapSnapshot} constructed with this map
     */
    @Nonnull
    public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {
        return new CopyOnWriteStateMapSnapshot<>(this);
    }

    /**
     * Releases a snapshot that was taken from this map.
     *
     * @param snapshotToRelease the snapshot to release; it is cast to
     *     {@code CopyOnWriteStateMapSnapshot}
     * @throws IllegalArgumentException if the snapshot reports that it is not owned by this map
     *     (raised by {@code Preconditions.checkArgument})
     */
    @Override
    public void releaseSnapshot(StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> snapshotToRelease) {
        @SuppressWarnings("unchecked")
        final CopyOnWriteStateMapSnapshot<K, N, S> snapshot =
                (CopyOnWriteStateMapSnapshot<K, N, S>) snapshotToRelease;

        final boolean ownedByThisMap = snapshot.isOwner(this);
        Preconditions.checkArgument(ownedByThisMap, "Cannot release snapshot which is owned by a different state map.");

        releaseSnapshot(snapshot.getSnapshotVersion());
    }

    /**
     * Exposes the set of registered snapshot versions.
     *
     * @return the internal, mutable set itself (not a copy)
     */
    @VisibleForTesting
    Set<Integer> getSnapshotVersions() {
        return snapshotVersions;
    }

    /**
     * Returns the serializer this map uses to copy state objects.
     *
     * @return the current state serializer
     */
    public TypeSerializer<S> getStateSerializer() {
        return stateSerializer;
    }

    /**
     * Replaces the serializer this map uses to copy state objects.
     *
     * @param stateSerializer the new serializer; checked with {@code Preconditions.checkNotNull}
     */
    public void setStateSerializer(TypeSerializer<S> stateSerializer) {
        final TypeSerializer<S> checked = Preconditions.checkNotNull(stateSerializer);
        this.stateSerializer = checked;
    }

    /**
     * Counts the entries whose namespace equals the given namespace by iterating the whole map.
     *
     * @param namespace the namespace to count; must not be null
     * @return the number of matching entries
     */
    @Override
    public int sizeOfNamespace(Object namespace) {
        int matches = 0;
        for (StateEntry<K, N, S> candidate : this) {
            if (candidate != null && namespace.equals(candidate.getNamespace())) {
                ++matches;
            }
        }
        return matches;
    }

    /**
     * Creates a visitor that returns the map's entries one bucket chain at a time.
     *
     * @param recommendedMaxNumberOfReturnedRecords passed to the visitor, which uses it as the
     *     maximum number of empty buckets its chain iterator skips per call
     * @return a new {@code StateIncrementalVisitorImpl}
     */
    @Override
    public InternalKvState.StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
        final InternalKvState.StateIncrementalVisitor<K, N, S> visitor =
                new StateIncrementalVisitorImpl(recommendedMaxNumberOfReturnedRecords);
        return visitor;
    }

    /**
     * Incremental visitor over the map that returns one bucket chain per call and forwards
     * removals and updates to the enclosing map.
     */
    class StateIncrementalVisitorImpl implements InternalKvState.StateIncrementalVisitor<K, N, S> {

        /** Supplies the head of the next non-empty bucket. */
        private final StateEntryChainIterator chainIterator;

        /** Result buffer; cleared and refilled on every {@link #nextEntries()} call. */
        private final Collection<StateEntry<K, N, S>> chainToReturn = new ArrayList<>(5);

        /**
         * @param recommendedMaxNumberOfReturnedRecords limit handed to the chain iterator as its
         *     maximum number of traversed map positions
         */
        StateIncrementalVisitorImpl(int recommendedMaxNumberOfReturnedRecords) {
            chainIterator = new StateEntryChainIterator(recommendedMaxNumberOfReturnedRecords);
        }

        /** @return whether the underlying chain iterator reports more positions to visit */
        @Override
        public boolean hasNext() {
            return chainIterator.hasNext();
        }

        /**
         * Collects the entries of the next bucket chain.
         *
         * @return {@code null} if {@link #hasNext()} is false; otherwise the reused result buffer,
         *     which is empty when the chain iterator produced no chain
         */
        @Override
        public Collection<StateEntry<K, N, S>> nextEntries() {
            if (!hasNext()) {
                return null;
            }

            chainToReturn.clear();
            for (StateMapEntry<K, N, S> entry = chainIterator.next(); entry != null; entry = entry.next) {
                chainToReturn.add(entry);
            }
            return chainToReturn;
        }

        /** Removes the map entry with the key and namespace of {@code stateEntry}. */
        @Override
        public void remove(StateEntry<K, N, S> stateEntry) {
            final K key = stateEntry.getKey();
            final N namespace = stateEntry.getNamespace();
            CopyOnWriteStateMap.this.remove(key, namespace);
        }

        /** Stores {@code newValue} under the key and namespace of {@code stateEntry}. */
        @Override
        public void update(StateEntry<K, N, S> stateEntry, S newValue) {
            final K key = stateEntry.getKey();
            final N namespace = stateEntry.getNamespace();
            CopyOnWriteStateMap.this.put(key, namespace, newValue);
        }
    }

    /**
     * Iterator over all entries of the map. It walks each bucket chain and asks the chain iterator
     * for the next bucket when a chain ends. {@link #next()} fails once the map was structurally
     * modified after the iterator was created.
     */
    class StateEntryIterator implements Iterator<StateEntry<K, N, S>> {

        /** Supplies bucket heads. */
        private final StateEntryChainIterator chainIterator;

        /** Entry the next call to {@link #next()} returns, or {@code null} when exhausted. */
        private StateMapEntry<K, N, S> nextEntry;

        /** Value of the map's modification counter at construction time. */
        private final int expectedModCount;

        StateEntryIterator() {
            chainIterator = new StateEntryChainIterator();
            expectedModCount = modCount;
            // Start from the placeholder (which has no successor) and advance once, so that
            // nextEntry points at the first real entry, if any.
            nextEntry = getBootstrapEntry();
            advanceIterator();
        }

        @Override
        public boolean hasNext() {
            return nextEntry != null;
        }

        /**
         * @throws ConcurrentModificationException if the map's modification counter changed
         * @throws NoSuchElementException if there are no more entries
         */
        @Override
        public StateEntry<K, N, S> next() {
            // The modification check comes first, so it wins over the exhaustion check.
            if (expectedModCount != modCount) {
                throw new ConcurrentModificationException();
            }
            if (nextEntry == null) {
                throw new NoSuchElementException();
            }
            return advanceIterator();
        }

        /**
         * Moves {@code nextEntry} forward by one entry.
         *
         * @return the entry {@code nextEntry} pointed to before advancing
         */
        StateMapEntry<K, N, S> advanceIterator() {
            final StateMapEntry<K, N, S> current = nextEntry;
            final StateMapEntry<K, N, S> successor = current.next;
            // At the end of a chain, continue with the head of the next bucket.
            nextEntry = (successor != null) ? successor : chainIterator.next();
            return current;
        }
    }

    /**
     * Iterator over bucket heads: each {@link #next()} call yields the first entry of the next
     * non-empty bucket, first from the table that was primary at construction time and then from
     * the incremental rehash table.
     */
    class StateEntryChainIterator
    implements Iterator<StateMapEntry<K, N, S>> {
        /** Table currently being traversed. */
        StateMapEntry<K, N, S>[] activeTable;
        /** Index of the next bucket to inspect in {@code activeTable}. */
        private int nextMapPosition;
        /** Maximum number of empty buckets skipped by one {@code nextActiveMapPosition} call. */
        private final int maxTraversedMapPositions;

        /** Creates an iterator without a practical limit on skipped empty buckets. */
        StateEntryChainIterator() {
            this(Integer.MAX_VALUE);
        }

        /**
         * @param maxTraversedMapPositions maximum number of empty buckets skipped per lookup
         */
        StateEntryChainIterator(int maxTraversedMapPositions) {
            this.maxTraversedMapPositions = maxTraversedMapPositions;
            this.activeTable = CopyOnWriteStateMap.this.primaryTable;
            this.nextMapPosition = 0;
        }

        /**
         * Reports whether there may be more buckets to visit. This does not guarantee that
         * {@link #next()} returns a non-null entry: the remaining buckets can all be empty.
         */
        @Override
        public boolean hasNext() {
            return CopyOnWriteStateMap.this.size() > 0 && (this.nextMapPosition < this.activeTable.length || this.activeTable == CopyOnWriteStateMap.this.primaryTable);
        }

        /**
         * Returns the head of the next non-empty bucket.
         *
         * @return the bucket head, or {@code null} if none was found, either because the tables
         *     are exhausted or because the limit of skipped empty buckets was reached
         */
        @Override
        public StateMapEntry<K, N, S> next() {
            StateMapEntry next;
            // Retry on the incremental rehash table only if nothing was found, the active table is
            // fully traversed, and it is still the map's primary table and not the rehash table.
            // In every other case the result, possibly null, is returned.
            while ((next = this.nextActiveMapPosition()) == null && this.nextMapPosition >= this.activeTable.length && this.activeTable != CopyOnWriteStateMap.this.incrementalRehashTable && this.activeTable == CopyOnWriteStateMap.this.primaryTable) {
                this.activeTable = CopyOnWriteStateMap.this.incrementalRehashTable;
                this.nextMapPosition = 0;
            }
            return next;
        }

        /**
         * Scans {@code activeTable} from {@code nextMapPosition} for a non-empty bucket and leaves
         * the position just behind the last inspected bucket.
         *
         * @return the first bucket head found, or {@code null} if the end of the table or the limit
         *     of skipped empty buckets was reached
         */
        private StateMapEntry<K, N, S> nextActiveMapPosition() {
            StateMapEntry<K, N, S>[] tab = this.activeTable;
            // traversedPositions only counts empty buckets: it is incremented on "continue" and not on return.
            for (int traversedPositions = 0; this.nextMapPosition < tab.length && traversedPositions < this.maxTraversedMapPositions; ++traversedPositions) {
                StateMapEntry next;
                if ((next = tab[this.nextMapPosition++]) == null) continue;
                return next;
            }
            return null;
        }
    }

    /**
     * One entry of the hash table: key, namespace and state, plus the chain link, the cached
     * composite hash and the two version stamps used for copy-on-write decisions.
     */
    @VisibleForTesting
    protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {

        /** Key of the entry. */
        @Nonnull
        final K key;

        /** Namespace of the entry. */
        @Nonnull
        final N namespace;

        /** State of the entry; may be {@code null}. */
        @Nullable
        S state;

        /** Next entry in the same bucket, or {@code null} at the end of the chain. */
        @Nullable
        StateMapEntry<K, N, S> next;

        /** Map version at which this entry object was created or copied. */
        int entryVersion;

        /** Map version at which the state of this entry was last set. */
        int stateVersion;

        /** Cached composite hash of key and namespace. */
        final int hash;

        /**
         * Copy constructor: takes everything from {@code other} except the entry version.
         *
         * @param other the entry to copy
         * @param entryVersion the entry version of the copy
         */
        StateMapEntry(StateMapEntry<K, N, S> other, int entryVersion) {
            this(
                    other.key,
                    other.namespace,
                    other.state,
                    other.hash,
                    other.next,
                    entryVersion,
                    other.stateVersion);
        }

        StateMapEntry(@Nonnull K key, @Nonnull N namespace, @Nullable S state, int hash, @Nullable StateMapEntry<K, N, S> next, int entryVersion, int stateVersion) {
            this.key = key;
            this.namespace = namespace;
            this.hash = hash;
            this.next = next;
            this.entryVersion = entryVersion;
            this.state = state;
            this.stateVersion = stateVersion;
        }

        /**
         * Sets the state and state version, but only if {@code value} is a different object
         * (compared by reference) than the current state.
         *
         * @param value the new state
         * @param mapVersion the version to stamp on the state
         */
        public final void setState(@Nullable S value, int mapVersion) {
            if (value == state) {
                return;
            }
            state = value;
            stateVersion = mapVersion;
        }

        @Override
        @Nonnull
        public K getKey() {
            return key;
        }

        @Override
        @Nonnull
        public N getNamespace() {
            return namespace;
        }

        @Override
        @Nullable
        public S getState() {
            return state;
        }

        /** Two entries are equal if key, namespace and state are equal; versions and links are ignored. */
        @Override
        public final boolean equals(Object o) {
            if (!(o instanceof StateMapEntry)) {
                return false;
            }
            final StateEntry<?, ?, ?> that = (StateEntry<?, ?, ?>) o;
            return that.getKey().equals(key)
                    && that.getNamespace().equals(namespace)
                    && Objects.equals(that.getState(), state);
        }

        /** XOR of the hash codes of key, namespace and state (0 for a {@code null} state). */
        @Override
        public final int hashCode() {
            int result = key.hashCode();
            result ^= namespace.hashCode();
            result ^= Objects.hashCode(state);
            return result;
        }

        /** Formats the entry as {@code (key|namespace)=state}. */
        @Override
        public final String toString() {
            return "(" + key + "|" + namespace + ")=" + state;
        }
    }
}

