/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.IOException;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.StreamingSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.util.StateUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.samza.context.TaskContext;

public class SamzaStateRequestHandlers {
    public static StateRequestHandler of(String transformId, TaskContext context, SamzaPipelineOptions pipelineOptions, ExecutableStage executableStage, StageBundleFactory stageBundleFactory, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds, SideInputHandler sideInputHandler) {
        StateRequestHandler sideInputStateHandler = SamzaStateRequestHandlers.createSideInputStateHandler(executableStage, sideInputIds, sideInputHandler);
        StateRequestHandler userStateRequestHandler = SamzaStateRequestHandlers.createUserStateRequestHandler(transformId, executableStage, context, pipelineOptions, stageBundleFactory);
        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap = new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(BeamFnApi.StateKey.TypeCase.class);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.ITERABLE_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_KEYS_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateRequestHandler);
        return StateRequestHandlers.delegateBasedUponType(handlerMap);
    }

    private static StateRequestHandler createSideInputStateHandler(ExecutableStage executableStage, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds, SideInputHandler sideInputHandler) {
        if (executableStage.getSideInputs().size() <= 0) {
            return StateRequestHandler.unsupported();
        }
        StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory = (StateRequestHandlers.SideInputHandlerFactory)Preconditions.checkNotNull((Object)StreamingSideInputHandlerFactory.forStage((ExecutableStage)executableStage, sideInputIds, (SideInputHandler)sideInputHandler));
        try {
            return StateRequestHandlers.forSideInputHandlerFactory((Map)ProcessBundleDescriptors.getSideInputs((ExecutableStage)executableStage), (StateRequestHandlers.SideInputHandlerFactory)sideInputHandlerFactory);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to initialize SideInputHandler", e);
        }
    }

    private static StateRequestHandler createUserStateRequestHandler(String transformId, ExecutableStage executableStage, TaskContext context, SamzaPipelineOptions pipelineOptions, StageBundleFactory stageBundleFactory) {
        if (!StateUtils.isStateful(executableStage)) {
            return StateRequestHandler.unsupported();
        }
        SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory(transformId, ByteStringCoder.of(), context, pipelineOptions, executableStage);
        return StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)stageBundleFactory.getProcessBundleDescriptor(), new BagUserStateFactory(stateInternalsFactory));
    }

    static class BagUserStateFactory<K extends ByteString, V extends ByteString, W extends BoundedWindow>
    implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
        private final SamzaStoreStateInternals.Factory<K> stateInternalsFactory;

        BagUserStateFactory(SamzaStoreStateInternals.Factory<K> stateInternalsFactory) {
            this.stateInternalsFactory = stateInternalsFactory;
        }

        public StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String pTransformId, final String userStateId, Coder<K> keyCoder, final Coder<V> valueCoder, final Coder<W> windowCoder) {
            return new StateRequestHandlers.BagUserStateHandler<K, V, W>(){

                public Iterable<V> get(K key, W window) {
                    StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                    BagState bagState = (BagState)stateInternalsFactory.stateInternalsForKey(key).state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                    return bagState.read();
                }

                public void append(K key, W window, Iterator<V> values) {
                    StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                    BagState bagState = (BagState)stateInternalsFactory.stateInternalsForKey(key).state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                    while (values.hasNext()) {
                        bagState.add((Object)((ByteString)values.next()));
                    }
                }

                public void clear(K key, W window) {
                    StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                    BagState bagState = (BagState)stateInternalsFactory.stateInternalsForKey(key).state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                    bagState.clear();
                }
            };
        }
    }
}

