/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.metrics.Sensors;

public class InMemoryTimeOrderedKeyValueBuffer
implements TimeOrderedKeyValueBuffer {
    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
    private final Map<Bytes, BufferKey> index = new HashMap<Bytes, BufferKey>();
    private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap();
    private final Set<Bytes> dirtyKeys = new HashSet<Bytes>();
    private final String storeName;
    private final boolean loggingEnabled;
    private long memBufferSize = 0L;
    private long minTimestamp = Long.MAX_VALUE;
    private RecordCollector collector;
    private String changelogTopic;
    private Sensor bufferSizeSensor;
    private Sensor bufferCountSensor;
    private volatile boolean open;

    private InMemoryTimeOrderedKeyValueBuffer(String storeName, boolean loggingEnabled) {
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
    }

    @Override
    public String name() {
        return this.storeName;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        InternalProcessorContext internalProcessorContext = (InternalProcessorContext)context;
        this.bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
        this.bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
        context.register(root, this::restoreBatch);
        if (this.loggingEnabled) {
            this.collector = ((RecordCollector.Supplier)((Object)context)).recordCollector();
            this.changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.storeName);
        }
        this.updateBufferMetrics();
        this.open = true;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    @Override
    public void close() {
        this.open = false;
        this.index.clear();
        this.sortedMap.clear();
        this.dirtyKeys.clear();
        this.memBufferSize = 0L;
        this.minTimestamp = Long.MAX_VALUE;
        this.updateBufferMetrics();
    }

    @Override
    public void flush() {
        if (this.loggingEnabled) {
            for (Bytes key : this.dirtyKeys) {
                BufferKey bufferKey = this.index.get(key);
                if (bufferKey == null) {
                    this.collector.send(this.changelogTopic, key, null, (Headers)null, (Integer)null, (Long)null, KEY_SERIALIZER, VALUE_SERIALIZER);
                    continue;
                }
                ContextualRecord value = this.sortedMap.get(bufferKey);
                byte[] innerValue = value.value();
                byte[] timeAndValue = ByteBuffer.wrap(new byte[8 + innerValue.length]).putLong(bufferKey.time).put(innerValue).array();
                ProcessorRecordContext recordContext = value.recordContext();
                this.collector.send(this.changelogTopic, key, timeAndValue, recordContext.headers(), recordContext.partition(), recordContext.timestamp(), KEY_SERIALIZER, VALUE_SERIALIZER);
            }
            this.dirtyKeys.clear();
        }
    }

    private void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> batch) {
        for (ConsumerRecord<byte[], byte[]> record : batch) {
            Bytes key = Bytes.wrap((byte[])((byte[])record.key()));
            if (record.value() == null) {
                BufferKey bufferKey = this.index.remove(key);
                if (bufferKey == null) continue;
                this.sortedMap.remove(bufferKey);
                continue;
            }
            ByteBuffer timeAndValue = ByteBuffer.wrap((byte[])record.value());
            long time = timeAndValue.getLong();
            byte[] value = new byte[((byte[])record.value()).length - 8];
            timeAndValue.get(value);
            this.cleanPut(time, key, new ContextualRecord(value, new ProcessorRecordContext(record.timestamp(), record.offset(), record.partition(), record.topic(), record.headers())));
        }
        this.updateBufferMetrics();
    }

    @Override
    public void evictWhile(Supplier<Boolean> predicate, Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
        Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = this.sortedMap.entrySet().iterator();
        int evictions = 0;
        if (predicate.get().booleanValue()) {
            Map.Entry<BufferKey, ContextualRecord> next = null;
            if (delegate.hasNext()) {
                next = delegate.next();
            }
            while (next != null && predicate.get().booleanValue()) {
                callback.accept(new KeyValue<Bytes, ContextualRecord>(next.getKey().key, next.getValue()));
                delegate.remove();
                this.index.remove(next.getKey().key);
                this.dirtyKeys.add(next.getKey().key);
                this.memBufferSize -= this.computeRecordSize(next.getKey().key, next.getValue());
                if (delegate.hasNext()) {
                    next = delegate.next();
                    this.minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time;
                } else {
                    next = null;
                    this.minTimestamp = Long.MAX_VALUE;
                }
                ++evictions;
            }
        }
        if (evictions > 0) {
            this.updateBufferMetrics();
        }
    }

    @Override
    public void put(long time, Bytes key, ContextualRecord value) {
        this.cleanPut(time, key, value);
        this.dirtyKeys.add(key);
        this.updateBufferMetrics();
    }

    private void cleanPut(long time, Bytes key, ContextualRecord value) {
        BufferKey previousKey = this.index.get(key);
        if (previousKey == null) {
            BufferKey nextKey = new BufferKey(time, key);
            this.index.put(key, nextKey);
            this.sortedMap.put(nextKey, value);
            this.minTimestamp = Math.min(this.minTimestamp, time);
            this.memBufferSize += this.computeRecordSize(key, value);
        } else {
            ContextualRecord removedValue = this.sortedMap.put(previousKey, value);
            this.memBufferSize = this.memBufferSize + this.computeRecordSize(key, value) - (removedValue == null ? 0L : this.computeRecordSize(key, removedValue));
        }
    }

    @Override
    public int numRecords() {
        return this.index.size();
    }

    @Override
    public long bufferSize() {
        return this.memBufferSize;
    }

    @Override
    public long minTimestamp() {
        return this.minTimestamp;
    }

    private long computeRecordSize(Bytes key, ContextualRecord value) {
        long size = 0L;
        size += 8L;
        size += (long)key.get().length;
        if (value != null) {
            size += value.sizeBytes();
        }
        return size;
    }

    private void updateBufferMetrics() {
        this.bufferSizeSensor.record((double)this.memBufferSize);
        this.bufferCountSensor.record((double)this.index.size());
    }

    private static class BufferKey
    implements Comparable<BufferKey> {
        private final long time;
        private final Bytes key;

        private BufferKey(long time, Bytes key) {
            this.time = time;
            this.key = key;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            BufferKey bufferKey = (BufferKey)o;
            return this.time == bufferKey.time && Objects.equals(this.key, bufferKey.key);
        }

        public int hashCode() {
            return Objects.hash(this.time, this.key);
        }

        @Override
        public int compareTo(BufferKey o) {
            int timeComparison = Long.compare(this.time, o.time);
            return timeComparison == 0 ? this.key.compareTo(o.key) : timeComparison;
        }
    }

    public static class Builder
    implements StoreBuilder<StateStore> {
        private final String storeName;
        private boolean loggingEnabled = true;

        public Builder(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public StoreBuilder<StateStore> withCachingEnabled() {
            return this;
        }

        @Override
        public StoreBuilder<StateStore> withCachingDisabled() {
            return this;
        }

        @Override
        public StoreBuilder<StateStore> withLoggingEnabled(Map<String, String> config) {
            throw new UnsupportedOperationException();
        }

        @Override
        public StoreBuilder<StateStore> withLoggingDisabled() {
            this.loggingEnabled = false;
            return this;
        }

        @Override
        public StateStore build() {
            return new InMemoryTimeOrderedKeyValueBuffer(this.storeName, this.loggingEnabled);
        }

        @Override
        public Map<String, String> logConfig() {
            return Collections.emptyMap();
        }

        @Override
        public boolean loggingEnabled() {
            return this.loggingEnabled;
        }

        @Override
        public String name() {
            return this.storeName;
        }
    }
}

