/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaTimerInternalsFactory<K>
implements TimerInternalsFactory<K> {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
    private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
    private final Coder<K> keyCoder;
    private final Scheduler<KeyedTimerData<K>> timerRegistry;
    private final int timerBufferSize;
    private final SamzaTimerState state;
    private final PCollection.IsBounded isBounded;
    private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    private SamzaTimerInternalsFactory(Coder<K> keyCoder, Scheduler<KeyedTimerData<K>> timerRegistry, int timerBufferSize, String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, Coder<BoundedWindow> windowCoder, PCollection.IsBounded isBounded) {
        this.keyCoder = keyCoder;
        this.timerRegistry = timerRegistry;
        this.timerBufferSize = timerBufferSize;
        this.eventTimeTimers = new TreeSet<KeyedTimerData<K>>();
        this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
        this.isBounded = isBounded;
    }

    static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(Coder<K> keyCoder, Scheduler<KeyedTimerData<K>> timerRegistry, String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, WindowingStrategy<?, BoundedWindow> windowingStrategy, PCollection.IsBounded isBounded, SamzaPipelineOptions pipelineOptions) {
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        return new SamzaTimerInternalsFactory<K>(keyCoder, timerRegistry, pipelineOptions.getTimerBufferSize(), timerStateId, nonKeyedStateInternalsFactory, (Coder<BoundedWindow>)windowCoder, isBounded);
    }

    public TimerInternals timerInternalsForKey(@Nullable K key) {
        byte[] keyBytes;
        if (this.keyCoder == null) {
            if (key != null) {
                throw new IllegalArgumentException(String.format("Received non-null key for unkeyed timer factory. Key: %s", key));
            }
            keyBytes = null;
        } else {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                this.keyCoder.encode(key, (OutputStream)baos);
            }
            catch (IOException e) {
                throw new RuntimeException("Could not encode key: " + key, e);
            }
            keyBytes = baos.toByteArray();
        }
        return new SamzaTimerInternals(keyBytes, key);
    }

    public void setInputWatermark(Instant watermark) {
        if (watermark.isBefore((ReadableInstant)this.inputWatermark)) {
            throw new IllegalArgumentException("New input watermark is before current watermark");
        }
        LOG.debug("Advancing input watermark from {} to {}.", (Object)this.inputWatermark, (Object)watermark);
        this.inputWatermark = watermark;
    }

    public void setOutputWatermark(Instant watermark) {
        if (watermark.isAfter((ReadableInstant)this.inputWatermark)) {
            LOG.debug("Clipping new output watermark from {} to {}.", (Object)watermark, (Object)this.inputWatermark);
            watermark = this.inputWatermark;
        }
        if (watermark.isBefore((ReadableInstant)this.outputWatermark)) {
            throw new IllegalArgumentException("New output watermark is before current watermark");
        }
        LOG.debug("Advancing output watermark from {} to {}.", (Object)this.outputWatermark, (Object)watermark);
        this.outputWatermark = watermark;
    }

    public Collection<KeyedTimerData<K>> removeReadyTimers() {
        ArrayList<KeyedTimerData<K>> readyTimers = new ArrayList<KeyedTimerData<K>>();
        while (!this.eventTimeTimers.isEmpty() && ((KeyedTimerData)this.eventTimeTimers.first()).getTimerData().getTimestamp().isBefore((ReadableInstant)this.inputWatermark)) {
            KeyedTimerData<K> keyedTimerData = this.eventTimeTimers.pollFirst();
            readyTimers.add(keyedTimerData);
            this.state.deletePersisted(keyedTimerData);
            if (!this.eventTimeTimers.isEmpty()) continue;
            this.state.loadEventTimeTimers();
        }
        return readyTimers;
    }

    public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
        this.state.deletePersisted(keyedTimerData);
    }

    public Instant getInputWatermark() {
        return this.inputWatermark;
    }

    public Instant getOutputWatermark() {
        return this.outputWatermark;
    }

    private class SamzaTimerState {
        private final SamzaSetState<KeyedTimerData<K>> eventTimerTimerState;
        private final SamzaSetState<KeyedTimerData<K>> processingTimerTimerState;

        SamzaTimerState(String timerStateId, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, Coder<BoundedWindow> windowCoder) {
            this.eventTimerTimerState = (SamzaSetState)nonKeyedStateInternalsFactory.stateInternalsForKey(null).state(StateNamespaces.global(), StateTags.set((String)(timerStateId + "-et"), new KeyedTimerData.KeyedTimerDataCoder(SamzaTimerInternalsFactory.this.keyCoder, windowCoder)));
            this.processingTimerTimerState = (SamzaSetState)nonKeyedStateInternalsFactory.stateInternalsForKey(null).state(StateNamespaces.global(), StateTags.set((String)(timerStateId + "-pt"), new KeyedTimerData.KeyedTimerDataCoder(SamzaTimerInternalsFactory.this.keyCoder, windowCoder)));
            this.restore();
        }

        void persist(KeyedTimerData<K> keyedTimerData) {
            switch (keyedTimerData.getTimerData().getDomain()) {
                case EVENT_TIME: {
                    if (SamzaTimerInternalsFactory.this.eventTimeTimers.contains(keyedTimerData)) break;
                    this.eventTimerTimerState.add(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    this.processingTimerTimerState.add(keyedTimerData);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time", SamzaRunner.class));
                }
            }
        }

        void deletePersisted(KeyedTimerData<K> keyedTimerData) {
            switch (keyedTimerData.getTimerData().getDomain()) {
                case EVENT_TIME: {
                    this.eventTimerTimerState.remove(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    this.processingTimerTimerState.remove(keyedTimerData);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time", SamzaRunner.class));
                }
            }
        }

        private void loadEventTimeTimers() {
            if (!((Boolean)this.eventTimerTimerState.isEmpty().read()).booleanValue()) {
                int i;
                Iterator iter = (Iterator)this.eventTimerTimerState.readIterator().read();
                for (i = 0; i < SamzaTimerInternalsFactory.this.timerBufferSize && iter.hasNext(); ++i) {
                    SamzaTimerInternalsFactory.this.eventTimeTimers.add((KeyedTimerData)iter.next());
                }
                LOG.info("Loaded {} event time timers in memory", (Object)i);
                SamzaStoreStateInternals.KeyValueIteratorState iteratorState = (SamzaStoreStateInternals.KeyValueIteratorState)((Object)this.eventTimerTimerState);
                iteratorState.closeIterators();
            }
        }

        private void loadProcessingTimeTimers() {
            if (!((Boolean)this.processingTimerTimerState.isEmpty().read()).booleanValue()) {
                Iterator iter = (Iterator)this.processingTimerTimerState.readIterator().read();
                int count = 0;
                while (iter.hasNext()) {
                    KeyedTimerData keyedTimerData = (KeyedTimerData)iter.next();
                    SamzaTimerInternalsFactory.this.timerRegistry.schedule((Object)keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
                    ++count;
                }
                LOG.info("Loaded {} processing time timers in memory", (Object)count);
            }
        }

        private void restore() {
            this.loadEventTimeTimers();
            this.loadProcessingTimeTimers();
        }
    }

    private class SamzaTimerInternals
    implements TimerInternals {
        private final byte[] keyBytes;
        private final K key;

        public SamzaTimerInternals(byte[] keyBytes, K key) {
            this.keyBytes = keyBytes;
            this.key = key;
        }

        public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
            this.setTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)target, (TimeDomain)timeDomain));
        }

        public void setTimer(TimerInternals.TimerData timerData) {
            if (SamzaTimerInternalsFactory.this.isBounded == PCollection.IsBounded.UNBOUNDED && timerData.getTimestamp().getMillis() >= GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
                return;
            }
            KeyedTimerData keyedTimerData = new KeyedTimerData(this.keyBytes, this.key, timerData);
            SamzaTimerInternalsFactory.this.state.persist(keyedTimerData);
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    SamzaTimerInternalsFactory.this.eventTimeTimers.add(keyedTimerData);
                    while (SamzaTimerInternalsFactory.this.eventTimeTimers.size() > SamzaTimerInternalsFactory.this.timerBufferSize) {
                        SamzaTimerInternalsFactory.this.eventTimeTimers.pollLast();
                    }
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports even time or processing time", SamzaRunner.class));
                }
            }
        }

        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
            this.deleteTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)Instant.now(), (TimeDomain)timeDomain));
        }

        public void deleteTimer(StateNamespace namespace, String timerId) {
            this.deleteTimer(TimerInternals.TimerData.of((String)timerId, (StateNamespace)namespace, (Instant)Instant.now(), (TimeDomain)TimeDomain.EVENT_TIME));
        }

        public void deleteTimer(TimerInternals.TimerData timerData) {
            KeyedTimerData keyedTimerData = new KeyedTimerData(this.keyBytes, this.key, timerData);
            SamzaTimerInternalsFactory.this.state.deletePersisted(keyedTimerData);
            switch (timerData.getDomain()) {
                case EVENT_TIME: {
                    SamzaTimerInternalsFactory.this.eventTimeTimers.remove(keyedTimerData);
                    break;
                }
                case PROCESSING_TIME: {
                    SamzaTimerInternalsFactory.this.timerRegistry.delete(keyedTimerData);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(String.format("%s currently only supports event time", SamzaRunner.class));
                }
            }
        }

        public Instant currentProcessingTime() {
            return new Instant();
        }

        public Instant currentSynchronizedProcessingTime() {
            throw new UnsupportedOperationException(String.format("%s does not currently support synchronized processing time", SamzaRunner.class));
        }

        public Instant currentInputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.inputWatermark;
        }

        public Instant currentOutputWatermarkTime() {
            return SamzaTimerInternalsFactory.this.outputWatermark;
        }
    }
}

