/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaTimerInternalsFactory<K>
implements TimerInternalsFactory<K> {
    // Logger shared by the factory and its inner classes.
    private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
    // In-memory event-time timers; created as a TreeSet so the earliest entry can be read/polled first.
    private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
    // Coder used to serialize timer keys; null is treated as "unkeyed" by timerInternalsForKey.
    private final Coder<K> keyCoder;
    // Samza scheduler that processing-time timers are registered with and deleted from.
    private final Scheduler<KeyedTimerData<K>> timerRegistry;
    // Persistent timerKey -> timestamp maps, one per supported time domain.
    private final SamzaTimerState state;
    // Boundedness of the input; consulted by setTimer to skip max-timestamp timers on unbounded input.
    private final PCollection.IsBounded isBounded;
    // Both watermarks start at the minimum timestamp and are only moved through the setters below.
    private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    /**
     * Builds the factory and restores any persisted timers.
     *
     * @param keyCoder coder for timer keys, or {@code null} for an unkeyed factory
     * @param timerRegistry scheduler used for processing-time timers
     * @param timerStateId prefix for the ids of the persistent timer maps
     * @param nonKeyedStateInternalsFactory source of the state internals holding the timer maps
     * @param windowCoder coder used to decode the window part of persisted state namespaces
     * @param isBounded boundedness of the input this factory serves
     */
    private SamzaTimerInternalsFactory(Coder<K> keyCoder, Scheduler<KeyedTimerData<K>> timerRegistry, String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, Coder<BoundedWindow> windowCoder, PCollection.IsBounded isBounded) {
        this.isBounded = isBounded;
        this.keyCoder = keyCoder;
        this.timerRegistry = timerRegistry;
        this.eventTimeTimers = new TreeSet<>();
        // Keep this assignment last: SamzaTimerState's constructor restores timers and, while
        // doing so, reads keyCoder, eventTimeTimers and timerRegistry from this instance.
        this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
    }

    /**
     * Creates a factory whose window coder is taken from the given windowing strategy.
     *
     * <p>{@code pipelineOptions} is accepted but not used by this method.
     *
     * @return a new factory; persisted timers are restored during construction
     */
    static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(Coder<K> keyCoder, Scheduler<KeyedTimerData<K>> timerRegistry, String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, WindowingStrategy<?, BoundedWindow> windowingStrategy, PCollection.IsBounded isBounded, SamzaPipelineOptions pipelineOptions) {
        final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
        return new SamzaTimerInternalsFactory<K>(
            keyCoder,
            timerRegistry,
            timerStateId,
            nonKeyedStateInternalsFactory,
            windowCoder,
            isBounded);
    }

    /**
     * Returns timer internals bound to {@code key}.
     *
     * <p>With a key coder, the key is encoded to bytes up front. Without one, the factory is
     * unkeyed and only a {@code null} key is accepted.
     *
     * @param key the key to bind, possibly {@code null}
     * @throws IllegalArgumentException if the factory has no key coder but {@code key} is non-null
     * @throws RuntimeException if the key coder fails with an {@link IOException}
     */
    @Override
    public TimerInternals timerInternalsForKey(@Nullable K key) {
        if (this.keyCoder == null) {
            // Unkeyed factory: there is nothing to encode, so reject any real key.
            if (key != null) {
                throw new IllegalArgumentException(String.format("Received non-null key for unkeyed timer factory. Key: %s", key));
            }
            return new SamzaTimerInternals(null, key);
        }

        final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        try {
            this.keyCoder.encode(key, encoded);
        } catch (IOException e) {
            throw new RuntimeException("Could not encode key: " + key, e);
        }
        return new SamzaTimerInternals(encoded.toByteArray(), key);
    }

    /**
     * Advances the input watermark.
     *
     * @param watermark the new input watermark; must not be before the current one
     * @throws IllegalArgumentException if {@code watermark} is before the current input watermark
     */
    public void setInputWatermark(Instant watermark) {
        final Instant previous = this.inputWatermark;
        if (watermark.isBefore(previous)) {
            throw new IllegalArgumentException("New input watermark is before current watermark");
        }
        LOG.debug("Advancing input watermark from {} to {}.", previous, watermark);
        this.inputWatermark = watermark;
    }

    /**
     * Advances the output watermark, clipping it so it never exceeds the input watermark.
     *
     * @param watermark the requested output watermark
     * @throws IllegalArgumentException if the (possibly clipped) value is before the current
     *     output watermark
     */
    public void setOutputWatermark(Instant watermark) {
        Instant effective = watermark;
        if (effective.isAfter(this.inputWatermark)) {
            LOG.debug("Clipping new output watermark from {} to {}.", effective, this.inputWatermark);
            effective = this.inputWatermark;
        }

        final Instant previous = this.outputWatermark;
        if (effective.isBefore(previous)) {
            throw new IllegalArgumentException("New output watermark is before current watermark");
        }
        LOG.debug("Advancing output watermark from {} to {}.", previous, effective);
        this.outputWatermark = effective;
    }

    /**
     * Removes and returns every in-memory event-time timer whose timestamp is strictly before the
     * current input watermark, deleting each one from persistent state as well.
     *
     * @return the removed timers, in the order they were polled from the sorted set
     */
    public Collection<KeyedTimerData<K>> removeReadyTimers() {
        final List<KeyedTimerData<K>> ready = new ArrayList<>();
        while (!this.eventTimeTimers.isEmpty()) {
            final Instant earliest = this.eventTimeTimers.first().getTimerData().getTimestamp();
            if (!earliest.isBefore(this.inputWatermark)) {
                // The set is sorted, so the loop stops at the first timer that is not ready.
                break;
            }
            final KeyedTimerData<K> timer = this.eventTimeTimers.pollFirst();
            ready.add(timer);
            this.state.deletePersisted(timer);
        }
        return ready;
    }

    /**
     * Deletes the persisted entry for the given timer. Only persistent state is touched; the
     * scheduler is not.
     *
     * @param keyedTimerData the timer whose persisted entry should be removed
     */
    public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
        final SamzaTimerState timerState = this.state;
        timerState.deletePersisted(keyedTimerData);
    }

    /** Returns the current input watermark as last set by {@code setInputWatermark}. */
    public Instant getInputWatermark() {
        final Instant current = inputWatermark;
        return current;
    }

    /** Returns the current output watermark as last set by {@code setOutputWatermark}. */
    public Instant getOutputWatermark() {
        final Instant current = outputWatermark;
        return current;
    }

    public static class TimerKeyCoder<K>
    extends StructuredCoder<TimerKey<K>> {
        private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
        private final Coder<K> keyCoder;
        private final Coder<? extends BoundedWindow> windowCoder;

        TimerKeyCoder(Coder<K> keyCoder, Coder<? extends BoundedWindow> windowCoder) {
            this.keyCoder = keyCoder;
            this.windowCoder = windowCoder;
        }

        public void encode(TimerKey<K> value, OutputStream outStream) throws CoderException, IOException {
            STRING_CODER.encode(((TimerKey)value).timerId, outStream);
            STRING_CODER.encode(((TimerKey)value).stateNamespace.stringKey(), outStream);
            if (this.keyCoder != null) {
                this.keyCoder.encode(((TimerKey)value).key, outStream);
            }
        }

        public TimerKey<K> decode(InputStream inStream) throws CoderException, IOException {
            String timerId = STRING_CODER.decode(inStream);
            StateNamespace namespace = StateNamespaces.fromString((String)STRING_CODER.decode(inStream), this.windowCoder);
            Object key = null;
            if (this.keyCoder != null) {
                key = this.keyCoder.decode(inStream);
            }
            return new TimerKey(key, namespace, timerId);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.keyCoder, this.windowCoder);
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }
    }

    /**
     * Identity of a timer in persistent state: user key, state namespace and timer id. The timer's
     * timestamp and time domain are deliberately not part of this key.
     */
    private static class TimerKey<K> {
        private final K key;
        private final StateNamespace stateNamespace;
        private final String timerId;

        private TimerKey(K key, StateNamespace stateNamespace, String timerId) {
            this.key = key;
            this.stateNamespace = stateNamespace;
            this.timerId = timerId;
        }

        /** Extracts the key, namespace and timer id from {@code keyedTimerData}. */
        static <K> TimerKey<K> of(KeyedTimerData<K> keyedTimerData) {
            final TimerInternals.TimerData data = keyedTimerData.getTimerData();
            return new TimerKey<K>(keyedTimerData.getKey(), data.getNamespace(), data.getTimerId());
        }

        /**
         * Rebuilds a {@link KeyedTimerData} from a persisted entry.
         *
         * @param timerKey the persisted timer identity
         * @param timestamp the persisted firing time in epoch milliseconds
         * @param domain the time domain of the map the entry was read from
         * @param keyCoder coder for the user key; key bytes stay {@code null} when this or the
         *     key itself is {@code null}
         * @throws RuntimeException if encoding the key fails with an {@link IOException}
         */
        static <K> KeyedTimerData<K> toKeyedTimerData(TimerKey<K> timerKey, long timestamp, TimeDomain domain, Coder<K> keyCoder) {
            final K userKey = timerKey.key;
            byte[] encodedKey = null;
            if (keyCoder != null && userKey != null) {
                final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                try {
                    keyCoder.encode(userKey, buffer);
                } catch (IOException e) {
                    throw new RuntimeException("Could not encode key: " + userKey, e);
                }
                encodedKey = buffer.toByteArray();
            }
            final TimerInternals.TimerData data =
                TimerInternals.TimerData.of(timerKey.timerId, timerKey.stateNamespace, new Instant(timestamp), domain);
            return new KeyedTimerData<K>(encodedKey, userKey, data);
        }

        public K getKey() {
            return key;
        }

        public StateNamespace getStateNamespace() {
            return stateNamespace;
        }

        public String getTimerId() {
            return timerId;
        }

        /** Two keys are equal when their user key, namespace and timer id are all equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            final TimerKey<?> other = (TimerKey<?>) o;
            final boolean sameKey = this.key == null ? other.key == null : this.key.equals(other.key);
            return sameKey
                && this.stateNamespace.equals(other.stateNamespace)
                && this.timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            final int keyHash = this.key == null ? 0 : this.key.hashCode();
            final int withNamespace = 31 * keyHash + this.stateNamespace.hashCode();
            return 31 * withNamespace + this.timerId.hashCode();
        }

        @Override
        public String toString() {
            return "TimerKey{key=" + key + ", stateNamespace=" + stateNamespace + ", timerId='" + timerId + "'}";
        }
    }

    /**
     * Persistent view of all registered timers: one {@code TimerKey -> timestamp (millis)} map for
     * event time and one for processing time. Constructing it restores the persisted timers into
     * the enclosing factory (in-memory set and scheduler).
     */
    private class SamzaTimerState {
        private final SamzaMapState<TimerKey<K>, Long> eventTimerTimerState;
        private final SamzaMapState<TimerKey<K>, Long> processingTimerTimerState;

        /**
         * Binds the two maps under the ids {@code timerStateId + "-et"} and
         * {@code timerStateId + "-pt"} in the global namespace, then restores their contents.
         */
        SamzaTimerState(String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, Coder<BoundedWindow> windowCoder) {
            this.eventTimerTimerState = this.bindTimerMap(nonKeyedStateInternalsFactory, timerStateId + "-et", windowCoder);
            this.processingTimerTimerState = this.bindTimerMap(nonKeyedStateInternalsFactory, timerStateId + "-pt", windowCoder);
            this.restore();
        }

        /** Looks up the map state with the given id for the null key in the global namespace. */
        private SamzaMapState<TimerKey<K>, Long> bindTimerMap(SamzaStoreStateInternals.Factory<?> stateInternalsFactory, String stateId, Coder<BoundedWindow> windowCoder) {
            return (SamzaMapState<TimerKey<K>, Long>)
                stateInternalsFactory
                    .stateInternalsForKey(null)
                    .state(
                        StateNamespaces.global(),
                        StateTags.map(
                            stateId,
                            new TimerKeyCoder<K>(SamzaTimerInternalsFactory.this.keyCoder, windowCoder),
                            VarLongCoder.of()));
        }

        /**
         * Selects the map that backs the given time domain.
         *
         * <p>NOTE(review): the message says only event time is supported although processing
         * time is handled here too; it is kept verbatim.
         *
         * @throws UnsupportedOperationException for any domain other than event or processing time
         */
        private SamzaMapState<TimerKey<K>, Long> timerMapFor(TimeDomain domain) {
            switch (domain) {
                case EVENT_TIME:
                    return this.eventTimerTimerState;
                case PROCESSING_TIME:
                    return this.processingTimerTimerState;
                default:
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time", SamzaRunner.class));
            }
        }

        /** Returns the persisted timestamp (millis) for the timer's identity, as read from its map. */
        Long get(KeyedTimerData<K> keyedTimerData) {
            final TimerKey<K> identity = TimerKey.of(keyedTimerData);
            final TimeDomain domain = keyedTimerData.getTimerData().getDomain();
            return this.timerMapFor(domain).get(identity).read();
        }

        /** Stores the timer's timestamp (millis) under its identity in the map for its domain. */
        void persist(KeyedTimerData<K> keyedTimerData) {
            final TimerKey<K> identity = TimerKey.of(keyedTimerData);
            final TimerInternals.TimerData data = keyedTimerData.getTimerData();
            this.timerMapFor(data.getDomain()).put(identity, data.getTimestamp().getMillis());
        }

        /** Removes the entry for the timer's identity from the map for its domain. */
        void deletePersisted(KeyedTimerData<K> keyedTimerData) {
            final TimerKey<K> identity = TimerKey.of(keyedTimerData);
            final TimeDomain domain = keyedTimerData.getTimerData().getDomain();
            this.timerMapFor(domain).remove(identity);
        }

        /** Copies every persisted event-time timer into the factory's in-memory sorted set. */
        private void loadEventTimeTimers() {
            for (Iterator<Map.Entry<TimerKey<K>, Long>> it = this.eventTimerTimerState.readIterator().read(); it.hasNext(); ) {
                final Map.Entry<TimerKey<K>, Long> persisted = it.next();
                final KeyedTimerData<K> restored =
                    TimerKey.toKeyedTimerData(persisted.getKey(), persisted.getValue(), TimeDomain.EVENT_TIME, SamzaTimerInternalsFactory.this.keyCoder);
                SamzaTimerInternalsFactory.this.eventTimeTimers.add(restored);
            }
            LOG.info("Loaded {} event time timers in memory", SamzaTimerInternalsFactory.this.eventTimeTimers.size());
        }

        /** Re-schedules every persisted processing-time timer with the factory's scheduler. */
        private void loadProcessingTimeTimers() {
            int restoredCount = 0;
            for (Iterator<Map.Entry<TimerKey<K>, Long>> it = this.processingTimerTimerState.readIterator().read(); it.hasNext(); ) {
                final Map.Entry<TimerKey<K>, Long> persisted = it.next();
                final KeyedTimerData<K> restored =
                    TimerKey.toKeyedTimerData(persisted.getKey(), persisted.getValue(), TimeDomain.PROCESSING_TIME, SamzaTimerInternalsFactory.this.keyCoder);
                SamzaTimerInternalsFactory.this.timerRegistry.schedule(restored, restored.getTimerData().getTimestamp().getMillis());
                restoredCount++;
            }
            LOG.info("Loaded {} processing time timers in memory", restoredCount);
        }

        /** Restores event-time timers first, then processing-time timers. */
        private void restore() {
            this.loadEventTimeTimers();
            this.loadProcessingTimeTimers();
        }
    }

    private class SamzaTimerInternals
    implements TimerInternals {
        private final byte[] keyBytes;
        private final K key;

        public SamzaTimerInternals(byte[] keyBytes, K key) {
            this.keyBytes = keyBytes;
            this.key = key;
        }

        public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
            this.setTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)target, (TimeDomain)timeDomain));
        }

        public void setTimer(TimerInternals.TimerData timerData) {
            if (SamzaTimerInternalsFactory.this.isBounded == PCollection.IsBounded.UNBOUNDED && timerData.getTimestamp().getMillis() >= GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
                return;
            }
            KeyedTimerData keyedTimerData = new KeyedTimerData(this.keyBytes, this.key, timerData);
            if (SamzaTimerInternalsFactory.this.eventTimeTimers.contains(keyedTimerData)) {
                return;
            }
            Long lastTimestamp = SamzaTimerInternalsFactory.this.state.get(keyedTimerData);
            Long newTimestamp = timerData.getTimestamp().getMillis();
            if (!newTimestamp.equals(lastTimestamp)) {
                if (lastTimestamp != null) {
                    TimerInternals.TimerData lastTimerData = TimerInternals.TimerData.of((String)timerData.getTimerId(), (StateNamespace)timerData.getNamespace(), (Instant)new Instant((Object)lastTimestamp), (TimeDomain)timerData.getDomain());
                    this.deleteTimer(lastTimerData, false);
                }
                SamzaTimerInternalsFactory.this.state.persist(keyedTimerData);
                switch (timerData.getDomain()) {
                    case EVENT_TIME: {
                        SamzaTimerInternalsFactory.this.eventTimeTimers.add(keyedTimerData);
                        break;
                    }
                    case PROCESSING_TIME: {
                        SamzaTimerInternalsFactory.this.timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException(String.format("%s currently only supports even time or processing time", SamzaRunner.class));
                    }
                }
            }
        }

        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
            this.deleteTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)Instant.now(), (TimeDomain)timeDomain));
        }

        public void deleteTimer(StateNamespace namespace, String timerId) {
            this.deleteTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)Instant.now(), (TimeDomain)TimeDomain.EVENT_TIME));
        }

        public void deleteTimer(TimerInternals.TimerData timerData) {
            this.deleteTimer(timerData, true);
        }

        private void deleteTimer(TimerInternals.TimerData timerData, boolean updateState) {
            KeyedTimerData keyedTimerData = new KeyedTimerData(this.keyBytes, this.key, timerData);
            if (updateState) {
                SamzaTimerInternalsFactory.this.state.deletePersisted(keyedTimerData);
            }
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    SamzaTimerInternalsFactory.this.eventTimeTimers.remove(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.delete(keyedTimerData);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time", SamzaRunner.class));
                }
            }
        }

        public Instant currentProcessingTime() {
            return new Instant();
        }

        public Instant currentSynchronizedProcessingTime() {
            throw new UnsupportedOperationException(String.format("%s does not currently support synchronized processing time", SamzaRunner.class));
        }

        public Instant currentInputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.inputWatermark;
        }

        public Instant currentOutputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.outputWatermark;
        }
    }
}

