/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
 * Provides {@link StateInternals} and {@link TimerInternals} views that look up the key bound
 * to the calling thread on every call and delegate to the per-key instance obtained from the
 * supplied factories.
 *
 * <p>The bound key lives in a static {@link ThreadLocal}, so a thread holds at most one bound
 * key at a time regardless of how many {@code KeyedInternals} instances exist. Bind a key with
 * {@link #setKey(Object)} and release it with {@link #clearKey()}.
 */
@ThreadSafe
class KeyedInternals<K> {
    /**
     * Per-thread slot holding the bound key and the states handed out for it. The slot is
     * shared by all instances, hence the wildcard key type.
     */
    private static final ThreadLocal<KeyedStates<?>> threadLocalKeyedStates = new ThreadLocal<>();
    private final StateInternalsFactory<K> stateFactory;
    private final TimerInternalsFactory<K> timerFactory;

    /**
     * @param stateFactory factory used to obtain the state internals for the bound key
     * @param timerFactory factory used to obtain the timer internals for the bound key
     */
    KeyedInternals(StateInternalsFactory<K> stateFactory, TimerInternalsFactory<K> timerFactory) {
        this.stateFactory = stateFactory;
        this.timerFactory = timerFactory;
    }

    /** Returns a new state view that resolves the bound key on each access. */
    StateInternals stateInternals() {
        return new KeyedStateInternals();
    }

    /** Returns a new timer view that resolves the bound key on each access. */
    TimerInternals timerInternals() {
        return new KeyedTimerInternals();
    }

    /**
     * Binds {@code key} to the calling thread.
     *
     * @param key the key subsequent state and timer calls on this thread are routed to
     * @throws IllegalStateException if the thread still has a binding that was not released
     *     with {@link #clearKey()}
     */
    void setKey(K key) {
        Preconditions.checkState(
            threadLocalKeyedStates.get() == null,
            "States for key %s is not cleared before processing",
            key);
        threadLocalKeyedStates.set(new KeyedStates<K>(key));
    }

    /**
     * Returns the key bound to the calling thread, or {@code null} when nothing is bound.
     */
    @SuppressWarnings("unchecked")
    K getKey() {
        final KeyedStates<?> keyedStates = threadLocalKeyedStates.get();
        // The slot is static and shared across instances, so the compiler cannot know the
        // stored key's type; the cast is unchecked.
        return keyedStates == null ? null : (K) keyedStates.key;
    }

    /**
     * Releases the calling thread's binding, closing the iterators of every Samza map/set
     * state that was handed out while the key was bound.
     *
     * <p>Does nothing when no key is bound. The binding is removed even if closing an iterator
     * throws, so a failure here cannot make every later {@link #setKey(Object)} on this thread
     * fail its "not cleared" check.
     */
    void clearKey() {
        final KeyedStates<?> keyedStates = threadLocalKeyedStates.get();
        if (keyedStates == null) {
            return;
        }
        try {
            for (State state : keyedStates.states) {
                if (state instanceof SamzaMapState) {
                    ((SamzaMapState<?, ?>) state).closeIterators();
                } else if (state instanceof SamzaSetState) {
                    ((SamzaSetState<?>) state).closeIterators();
                }
            }
        } finally {
            keyedStates.states.clear();
            threadLocalKeyedStates.remove();
        }
    }

    /**
     * {@link TimerInternals} implementation that forwards every call to the timer internals
     * the factory returns for the currently bound key.
     */
    private class KeyedTimerInternals
    implements TimerInternals {
        private KeyedTimerInternals() {
        }

        /** Resolves the delegate for whatever key is bound at the time of the call. */
        private TimerInternals getInternals() {
            return KeyedInternals.this.timerFactory.timerInternalsForKey(KeyedInternals.this.getKey());
        }

        public void setTimer(StateNamespace namespace, String timerId, String timerFamilyId, Instant target, Instant outputTimestamp, TimeDomain timeDomain) {
            getInternals().setTimer(namespace, timerId, timerFamilyId, target, outputTimestamp, timeDomain);
        }

        public void setTimer(TimerInternals.TimerData timerData) {
            getInternals().setTimer(timerData);
        }

        public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
            getInternals().deleteTimer(namespace, timerId, timerFamilyId, timeDomain);
        }

        public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
            getInternals().deleteTimer(namespace, timerId, timerFamilyId);
        }

        public void deleteTimer(TimerInternals.TimerData timerKey) {
            getInternals().deleteTimer(timerKey);
        }

        public Instant currentProcessingTime() {
            return getInternals().currentProcessingTime();
        }

        public @Nullable Instant currentSynchronizedProcessingTime() {
            return getInternals().currentSynchronizedProcessingTime();
        }

        public Instant currentInputWatermarkTime() {
            return getInternals().currentInputWatermarkTime();
        }

        public @Nullable Instant currentOutputWatermarkTime() {
            return getInternals().currentOutputWatermarkTime();
        }
    }

    /**
     * {@link StateInternals} implementation that serves state from the state internals the
     * factory returns for the currently bound key, and records each state it hands out so
     * {@link KeyedInternals#clearKey()} can close its iterators.
     */
    private class KeyedStateInternals
    implements StateInternals {
        private KeyedStateInternals() {
        }

        /** Returns the key bound to the calling thread, or {@code null} when none is bound. */
        public K getKey() {
            return KeyedInternals.this.getKey();
        }

        /**
         * Looks up the state for the bound key and registers it for cleanup.
         *
         * @throws IllegalStateException if no (non-null) key is bound to the calling thread
         */
        public <T extends State> T state(StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
            final K key = getKey();
            Preconditions.checkState(key != null, "Key is not set before state access in Stateful ParDo.");
            final T state = KeyedInternals.this.stateFactory.stateInternalsForKey(key).state(namespace, address, c);
            // A non-null key implies the thread-local slot is populated, so get() is non-null here.
            threadLocalKeyedStates.get().states.add(state);
            return state;
        }
    }

    /** The key bound to a thread together with the states handed out while it was bound. */
    private static class KeyedStates<K> {
        private final K key;
        private final List<State> states;

        private KeyedStates(K key) {
            this.key = key;
            this.states = new ArrayList<>();
        }
    }
}

