/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.service.consumer;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.kafka.service.api.common.OffsetSummary;
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingSummary;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.service.consumer.Subscription;
import org.apache.nifi.logging.ComponentLog;

public class Kafka3ConsumerService
implements KafkaConsumerService,
Closeable,
ConsumerRebalanceListener {
    private final ComponentLog componentLog;
    private final Consumer<byte[], byte[]> consumer;
    private final Subscription subscription;
    private volatile boolean closed = false;

    public Kafka3ConsumerService(ComponentLog componentLog, Consumer<byte[], byte[]> consumer, Subscription subscription) {
        this.componentLog = Objects.requireNonNull(componentLog, "Component Log required");
        this.consumer = consumer;
        this.subscription = subscription;
        Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
        if (topicPatternFound.isPresent()) {
            Pattern topicPattern = topicPatternFound.get();
            consumer.subscribe(topicPattern, (ConsumerRebalanceListener)this);
        } else {
            Collection<String> topics = subscription.getTopics();
            consumer.subscribe(topics, (ConsumerRebalanceListener)this);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.componentLog.info("Kafka assigned the following Partitions to this consumer: {}", new Object[]{partitions});
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.componentLog.info("Kafka revoked the following Partitions from this consumer: {}", new Object[]{partitions});
        this.rollback(new HashSet<TopicPartition>(partitions));
    }

    public void commit(PollingSummary pollingSummary) {
        Map<TopicPartition, OffsetAndMetadata> offsets = this.getOffsets(pollingSummary);
        long started = System.currentTimeMillis();
        this.consumer.commitSync(offsets);
        long elapsed = started - System.currentTimeMillis();
        this.componentLog.debug("Committed Records in [{} ms] for {}", new Object[]{elapsed, pollingSummary});
    }

    public void rollback() {
        this.rollback(this.consumer.assignment());
    }

    private void rollback(Set<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return;
        }
        try {
            Map metadataMap = this.consumer.committed(partitions);
            for (Map.Entry entry : metadataMap.entrySet()) {
                TopicPartition topicPartition = (TopicPartition)entry.getKey();
                OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata)entry.getValue();
                if (offsetAndMetadata == null) {
                    this.consumer.seekToBeginning(Collections.singleton(topicPartition));
                    this.componentLog.debug("Rolling back offsets so that {}-{} it is at the beginning", new Object[]{topicPartition.topic(), topicPartition.partition()});
                    continue;
                }
                this.consumer.seek(topicPartition, offsetAndMetadata.offset());
                this.componentLog.debug("Rolling back offsets so that {}-{} has offset of {}", new Object[]{topicPartition.topic(), topicPartition.partition(), offsetAndMetadata.offset()});
            }
        }
        catch (Exception rollbackException) {
            this.componentLog.warn("Attempted to rollback Kafka message offset but was unable to do so", (Throwable)rollbackException);
        }
    }

    public boolean isClosed() {
        return this.closed;
    }

    public Iterable<ByteRecord> poll(Duration maxWaitDuration) {
        ConsumerRecords consumerRecords = this.consumer.poll(maxWaitDuration);
        if (consumerRecords.isEmpty()) {
            return List.of();
        }
        return new RecordIterable((Iterable<ConsumerRecord<byte[], byte[]>>)consumerRecords);
    }

    public List<PartitionState> getPartitionStates() {
        List<Object> partitionStates;
        Iterator<String> topics = this.subscription.getTopics().iterator();
        if (topics.hasNext()) {
            String topic = topics.next();
            partitionStates = this.consumer.partitionsFor(topic).stream().map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition())).collect(Collectors.toList());
        } else {
            partitionStates = Collections.emptyList();
        }
        return partitionStates;
    }

    @Override
    public void close() {
        this.closed = true;
        this.consumer.close();
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsets(PollingSummary pollingSummary) {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        Map summaryOffsets = pollingSummary.getOffsets();
        for (Map.Entry offsetEntry : summaryOffsets.entrySet()) {
            TopicPartitionSummary topicPartitionSummary = (TopicPartitionSummary)offsetEntry.getKey();
            TopicPartition topicPartition = new TopicPartition(topicPartitionSummary.getTopic(), topicPartitionSummary.getPartition());
            OffsetSummary offsetSummary = (OffsetSummary)offsetEntry.getValue();
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetSummary.getOffset() + 1L);
            offsets.put(topicPartition, offsetAndMetadata);
        }
        return offsets;
    }

    private static class RecordIterable
    implements Iterable<ByteRecord> {
        private final Iterator<ByteRecord> records;

        private RecordIterable(Iterable<ConsumerRecord<byte[], byte[]>> consumerRecords) {
            this.records = new RecordIterator(consumerRecords);
        }

        @Override
        public Iterator<ByteRecord> iterator() {
            return this.records;
        }
    }

    private static class RecordIterator
    implements Iterator<ByteRecord> {
        private final Iterator<ConsumerRecord<byte[], byte[]>> consumerRecords;

        private RecordIterator(Iterable<ConsumerRecord<byte[], byte[]>> records) {
            this.consumerRecords = records.iterator();
        }

        @Override
        public boolean hasNext() {
            return this.consumerRecords.hasNext();
        }

        @Override
        public ByteRecord next() {
            ConsumerRecord<byte[], byte[]> consumerRecord = this.consumerRecords.next();
            ArrayList recordHeaders = new ArrayList();
            consumerRecord.headers().forEach(header -> {
                RecordHeader recordHeader = new RecordHeader(header.key(), header.value());
                recordHeaders.add(recordHeader);
            });
            byte[] value = (byte[])consumerRecord.value();
            if (value == null) {
                value = new byte[]{};
            }
            return new ByteRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), recordHeaders, (byte[])consumerRecord.key(), value, 1L);
        }
    }
}

