package org.apache.flink.state.changelog;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FlinkRuntimeException;

/* loaded from: input_file:org/apache/flink/state/changelog/ChangelogStateFactory.class */
/**
 * Creates changelog state wrappers and keeps track of every wrapper it has created, indexed by
 * state name, so that they can later be looked up again, have their "writing meta" flags reset, or
 * be dropped all at once.
 *
 * <p>Key/value states and priority-queue states are tracked in two separate maps; the same name
 * may therefore be registered once in each of them.
 */
public class ChangelogStateFactory {

    /**
     * Maps each supported {@link StateDescriptor.Type} to the factory that builds the matching
     * changelog state. Descriptor types that are absent from this map are rejected by {@link
     * #getStateFactory(StateDescriptor)}.
     */
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
            Stream.of(
                            Tuple2.of(
                                    StateDescriptor.Type.VALUE,
                                    (StateFactory) ChangelogValueState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.LIST,
                                    (StateFactory) ChangelogListState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.REDUCING,
                                    (StateFactory) ChangelogReducingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.AGGREGATING,
                                    (StateFactory) ChangelogAggregatingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.MAP,
                                    (StateFactory) ChangelogMapState::create))
                    .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));

    /** Key/value changelog states created by this factory, indexed by state name. */
    private final Map<String, ChangelogState> changelogStates = new HashMap<>();

    /** Priority-queue changelog states created by this factory, indexed by state name. */
    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName =
            new HashMap<>();

    /**
     * Factory for one kind of key/value changelog state.
     *
     * <p>NOTE(review): the decompiler reported that it widened this type's access modifier from
     * {@code private}; it is kept {@code public} here so that visibility is not narrowed for
     * anything compiled against this source.
     */
    public interface StateFactory {

        /**
         * Wraps the given state into a changelog state.
         *
         * @param internalKvState the state to wrap
         * @param kvStateChangeLogger the logger handed to the created changelog state
         * @param internalKeyContext the key context handed to the created changelog state
         * @param <K> key type of the wrapped state
         * @param <N> namespace type of the wrapped state
         * @param <SV> value type of the wrapped state
         * @param <S> public state type
         * @param <IS> concrete type of the created state
         * @return the created changelog state
         * @throws Exception if the underlying factory method fails
         */
        <K, N, SV, S extends State, IS extends S> IS create(
                InternalKvState<K, N, SV> internalKvState,
                KvStateChangeLogger<SV, N> kvStateChangeLogger,
                InternalKeyContext<K> internalKeyContext)
                throws Exception;
    }

    /**
     * Creates a key/value changelog state for the given descriptor and registers it under the
     * descriptor's name, replacing any key/value state previously registered under that name.
     *
     * @param stateDescriptor descriptor whose type selects the factory and whose name is used as
     *     the registration key
     * @param internalKvState the state to wrap
     * @param kvStateChangeLogger the logger handed to the created changelog state
     * @param internalKeyContext the key context handed to the created changelog state
     * @return the newly created changelog state
     * @throws FlinkRuntimeException if the descriptor's type has no registered factory
     * @throws Exception if the selected factory fails
     */
    public <K, N, V, S extends State> ChangelogState create(
            StateDescriptor<S, V> stateDescriptor,
            InternalKvState<K, N, V> internalKvState,
            KvStateChangeLogger<V, N> kvStateChangeLogger,
            InternalKeyContext<K> internalKeyContext)
            throws Exception {
        ChangelogState changelogState =
                getStateFactory(stateDescriptor)
                        .create(internalKvState, kvStateChangeLogger, internalKeyContext);
        this.changelogStates.put(stateDescriptor.getName(), changelogState);
        return changelogState;
    }

    /**
     * Creates a changelog priority queue and registers it under the given name, replacing any
     * priority queue previously registered under that name.
     *
     * @param str name under which the queue is registered
     * @param keyGroupedInternalPriorityQueue the queue to wrap
     * @param stateChangeLogger the logger handed to the created queue
     * @param typeSerializer the serializer handed to the created queue
     * @return the newly created changelog priority queue
     */
    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
            String str,
            KeyGroupedInternalPriorityQueue<T> keyGroupedInternalPriorityQueue,
            StateChangeLogger<T, Void> stateChangeLogger,
            TypeSerializer<T> typeSerializer) {
        ChangelogKeyGroupedPriorityQueue<T> queue =
                new ChangelogKeyGroupedPriorityQueue<>(
                        keyGroupedInternalPriorityQueue, stateChangeLogger, typeSerializer);
        this.priorityQueueStatesByName.put(str, queue);
        return queue;
    }

    /**
     * Looks up a state previously created by this factory.
     *
     * @param str name the state was registered under
     * @param backendStateType selects which registry is searched: {@code KEY_VALUE} for key/value
     *     states, {@code PRIORITY_QUEUE} for priority queues
     * @return the registered state, or {@code null} if nothing is registered under that name in
     *     the selected registry
     * @throws UnsupportedOperationException if {@code backendStateType} is any other constant
     */
    public ChangelogState getExistingState(
            String str, StateMetaInfoSnapshot.BackendStateType backendStateType)
            throws UnsupportedOperationException {
        // Switch on the enum itself; the compiler generates the ordinal lookup table on its own.
        switch (backendStateType) {
            case KEY_VALUE:
                return this.changelogStates.get(str);
            case PRIORITY_QUEUE:
                return this.priorityQueueStatesByName.get(str);
            default:
                throw new UnsupportedOperationException(
                        String.format("Unknown state type %s (%s)", backendStateType, str));
        }
    }

    /**
     * Calls {@code resetWritingMetaFlag()} on every registered state: first on all key/value
     * states, then on all priority queues.
     */
    public void resetAllWritingMetaFlags() {
        for (ChangelogState changelogState : this.changelogStates.values()) {
            changelogState.resetWritingMetaFlag();
        }
        for (ChangelogKeyGroupedPriorityQueue<?> queue : this.priorityQueueStatesByName.values()) {
            queue.resetWritingMetaFlag();
        }
    }

    /** Forgets every registered state by clearing both registries. */
    public void dispose() {
        this.changelogStates.clear();
        this.priorityQueueStatesByName.clear();
    }

    /**
     * Resolves the factory responsible for the given descriptor's type.
     *
     * @param stateDescriptor descriptor whose {@code getType()} result is used as the lookup key
     * @return the matching factory, never {@code null}
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's type
     */
    private <S extends State, V> StateFactory getStateFactory(
            StateDescriptor<S, V> stateDescriptor) {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getType());
        if (stateFactory == null) {
            throw new FlinkRuntimeException(
                    String.format(
                            "State %s is not supported by %s",
                            stateDescriptor.getClass(), ChangelogKeyedStateBackend.class));
        }
        return stateFactory;
    }
}
