/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.offsets;

import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.offsets.EncodedOffsetPair;
import io.confluent.parallelconsumer.offsets.EncodingNotSupportedException;
import io.confluent.parallelconsumer.offsets.OffsetDecodingError;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetSimpleSerialisation;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.state.PartitionState;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetMapCodecManager<K, V> {
    /** SLF4J logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(OffsetMapCodecManager.class);
    /**
     * Constant string whose content, per its own text, is irrelevant - it only needs to be a stable constant.
     * NOTE(review): presumably a named resource lock for tests that modify {@link #DefaultMaxMetadataSize} - confirm against callers.
     */
    public static final String METADATA_DATA_SIZE_RESOURCE_LOCK = "Value doesn't matter, just needs a constant";
    /**
     * Default maximum metadata size, initialised to 4096. Public and non-final, so it can be reassigned at runtime;
     * it is not read anywhere in this class.
     * NOTE(review): presumably a size limit for the commit metadata payload - confirm unit and usage against callers.
     */
    public static int DefaultMaxMetadataSize = 4096;
    /** Character set constant (UTF-8); not referenced elsewhere in this class. */
    public static final Charset CHARSET_TO_USE = StandardCharsets.UTF_8;
    /** Consumer queried for committed offsets in {@link #loadOffsetMapForPartition(Set)}. */
    Consumer<K, V> consumer;
    /**
     * When present, {@code encodeOffsetsCompressed} packs this specific encoding instead of calling
     * {@code packSmallest()}; the log message there describes it as being for testing. Empty by default.
     * Public and non-final, so it is shared mutable state across all instances.
     */
    public static Optional<OffsetEncoding> forcedCodec = Optional.empty();

    /**
     * Creates a codec manager that uses the given consumer for committed-offset lookups.
     *
     * @param consumer the consumer stored in {@link #consumer}
     */
    public OffsetMapCodecManager(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    /**
     * Loads the last committed offset and its metadata payload for the given partitions and rebuilds the
     * per-partition state from them.
     * <p>
     * The committed-offset lookup is retried whenever it is interrupted by a {@link WakeupException}.
     * Partitions that have no committed offset, or whose metadata payload cannot be decoded, receive a fresh
     * empty state, so the result contains an entry for every partition in {@code assignment}.
     *
     * @param assignment the partitions to load state for
     * @return a mutable map from partition to its reconstructed (or fresh) state
     * @throws InternalRuntimeError if the lookup has still not produced a result after more than 10 attempts;
     *                              the cause is the most recent {@link WakeupException}, if any
     */
    public Map<TopicPartition, PartitionState<K, V>> loadOffsetMapForPartition(Set<TopicPartition> assignment) {
        final int maxAttempts = 10;
        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = null;
        // Kept outside the loop so the error thrown on giving up always carries the latest wakeup as its cause.
        WakeupException lastWakeupException = null;
        int attempts = 0;
        while (lastCommittedOffsets == null) {
            try {
                lastCommittedOffsets = this.consumer.committed(assignment);
            } catch (WakeupException exception) {
                log.warn("Woken up trying to get assignment", exception);
                lastWakeupException = exception;
            }
            attempts++;
            // Give up only while the lookup is still failing: a result obtained on the final attempt
            // must be used rather than discarded.
            if (lastCommittedOffsets == null && attempts > maxAttempts) {
                throw new InternalRuntimeError("Failed to get partition assignment - continuously woken up.", lastWakeupException);
            }
        }

        Map<TopicPartition, PartitionState<K, V>> states = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : lastCommittedOffsets.entrySet()) {
            TopicPartition tp = committed.getKey();
            OffsetAndMetadata offsetAndMeta = committed.getValue();
            if (offsetAndMeta == null) {
                // Nothing committed for this partition; it gets a fresh state below.
                continue;
            }
            long nextExpectedOffset = offsetAndMeta.offset();
            String metadata = offsetAndMeta.metadata();
            try {
                states.put(tp, this.decodeIncompletes(nextExpectedOffset, tp, metadata));
            } catch (OffsetDecodingError offsetDecodingError) {
                // Deliberate best effort: an undecodable payload is dropped and the partition falls back to a fresh state.
                log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})", tp, offsetAndMeta, offsetDecodingError);
            }
        }

        // Every assigned partition that did not yield a decoded state starts from an empty one.
        for (TopicPartition tp : assignment) {
            if (!states.containsKey(tp)) {
                states.put(tp, new PartitionState<>(tp, HighestOffsetAndIncompletes.of()));
            }
        }
        return states;
    }

    /**
     * Decodes a Base64 offset payload and hands the resulting bytes to
     * {@link #decodeCompressedOffsets(long, byte[])}.
     *
     * @param committedOffsetForPartition the committed offset, passed through to the decoder
     * @param base64EncodedOffsetPayload  the Base64 text to decode
     * @return the decoded highest offset and incomplete offsets
     * @throws OffsetDecodingError if the Base64 decoding step raises an {@link IllegalArgumentException};
     *                             the original exception is kept as the cause
     */
    static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(long committedOffsetForPartition, String base64EncodedOffsetPayload) throws OffsetDecodingError {
        final byte[] rawPayload;
        try {
            rawPayload = OffsetSimpleSerialisation.decodeBase64(base64EncodedOffsetPayload);
        } catch (IllegalArgumentException invalidInput) {
            String message = StringUtils.msg("Error decoding offset metadata, input was: {}", base64EncodedOffsetPayload);
            throw new OffsetDecodingError(message, invalidInput);
        }
        return decodeCompressedOffsets(committedOffsetForPartition, rawPayload);
    }

    /**
     * Decodes the offset metadata payload of one partition and wraps the result in a {@link PartitionState}.
     *
     * @param nextExpectedOffset    the committed offset for the partition
     * @param tp                    the partition the payload belongs to
     * @param offsetMetadataPayload the Base64 metadata payload
     * @return a new state built from the partition and the decoded offsets
     * @throws OffsetDecodingError if the payload cannot be decoded
     */
    PartitionState<K, V> decodeIncompletes(long nextExpectedOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
        HighestOffsetAndIncompletes decoded = deserialiseIncompleteOffsetMapFromBase64(nextExpectedOffset, offsetMetadataPayload);
        log.debug("Loaded incomplete offsets from offset payload {}", decoded);
        return new PartitionState<>(tp, decoded);
    }

    /**
     * Produces the offset metadata payload string for a partition by delegating to
     * {@link #serialiseIncompleteOffsetMapToBase64(long, PartitionState)}.
     *
     * @param finalOffsetForPartition the offset passed through to the encoder
     * @param state                   the partition state to encode
     * @return the Base64 payload string
     * @throws EncodingNotSupportedException propagated from the encoding step
     */
    public String makeOffsetMetadataPayload(long finalOffsetForPartition, PartitionState<K, V> state) throws EncodingNotSupportedException {
        return serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, state);
    }

    /**
     * Encodes the partition state with {@link #encodeOffsetsCompressed(long, PartitionState)} and converts the
     * resulting bytes to Base64 text.
     *
     * @param finalOffsetForPartition the offset passed through to the encoder
     * @param state                   the partition state to encode
     * @return the Base64 text of the encoded bytes
     * @throws EncodingNotSupportedException propagated from the encoding step
     */
    String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, PartitionState<K, V> state) throws EncodingNotSupportedException {
        byte[] encodedBytes = encodeOffsetsCompressed(finalOffsetForPartition, state);
        return OffsetSimpleSerialisation.base64(encodedBytes);
    }

    /**
     * Runs the simultaneous encoder over the partition's incomplete offsets and returns the packed bytes.
     * <p>
     * When {@link #forcedCodec} is empty the encoder's {@code packSmallest()} result is returned. Otherwise the
     * bytes produced for the forced encoding are looked up in the encoder's encoding map and packed explicitly.
     *
     * @param finalOffsetForPartition the offset handed to the encoder as its first argument
     * @param partition               the state supplying the partition, incomplete offsets and highest succeeded offset
     * @return the packed encoding
     * @throws EncodingNotSupportedException if an encoding is forced but the encoder produced no bytes for it
     */
    byte[] encodeOffsetsCompressed(long finalOffsetForPartition, PartitionState<K, V> partition) throws EncodingNotSupportedException {
        TopicPartition topicPartition = partition.getTp();
        Set<Long> incompletes = partition.getIncompleteOffsets();
        log.debug("Encoding partition {} incomplete offsets {}", topicPartition, incompletes);

        long highestSucceeded = partition.getOffsetHighestSucceeded();
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, highestSucceeded, incompletes).invoke();

        // Normal path: nothing is forced, so let the encoder choose.
        if (!forcedCodec.isPresent()) {
            return encoder.packSmallest();
        }

        OffsetEncoding forced = forcedCodec.get();
        log.debug("Forcing use of {}, for testing", forced);
        byte[] forcedBytes = encoder.getEncodingMap().get(forced);
        if (forcedBytes == null) {
            throw new EncodingNotSupportedException(StringUtils.msg("Can't force an encoding that hasn't been run: {}", new Object[]{forced}));
        }
        EncodedOffsetPair forcedPair = new EncodedOffsetPair(forced, ByteBuffer.wrap(forcedBytes));
        return encoder.packEncoding(forcedPair);
    }

    /**
     * Turns the raw (already Base64-decoded) payload bytes into the highest seen offset and incomplete offsets.
     * <p>
     * An empty payload yields a result with no incomplete offsets and a highest seen offset of
     * {@code nextExpectedOffset - 1}. Otherwise the bytes are unwrapped via {@link EncodedOffsetPair} and decoded
     * relative to {@code nextExpectedOffset}.
     *
     * @param nextExpectedOffset the committed offset the payload is relative to
     * @param decodedBytes       the raw payload bytes, possibly empty
     * @return the decoded highest seen offset and incomplete offsets
     */
    static HighestOffsetAndIncompletes decodeCompressedOffsets(long nextExpectedOffset, byte[] decodedBytes) {
        boolean payloadPresent = decodedBytes.length != 0;
        if (payloadPresent) {
            HighestOffsetAndIncompletes decoded = EncodedOffsetPair.unwrap(decodedBytes).getDecodedIncompletes(nextExpectedOffset);
            Set<Long> stillIncomplete = decoded.getIncompleteOffsets();
            long highestSeen = decoded.getHighestSeenOffset();
            return HighestOffsetAndIncompletes.of(highestSeen, stillIncomplete);
        }
        // No payload: everything before the committed offset counts as seen.
        long highestSeenWithoutPayload = nextExpectedOffset - 1L;
        return HighestOffsetAndIncompletes.of(highestSeenWithoutPayload);
    }

    /**
     * Renders the partition's offsets as a string of {@code 'o'} (offset is in the incomplete set) and
     * {@code 'x'} (it is not) characters.
     * <p>
     * One character is appended for each relative offset produced by {@code Range.range(highestSeen - finalOffset)},
     * where the absolute offset is {@code finalOffsetForPartition + relativeOffset}.
     *
     * @param finalOffsetForPartition the base (lowest) offset of the rendering
     * @param state                   the state supplying the highest seen offset and the incomplete offsets
     * @return the rendered string
     */
    String incompletesToBitmapString(long finalOffsetForPartition, PartitionState<K, V> state) {
        long base = finalOffsetForPartition;
        long span = state.getOffsetHighestSeen() - base;
        StringBuilder bitmap = new StringBuilder();
        for (Integer relative : Range.range(span)) {
            long absolute = base + relative;
            boolean incomplete = state.getIncompleteOffsets().contains(absolute);
            bitmap.append(incomplete ? "o" : "x");
        }
        return bitmap.toString();
    }

    /**
     * Parses a string of {@code 'o'} / {@code 'x'} characters back into a set of incomplete offsets.
     * <p>
     * For every index produced by {@code Range.range(length)}, an {@code 'o'} adds {@code baseOffset + index}
     * to the result and an {@code 'x'} is skipped.
     *
     * @param baseOffset        the offset that index 0 of the string corresponds to
     * @param inputBitmapString the string to parse
     * @return a new mutable set of the incomplete offsets
     * @throws IllegalArgumentException if the string contains any character other than {@code 'o'} or {@code 'x'}
     */
    static Set<Long> bitmapStringToIncomplete(long baseOffset, String inputBitmapString) {
        Set<Long> incompletes = new HashSet<>();
        long length = inputBitmapString.length();
        Range.range(length).forEach(position -> {
            char flag = inputBitmapString.charAt(position);
            switch (flag) {
                case 'o':
                    incompletes.add(baseOffset + position);
                    break;
                case 'x':
                    log.trace("Dropping completed offset");
                    break;
                default:
                    throw new IllegalArgumentException("Invalid encoding - unexpected char: " + flag);
            }
        });
        return incompletes;
    }

    /**
     * Value holder pairing a highest seen offset (may be {@code null}) with a set of incomplete offsets.
     * <p>
     * Both fields are final, but the set is stored and returned by reference - it is not copied.
     * Equality, hash code and string form are derived from both fields.
     */
    public static final class HighestOffsetAndIncompletes {
        private final Long highestSeenOffset;
        private final Set<Long> incompleteOffsets;

        /**
         * @param highestSeenOffset the highest seen offset, or {@code null} when unknown
         * @param incompleteOffsets the incomplete offsets; kept by reference
         */
        public HighestOffsetAndIncompletes(Long highestSeenOffset, Set<Long> incompleteOffsets) {
            this.highestSeenOffset = highestSeenOffset;
            this.incompleteOffsets = incompleteOffsets;
        }

        /** Creates an instance with no highest seen offset ({@code null}) and a new empty set. */
        public static HighestOffsetAndIncompletes of() {
            return new HighestOffsetAndIncompletes(null, new HashSet<>());
        }

        /** Creates an instance with the given highest seen offset and a new empty set. */
        public static HighestOffsetAndIncompletes of(Long highestSeenOffset) {
            return new HighestOffsetAndIncompletes(highestSeenOffset, new HashSet<>());
        }

        /** Creates an instance from the given highest seen offset and the given set (not copied). */
        public static HighestOffsetAndIncompletes of(long highestSeenOffset, Set<Long> incompleteOffsets) {
            return new HighestOffsetAndIncompletes(Long.valueOf(highestSeenOffset), incompleteOffsets);
        }

        /** @return the highest seen offset, possibly {@code null} */
        public Long getHighestSeenOffset() {
            return this.highestSeenOffset;
        }

        /** @return the incomplete offsets set held by this instance */
        public Set<Long> getIncompleteOffsets() {
            return this.incompleteOffsets;
        }

        /** Two instances are equal when both fields are equal, treating {@code null} as equal only to {@code null}. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof HighestOffsetAndIncompletes)) {
                return false;
            }
            HighestOffsetAndIncompletes that = (HighestOffsetAndIncompletes) o;

            boolean sameOffset = this.highestSeenOffset == null
                    ? that.highestSeenOffset == null
                    : this.highestSeenOffset.equals(that.highestSeenOffset);
            if (!sameOffset) {
                return false;
            }
            if (this.incompleteOffsets == null) {
                return that.incompleteOffsets == null;
            }
            return this.incompleteOffsets.equals(that.incompleteOffsets);
        }

        /** Combines both field hashes with multiplier 59, using 43 in place of a {@code null} field. */
        @Override
        public int hashCode() {
            int offsetHash = this.highestSeenOffset == null ? 43 : this.highestSeenOffset.hashCode();
            int incompletesHash = this.incompleteOffsets == null ? 43 : this.incompleteOffsets.hashCode();
            return (59 + offsetHash) * 59 + incompletesHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("OffsetMapCodecManager.HighestOffsetAndIncompletes(highestSeenOffset=");
            text.append(this.highestSeenOffset);
            text.append(", incompleteOffsets=");
            text.append(this.incompleteOffsets);
            text.append(")");
            return text.toString();
        }
    }
}

