/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;

/**
 * A {@link PushbackSideInputDoFnRunner} that wraps an underlying {@link DoFnRunner} and only
 * forwards an element to it for windows whose side inputs are reported ready by the supplied
 * {@link ReadyCheckingSideInputReader}. Elements in windows that are not ready are returned to
 * the caller ("pushed back") instead of being processed.
 *
 * <p>The set of not-ready windows is tracked per bundle: it is created in {@link #startBundle()}
 * and discarded in {@link #finishBundle()}. Outside of a bundle the set is {@code null}, so
 * calling {@link #processElementInReadyWindows(WindowedValue)} with a non-empty view collection
 * before {@link #startBundle()} fails with a {@link NullPointerException}.
 *
 * @param <InputT> element type consumed by the underlying runner
 * @param <OutputT> element type produced by the underlying runner
 */
public class SimplePushbackSideInputDoFnRunner<InputT, OutputT>
implements PushbackSideInputDoFnRunner<InputT, OutputT> {
    /** Runner that actually processes elements, timers and bundle lifecycle calls. */
    private final DoFnRunner<InputT, OutputT> underlying;
    /** Side input views whose readiness gates processing; may be empty. */
    private final Collection<PCollectionView<?>> views;
    /** Answers whether a given view is ready for a given side input window. */
    private final ReadyCheckingSideInputReader sideInputReader;
    /** Main input windows already found not ready in the current bundle; null outside a bundle. */
    private Set<BoundedWindow> notReadyWindows;

    /**
     * Creates a runner that delegates to {@code underlying} once the side inputs in {@code views}
     * are ready according to {@code sideInputReader}.
     *
     * @param underlying runner to delegate to
     * @param views side input views to check for readiness
     * @param sideInputReader reader used to query side input readiness
     * @return a new {@link SimplePushbackSideInputDoFnRunner}
     */
    public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create(DoFnRunner<InputT, OutputT> underlying, Collection<PCollectionView<?>> views, ReadyCheckingSideInputReader sideInputReader) {
        return new SimplePushbackSideInputDoFnRunner<InputT, OutputT>(underlying, views, sideInputReader);
    }

    private SimplePushbackSideInputDoFnRunner(DoFnRunner<InputT, OutputT> underlying, Collection<PCollectionView<?>> views, ReadyCheckingSideInputReader sideInputReader) {
        this.underlying = underlying;
        this.views = views;
        this.sideInputReader = sideInputReader;
    }

    /** Resets the per-bundle set of not-ready windows, then starts the underlying bundle. */
    @Override
    public void startBundle() {
        this.notReadyWindows = new HashSet<BoundedWindow>();
        this.underlying.startBundle();
    }

    /**
     * Processes {@code elem} in every window whose side inputs are ready and returns the
     * per-window elements that could not be processed.
     *
     * @param elem the (possibly multi-window) element to process
     * @return the single-window elements that were pushed back; empty if everything was processed
     */
    @Override
    public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
        if (this.views.isEmpty()) {
            // No side inputs to wait for: hand the element over as-is, without exploding windows.
            this.underlying.processElement(elem);
            return Collections.emptyList();
        }
        ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
        for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
            // Each exploded element is expected to carry exactly one window.
            BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
            if (this.isReady(mainInputWindow)) {
                this.underlying.processElement(windowElem);
            } else {
                // Remember the window so later elements in this bundle skip the readiness query.
                this.notReadyWindows.add(mainInputWindow);
                pushedBack.add(windowElem);
            }
        }
        return pushedBack.build();
    }

    /**
     * Returns whether every view is ready for the side input window that {@code mainInputWindow}
     * maps to. A window already recorded as not ready in this bundle is reported not ready
     * without querying the reader again.
     */
    private boolean isReady(BoundedWindow mainInputWindow) {
        if (this.notReadyWindows.contains(mainInputWindow)) {
            return false;
        }
        for (PCollectionView<?> view : this.views) {
            BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
            if (!this.sideInputReader.isReady(view, sideInputWindow)) {
                return false;
            }
        }
        return true;
    }

    /** Forwards the timer firing to the underlying runner unchanged. */
    @Override
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        this.underlying.onTimer(timerId, window, timestamp, timeDomain);
    }

    /** Discards the per-bundle not-ready window set, then finishes the underlying bundle. */
    @Override
    public void finishBundle() {
        this.notReadyWindows = null;
        this.underlying.finishBundle();
    }
}

