/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.direct.repackaged.runners.core.StateInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespaces;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTag;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTags;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputHandler
implements ReadyCheckingSideInputReader {
    protected final Collection<PCollectionView<?>> sideInputs;
    private final StateInternals stateInternals;
    private final Map<PCollectionView<?>, StateTag<CombiningState<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>>>> availableWindowsTags;
    private final Map<PCollectionView<?>, StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;

    public SideInputHandler(Collection<PCollectionView<?>> sideInputs, StateInternals stateInternals) {
        this.sideInputs = sideInputs;
        this.stateInternals = stateInternals;
        this.availableWindowsTags = new HashMap();
        this.sideInputContentsTags = new HashMap();
        for (PCollectionView<?> sideInput : sideInputs) {
            Coder windowCoder = sideInput.getWindowingStrategyInternal().getWindowFn().windowCoder();
            StateTag<CombiningState<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>>> availableTag = StateTags.combiningValue("side-input-available-windows-" + sideInput.getTagInternal().getId(), SetCoder.of((Coder)windowCoder), new WindowSetCombineFn());
            this.availableWindowsTags.put(sideInput, availableTag);
            Coder coder = sideInput.getCoderInternal();
            StateTag stateTag = StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
            this.sideInputContentsTags.put(sideInput, stateTag);
        }
    }

    public void addSideInputValue(PCollectionView<?> sideInput, WindowedValue<Iterable<?>> value) {
        Coder windowCoder = sideInput.getWindowingStrategyInternal().getWindowFn().windowCoder();
        ArrayList<WindowedValue> inputWithReifiedWindows = new ArrayList<WindowedValue>();
        for (Object e : (Iterable)value.getValue()) {
            inputWithReifiedWindows.add(value.withValue(e));
        }
        StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = this.sideInputContentsTags.get(sideInput);
        for (BoundedWindow window : value.getWindows()) {
            this.stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag).write(inputWithReifiedWindows);
            this.stateInternals.state(StateNamespaces.global(), this.availableWindowsTags.get(sideInput)).add((Object)window);
        }
    }

    @Override
    @Nullable
    public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
        Coder windowCoder = sideInput.getWindowingStrategyInternal().getWindowFn().windowCoder();
        StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = this.sideInputContentsTags.get(sideInput);
        ValueState<Iterable<WindowedValue<?>>> state = this.stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
        List elements = (List)state.read();
        if (elements == null) {
            elements = Collections.emptyList();
        }
        return (T)sideInput.getViewFn().apply(elements);
    }

    @Override
    public boolean isReady(PCollectionView<?> sideInput, BoundedWindow window) {
        Set readyWindows = (Set)this.stateInternals.state(StateNamespaces.global(), this.availableWindowsTags.get(sideInput)).read();
        boolean result = readyWindows != null && readyWindows.contains(window);
        return result;
    }

    @Override
    public <T> boolean contains(PCollectionView<T> view) {
        return this.sideInputs.contains(view);
    }

    @Override
    public boolean isEmpty() {
        return this.sideInputs.isEmpty();
    }

    private static class WindowSetCombineFn
    extends Combine.CombineFn<BoundedWindow, Set<BoundedWindow>, Set<BoundedWindow>> {
        private WindowSetCombineFn() {
        }

        public Set<BoundedWindow> createAccumulator() {
            return new HashSet<BoundedWindow>();
        }

        public Set<BoundedWindow> addInput(Set<BoundedWindow> accumulator, BoundedWindow input) {
            accumulator.add(input);
            return accumulator;
        }

        public Set<BoundedWindow> mergeAccumulators(Iterable<Set<BoundedWindow>> accumulators) {
            HashSet<BoundedWindow> result = new HashSet<BoundedWindow>();
            for (Set<BoundedWindow> acc : accumulators) {
                result.addAll(acc);
            }
            return result;
        }

        public Set<BoundedWindow> extractOutput(Set<BoundedWindow> accumulator) {
            return accumulator;
        }
    }
}

