/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapMergingState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.Preconditions;

/**
 * List state whose contents are kept as a {@code List<V>} per key/namespace inside a
 * {@link StateTable}.
 *
 * <p>Null elements are rejected by every mutating entry point ({@link #add(Object)},
 * {@link #update(List)} and {@link #addAll(List)}).
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <V> type of the list elements
 */
class HeapListState<K, N, V>
extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>>
implements InternalListState<K, N, V> {
    /**
     * Creates a list state on top of the given state table; all arguments are handed to the
     * superclass constructor unchanged.
     *
     * @param stateTable table holding the per-key/namespace lists
     * @param keySerializer serializer for keys
     * @param valueSerializer serializer for the whole list
     * @param namespaceSerializer serializer for namespaces
     * @param defaultValue default value forwarded to the superclass
     */
    private HeapListState(StateTable<K, N, List<V>> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<List<V>> valueSerializer, TypeSerializer<N> namespaceSerializer, List<V> defaultValue) {
        super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
    }

    /** Returns the key serializer held by this state. */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    /** Returns the namespace serializer held by this state. */
    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    /** Returns the serializer for the whole list value held by this state. */
    @Override
    public TypeSerializer<List<V>> getValueSerializer() {
        return valueSerializer;
    }

    /**
     * Returns the current list as an {@link Iterable}; this is exactly what
     * {@code getInternal()} returns, without copying.
     */
    @Override
    public Iterable<V> get() {
        return getInternal();
    }

    /**
     * Appends one element to the list stored under the current namespace, creating and
     * registering a new list when none is present yet.
     *
     * @param value element to append; must not be null
     * @throws NullPointerException if {@code value} is null
     */
    @Override
    public void add(V value) {
        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
        final N namespace = currentNamespace;
        final StateTable<K, N, List<V>> table = stateTable;
        // Work against the List interface: the table is typed List<V>, so the stored
        // instance is not guaranteed to be an ArrayList.
        List<V> list = table.get(namespace);
        if (list == null) {
            list = new ArrayList<>();
            table.put(namespace, list);
        }
        list.add(value);
    }

    /**
     * Serializes the list stored for the key and namespace encoded in
     * {@code serializedKeyAndNamespace}.
     *
     * <p>Each element is written with the element serializer of {@code safeValueSerializer};
     * consecutive elements are separated by a single {@code ','} byte and no separator follows
     * the last element.
     *
     * @param serializedKeyAndNamespace serialized key and namespace to look up
     * @param safeKeySerializer serializer used to decode the key
     * @param safeNamespaceSerializer serializer used to decode the namespace
     * @param safeValueSerializer list serializer; it is cast to {@link ListSerializer} to obtain
     *     the element serializer
     * @return the serialized elements, or {@code null} when no list is stored for that
     *     key/namespace
     * @throws IOException if decoding the key/namespace or writing an element fails
     */
    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<List<V>> safeValueSerializer) throws IOException {
        Preconditions.checkNotNull(serializedKeyAndNamespace);
        Preconditions.checkNotNull(safeKeySerializer);
        Preconditions.checkNotNull(safeNamespaceSerializer);
        Preconditions.checkNotNull(safeValueSerializer);

        Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
        List<V> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
        if (result == null) {
            return null;
        }

        final TypeSerializer<V> elementSerializer = ((ListSerializer<V>) safeValueSerializer).getElementSerializer();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);

        final int lastIndex = result.size() - 1;
        for (int i = 0; i <= lastIndex; i++) {
            elementSerializer.serialize(result.get(i), view);
            // Separator only between elements, never after the last one.
            if (i < lastIndex) {
                view.writeByte(',');
            }
        }
        view.flush();
        return baos.toByteArray();
    }

    /**
     * Converts every element of {@code stateValue} in place between its plain and its
     * {@link TtlValue}-wrapped form.
     *
     * <p>If {@code currentTtlAwareSerializer} reports TTL as enabled, each element is wrapped
     * in a {@link TtlValue} carrying {@code ttlTimeProvider.currentTimestamp()}; otherwise each
     * element is treated as a {@link TtlValue} and replaced by its user value.
     *
     * @param stateValue list to convert; it is modified and returned
     * @param currentTtlAwareSerializer serializer that decides the direction of the conversion
     * @param ttlTimeProvider source of the timestamp used when wrapping
     * @return the same {@code stateValue} instance after conversion
     */
    @Override
    @SuppressWarnings("unchecked")
    public List<V> migrateTtlValue(List<V> stateValue, TtlAwareSerializer<List<V>, ?> currentTtlAwareSerializer, TtlTimeProvider ttlTimeProvider) {
        if (currentTtlAwareSerializer.isTtlEnabled()) {
            // The list is declared List<V>, but after this step its elements are TtlValue
            // wrappers; the unchecked cast only exists to satisfy the declared element type.
            stateValue.replaceAll(v -> (V) new TtlValue<V>(v, ttlTimeProvider.currentTimestamp()));
        } else {
            stateValue.replaceAll(v -> (V) ((TtlValue<?>) v).getUserValue());
        }
        return stateValue;
    }

    /**
     * Merges two lists by appending all elements of {@code b} to {@code a}.
     *
     * @return {@code a}, now also containing the elements of {@code b}
     */
    @Override
    protected List<V> mergeState(List<V> a, List<V> b) {
        a.addAll(b);
        return a;
    }

    /**
     * Replaces the list stored under the current namespace with a copy of {@code values}.
     * An empty {@code values} clears the state instead of storing an empty list.
     *
     * @param values new contents; must not be null and must not contain null elements
     * @throws NullPointerException if {@code values} or any of its elements is null
     */
    @Override
    public void update(List<V> values) throws Exception {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
        if (values.isEmpty()) {
            clear();
            return;
        }
        // Copy element by element so each one is null-checked before anything is stored.
        List<V> newStateList = new ArrayList<>(values.size());
        for (V v : values) {
            Preconditions.checkNotNull(v, "You cannot add null to a ListState.");
            newStateList.add(v);
        }
        stateTable.put(currentNamespace, newStateList);
    }

    /**
     * Appends all elements of {@code values} to the list stored under the current namespace.
     * Nothing happens when {@code values} is empty.
     *
     * @param values elements to append; must not be null and must not contain null elements
     * @throws NullPointerException if {@code values} or any of its elements is null
     */
    @Override
    public void addAll(List<V> values) throws Exception {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
        if (values.isEmpty()) {
            return;
        }
        stateTable.transform(currentNamespace, values, (previousState, value) -> {
            List<V> target = previousState != null ? previousState : new ArrayList<>();
            for (V v : value) {
                Preconditions.checkNotNull(v, "You cannot add null to a ListState.");
                target.add(v);
            }
            return target;
        });
    }

    /**
     * Creates a new {@code HeapListState} over {@code stateTable}, taking the state and
     * namespace serializers from the table and the default value from {@code stateDesc}.
     *
     * <p>The casts are unchecked: the caller is responsible for {@code SV} actually being
     * {@code List<E>} and for {@code IS} being compatible with {@code HeapListState}.
     */
    @SuppressWarnings("unchecked")
    static <E, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc, StateTable<K, N, SV> stateTable, TypeSerializer<K> keySerializer) {
        return (IS) new HeapListState<>(
                (StateTable<K, N, List<E>>) stateTable,
                keySerializer,
                (TypeSerializer<List<E>>) stateTable.getStateSerializer(),
                stateTable.getNamespaceSerializer(),
                (List<E>) stateDesc.getDefaultValue());
    }

    /**
     * Updates an existing {@code HeapListState} with the namespace serializer and state
     * serializer of {@code stateTable} and the default value of {@code stateDesc}.
     *
     * <p>The casts are unchecked: {@code existingState} must be a {@code HeapListState}.
     *
     * @return the result of the chained setter calls on {@code existingState}, cast to
     *     {@code IS}
     */
    @SuppressWarnings("unchecked")
    static <E, K, N, SV, S extends State, IS extends S> IS update(StateDescriptor<S, SV> stateDesc, StateTable<K, N, SV> stateTable, IS existingState) {
        return (IS) ((HeapListState<K, N, E>) existingState)
                .setNamespaceSerializer(stateTable.getNamespaceSerializer())
                .setValueSerializer((TypeSerializer<List<E>>) stateTable.getStateSerializer())
                .setDefaultValue((List<E>) stateDesc.getDefaultValue());
    }
}

