/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class KafkaMessageSource<K, V>
extends AbstractMessageSource<Object>
implements DisposableBean,
Lifecycle {
    /** Poll timeout, in milliseconds, in effect until {@link #setPollTimeout(long)} is called. */
    private static final long DEFAULT_POLL_TIMEOUT = 50L;

    /** Lower bound, in milliseconds, of the timeout used for polls made before partitions are assigned. */
    private static final long MIN_ASSIGN_TIMEOUT = 2000L;

    /** Computes the pre-assignment poll timeout: 20 times the poll timeout, but never below the minimum. */
    private final Supplier<Duration> minTimeoutProvider =
            () -> Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20L, MIN_ASSIGN_TIMEOUT));

    private final Log logger = LogFactory.getLog(getClass());

    private final ConsumerFactory<K, V> consumerFactory;

    private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;

    private final String[] topics;

    /** Monitor held while the consumer is created, polled and closed; also handed to ack callbacks. */
    private final Object consumerMonitor = new Object();

    /** Records handed out by {@code doReceive()}, grouped by partition; entries are removed by the ack callbacks. */
    private final Map<TopicPartition, Set<KafkaAckInfo<K, V>>> inflightRecords = new HashMap<>();

    private String groupId;

    private String clientId = "message.source";

    private Duration pollTimeout = Duration.ofMillis(DEFAULT_POLL_TIMEOUT);

    private RecordMessageConverter messageConverter = new MessagingMessageConverter();

    private Type payloadType;

    private ConsumerRebalanceListener rebalanceListener;

    private boolean rawMessageHeader;

    /** Created lazily on the first receive and reset to {@code null} when the consumer is stopped. */
    private volatile Consumer<K, V> consumer;

    private boolean running;

    /** Set once the rebalance listener reports an assignment; selects which poll timeout is used. */
    private boolean assigned;

    private Duration assignTimeout = this.minTimeoutProvider.get();

    /**
     * Create a source that acknowledges records through a default
     * {@link KafkaAckCallbackFactory}.
     * @param consumerFactory the factory used to create the consumer; must not be null.
     * @param topics the topics to subscribe to.
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, String... topics) {
        this(consumerFactory, new KafkaAckCallbackFactory<>(), topics);
    }

    /**
     * Create a source with an explicit acknowledgment callback factory.
     * The supplied consumer factory is checked (and, for a
     * {@link DefaultKafkaConsumerFactory}, replaced) so that
     * {@code max.poll.records} is 1.
     * @param consumerFactory the factory used to create the consumer; must not be null.
     * @param ackCallbackFactory creates the callback attached to each message; must not be null.
     * @param topics the topics to subscribe to.
     * @throws IllegalArgumentException if an argument is null or a custom consumer
     * factory is not configured with {@code max.poll.records = 1}.
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
            KafkaAckCallbackFactory<K, V> ackCallbackFactory, String... topics) {

        Assert.notNull(consumerFactory, "'consumerFactory' must not be null");
        Assert.notNull(ackCallbackFactory, "'ackCallbackFactory' must not be null");
        this.topics = topics;
        this.ackCallbackFactory = ackCallbackFactory;
        this.consumerFactory = fixOrRejectConsumerFactory(consumerFactory);
    }

    /**
     * @return the group id passed to the consumer factory when the consumer is
     * created; may be {@code null} if never set.
     */
    protected String getGroupId() {
        return groupId;
    }

    /**
     * Set the group id handed to the consumer factory when the consumer is created.
     * @param groupId the consumer group id.
     */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /**
     * @return the client id passed to the consumer factory when the consumer is
     * created; {@code "message.source"} unless changed.
     */
    protected String getClientId() {
        return clientId;
    }

    /**
     * Set the client id handed to the consumer factory when the consumer is created.
     * @param clientId the client id.
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * @return the poll timeout in milliseconds.
     */
    protected long getPollTimeout() {
        Duration timeout = this.pollTimeout;
        return timeout.toMillis();
    }

    /**
     * Set the timeout used for polls once partitions have been assigned. The
     * timeout used before assignment is recomputed from the new value.
     * @param pollTimeout the poll timeout in milliseconds.
     */
    public void setPollTimeout(long pollTimeout) {
        Duration timeout = Duration.ofMillis(pollTimeout);
        this.pollTimeout = timeout;
        // The supplier reads this.pollTimeout, so it must run after the assignment above.
        this.assignTimeout = this.minTimeoutProvider.get();
    }

    /**
     * @return the converter used to turn a consumer record into a message.
     */
    protected RecordMessageConverter getMessageConverter() {
        return messageConverter;
    }

    /**
     * Set the converter used to turn each consumer record into a message.
     * A {@code null} converter is rejected here rather than surfacing as a
     * {@link NullPointerException} on every subsequent receive.
     * @param messageConverter the converter; must not be null.
     * @throws IllegalArgumentException if {@code messageConverter} is null.
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' must not be null");
        this.messageConverter = messageConverter;
    }

    /**
     * @return the payload type passed to the message converter; may be {@code null}.
     */
    protected Type getPayloadType() {
        return payloadType;
    }

    /**
     * Set the payload type that is passed to the message converter for each record.
     * @param payloadType the target payload type.
     */
    public void setPayloadType(Type payloadType) {
        this.payloadType = payloadType;
    }

    /**
     * @return the user-supplied rebalance listener, or {@code null} if none was set.
     */
    protected ConsumerRebalanceListener getRebalanceListener() {
        return rebalanceListener;
    }

    /**
     * Set a listener that is notified, after this source's own logging, when
     * partitions are revoked or assigned.
     * @param rebalanceListener the listener to delegate to.
     */
    public void setRebalanceListener(ConsumerRebalanceListener rebalanceListener) {
        this.rebalanceListener = rebalanceListener;
    }

    /**
     * @return the fixed component type name of this source.
     */
    @Override
    public String getComponentType() {
        final String componentType = "kafka:message-source";
        return componentType;
    }

    /**
     * @return true if the raw consumer record is added to each message as the
     * {@code kafka_data} header.
     */
    protected boolean isRawMessageHeader() {
        return rawMessageHeader;
    }

    /**
     * Choose whether the raw consumer record is added to each message as the
     * {@code kafka_data} header.
     * @param rawMessageHeader true to include the raw record.
     */
    public void setRawMessageHeader(boolean rawMessageHeader) {
        this.rawMessageHeader = rawMessageHeader;
    }

    /**
     * Ensure the consumer factory yields consumers with {@code max.poll.records = 1}.
     * <p>
     * A factory already configured with the value 1 (as a {@link Number} or a
     * numeric {@link String}) is returned unchanged, as is one whose value is of
     * any other non-null type. Otherwise a plain {@link DefaultKafkaConsumerFactory}
     * is replaced by a copy with the property forced to 1 (keeping any configured
     * deserializers), and any other factory implementation is rejected.
     * @param suppliedConsumerFactory the factory passed to the constructor.
     * @return the supplied factory, or a reconfigured copy of it.
     * @throws IllegalArgumentException if a custom factory is not configured with
     * {@code max.poll.records = 1}.
     * @throws NumberFormatException if the property is a non-numeric string.
     */
    private ConsumerFactory<K, V> fixOrRejectConsumerFactory(ConsumerFactory<K, V> suppliedConsumerFactory) {
        Object maxPoll = suppliedConsumerFactory.getConfigurationProperties().get("max.poll.records");
        boolean needsFix = maxPoll == null
                || (maxPoll instanceof Number && ((Number) maxPoll).intValue() != 1)
                || (maxPoll instanceof String && Integer.parseInt((String) maxPoll) != 1);
        if (!needsFix) {
            return suppliedConsumerFactory;
        }
        // Only the stock factory can be safely rebuilt from its configuration map.
        if (!suppliedConsumerFactory.getClass().getName().equals(DefaultKafkaConsumerFactory.class.getName())) {
            throw new IllegalArgumentException("Custom consumer factory is not configured with 'max.poll.records = 1'");
        }
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("'max.poll.records' has been forced from " + (maxPoll == null ? "unspecified" : maxPoll)
                    + " to 1, to avoid having to seek after each record");
        }
        // The configuration holds arbitrary value types, so the copy must be a Map<String, Object>.
        Map<String, Object> configs = new HashMap<>(suppliedConsumerFactory.getConfigurationProperties());
        configs.put("max.poll.records", 1);
        DefaultKafkaConsumerFactory<K, V> fixedConsumerFactory = new DefaultKafkaConsumerFactory<>(configs);
        if (suppliedConsumerFactory.getKeyDeserializer() != null) {
            fixedConsumerFactory.setKeyDeserializer(suppliedConsumerFactory.getKeyDeserializer());
        }
        if (suppliedConsumerFactory.getValueDeserializer() != null) {
            fixedConsumerFactory.setValueDeserializer(suppliedConsumerFactory.getValueDeserializer());
        }
        return fixedConsumerFactory;
    }

    /**
     * @return true after {@link #start()} or a first receive, until {@link #stop()} is called.
     */
    @Override
    public synchronized boolean isRunning() {
        return running;
    }

    /**
     * Mark this source as running. The consumer itself is not created here;
     * that happens on the first receive.
     */
    @Override
    public synchronized void start() {
        running = true;
    }

    /**
     * Close the consumer, if one exists, and then mark this source as not running.
     */
    @Override
    public synchronized void stop() {
        stopConsumer();
        running = false;
    }

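    /*
     * Decompiler note (CFR): a self-catching try/catch region, as emitted for a
     * synchronized block, was removed while reconstructing this method, so its
     * behaviour may differ from the original bytecode.
     */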
    /**
     * Poll for a single record and convert it to a message that carries an
     * acknowledgment callback.
     * <p>
     * The consumer is created on the first call. While no partitions have been
     * assigned the longer assign timeout is used for the poll; afterwards the
     * regular poll timeout applies.
     * @return {@code null} when the poll yields no records; the converted
     * message when its headers are {@link KafkaMessageHeaders}; otherwise a
     * message builder populated from the converted message.
     */
    @Override
    protected synchronized Object doReceive() {
        if (this.consumer == null) {
            createConsumer();
            this.running = true;
        }
        ConsumerRecord<K, V> record;
        TopicPartition topicPartition;
        synchronized (this.consumerMonitor) {
            ConsumerRecords<K, V> records =
                    this.consumer.poll(this.assigned ? this.pollTimeout : this.assignTimeout);
            if (records == null || records.count() == 0) {
                return null;
            }
            record = records.iterator().next();
            topicPartition = new TopicPartition(record.topic(), record.partition());
        }
        KafkaAckInfo<K, V> ackInfo = new KafkaAckInfoImpl(record, topicPartition);
        // The factory is caller-supplied code, so it is invoked without holding the monitor.
        AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(ackInfo);
        // Ack callbacks read and modify the in-flight sets under the consumer monitor,
        // possibly on other threads; register the record under the same monitor so the
        // unsynchronized HashMap/TreeSet are never mutated concurrently.
        synchronized (this.consumerMonitor) {
            this.inflightRecords.computeIfAbsent(topicPartition, tp -> new TreeSet<>()).add(ackInfo);
        }
        @SuppressWarnings("unchecked")
        Message<Object> message = (Message<Object>) this.messageConverter.toMessage(record,
                ackCallback instanceof Acknowledgment ? (Acknowledgment) ackCallback : null,
                this.consumer, this.payloadType);
        if (message.getHeaders() instanceof KafkaMessageHeaders) {
            // Mutable headers: add the callback (and optionally the raw record) in place.
            Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
            rawHeaders.put("acknowledgmentCallback", ackCallback);
            if (this.rawMessageHeader) {
                rawHeaders.put("kafka_data", record);
            }
            return message;
        }
        AbstractIntegrationMessageBuilder<Object> builder = getMessageBuilderFactory().fromMessage(message)
                .setHeader("acknowledgmentCallback", ackCallback);
        if (this.rawMessageHeader) {
            builder.setHeader("kafka_data", record);
        }
        return builder;
    }

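    /*
     * Decompiler note (CFR): a self-catching try/catch region, as emitted for a
     * synchronized block, was removed while reconstructing this method, so its
     * behaviour may differ from the original bytecode.
     */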
    /**
     * Create the consumer from the factory, using the configured group and
     * client ids, and subscribe it to the configured topics.
     * <p>
     * The subscription installs a rebalance listener that logs each event at
     * INFO level, records that an assignment has happened, and then forwards
     * the event to the user-supplied listener when one is set.
     */
    protected void createConsumer() {
        ConsumerRebalanceListener delegatingListener = new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                if (KafkaMessageSource.this.logger.isInfoEnabled()) {
                    KafkaMessageSource.this.logger.info("Partitions revoked: " + partitions);
                }
                ConsumerRebalanceListener delegate = KafkaMessageSource.this.rebalanceListener;
                if (delegate != null) {
                    delegate.onPartitionsRevoked(partitions);
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // From now on polls use the regular poll timeout instead of the assign timeout.
                KafkaMessageSource.this.assigned = true;
                if (KafkaMessageSource.this.logger.isInfoEnabled()) {
                    KafkaMessageSource.this.logger.info("Partitions assigned: " + partitions);
                }
                ConsumerRebalanceListener delegate = KafkaMessageSource.this.rebalanceListener;
                if (delegate != null) {
                    delegate.onPartitionsAssigned(partitions);
                }
            }

        };
        synchronized (this.consumerMonitor) {
            this.consumer = this.consumerFactory.createConsumer(this.groupId, this.clientId, null);
            this.consumer.subscribe(Arrays.asList(this.topics), delegatingListener);
        }
    }

    /**
     * Close the consumer, if one exists, when the bean is destroyed.
     */
    @Override
    public synchronized void destroy() {
        stopConsumer();
    }

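    /*
     * Decompiler note (CFR): a self-catching try/catch region, as emitted for a
     * synchronized block, was removed while reconstructing this method, so its
     * behaviour may differ from the original bytecode.
     */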
    /**
     * Close and discard the current consumer under the consumer monitor and
     * clear the assigned flag. Does nothing when no consumer exists.
     */
    private void stopConsumer() {
        synchronized (this.consumerMonitor) {
            Consumer<K, V> current = this.consumer;
            if (current == null) {
                return;
            }
            current.close();
            this.consumer = null;
            this.assigned = false;
        }
    }

    /**
     * Information an acknowledgment callback needs in order to commit or roll
     * back a single record. Instances are ordered so that they can be kept in
     * a sorted set of in-flight records.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     */
    public interface KafkaAckInfo<K, V> extends Comparable<KafkaAckInfo<K, V>> {

        /**
         * @return the monitor guarding access to the consumer.
         */
        Object getConsumerMonitor();

        /**
         * @return the consumer group id.
         */
        String getGroupId();

        /**
         * @return the consumer the record was received from.
         */
        Consumer<K, V> getConsumer();

        /**
         * @return the record this info describes.
         */
        ConsumerRecord<K, V> getRecord();

        /**
         * @return the topic/partition of the record.
         */
        TopicPartition getTopicPartition();

        /**
         * @return the in-flight records, grouped by topic/partition.
         */
        Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets();

        /**
         * @return true if this record has been flagged as rolled back.
         */
        boolean isRolledBack();

        /**
         * @param rolledBack true to flag this record as rolled back.
         */
        void setRolledBack(boolean rolledBack);

        /**
         * @return true if the commit for this record has been deferred.
         */
        boolean isAckDeferred();

        /**
         * @param ackDeferred true to flag the commit for this record as deferred.
         */
        void setAckDeferred(boolean ackDeferred);

    }

    /**
     * {@link KafkaAckInfo} for one received record. It is an inner class so
     * that the consumer, its monitor, the group id and the in-flight records
     * are always read from the enclosing source at call time.
     */
    public class KafkaAckInfoImpl implements KafkaAckInfo<K, V> {

        private final ConsumerRecord<K, V> record;

        private final TopicPartition topicPartition;

        private volatile boolean rolledBack;

        private volatile boolean ackDeferred;

        KafkaAckInfoImpl(ConsumerRecord<K, V> record, TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
            this.record = record;
        }

        // --- state owned by the enclosing source ---

        @Override
        public Object getConsumerMonitor() {
            return KafkaMessageSource.this.consumerMonitor;
        }

        @Override
        public String getGroupId() {
            return KafkaMessageSource.this.groupId;
        }

        @Override
        public Consumer<K, V> getConsumer() {
            return KafkaMessageSource.this.consumer;
        }

        @Override
        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets() {
            return KafkaMessageSource.this.inflightRecords;
        }

        // --- state of this record ---

        @Override
        public ConsumerRecord<K, V> getRecord() {
            return record;
        }

        @Override
        public TopicPartition getTopicPartition() {
            return topicPartition;
        }

        @Override
        public boolean isRolledBack() {
            return rolledBack;
        }

        @Override
        public void setRolledBack(boolean rolledBack) {
            this.rolledBack = rolledBack;
        }

        @Override
        public boolean isAckDeferred() {
            return ackDeferred;
        }

        @Override
        public void setAckDeferred(boolean ackDeferred) {
            this.ackDeferred = ackDeferred;
        }

        /**
         * Orders infos by record offset only.
         */
        @Override
        public int compareTo(KafkaAckInfo<K, V> other) {
            long thisOffset = this.record.offset();
            long otherOffset = other.getRecord().offset();
            return Long.compare(thisOffset, otherOffset);
        }

        @Override
        public String toString() {
            return "KafkaAckInfo [record=" + this.record
                    + ", rolledBack=" + this.rolledBack
                    + ", ackDeferred=" + this.ackDeferred + "]";
        }

    }

    public static class KafkaAckCallback<K, V>
    implements AcknowledgmentCallback,
    Acknowledgment {
        private final Log logger = LogFactory.getLog(this.getClass());
        private final KafkaAckInfo<K, V> ackInfo;
        private volatile boolean acknowledged;
        private boolean autoAckEnabled = true;

        public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo) {
            Assert.notNull(ackInfo, (String)"'ackInfo' cannot be null");
            this.ackInfo = ackInfo;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull((Object)status, (String)"'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            Object object = this.ackInfo.getConsumerMonitor();
            // MONITORENTER : object
            try {
                ConsumerRecord<K, V> record = this.ackInfo.getRecord();
                switch (status) {
                    case ACCEPT: 
                    case REJECT: {
                        this.commitIfPossible(record);
                        return;
                    }
                    case REQUEUE: {
                        this.rollback(record);
                        return;
                    }
                }
                return;
            }
            catch (WakeupException e) {
                throw new IllegalStateException(e);
            }
            finally {
                this.acknowledged = true;
                if (!this.ackInfo.isAckDeferred()) {
                    this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()).remove(this.ackInfo);
                }
            }
        }

        private void rollback(ConsumerRecord<K, V> record) {
            List rewound;
            this.ackInfo.getConsumer().seek(this.ackInfo.getTopicPartition(), record.offset());
            Set<KafkaAckInfo<K, V>> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            if (inflight.size() > 1 && (rewound = inflight.stream().filter(i -> i.getRecord().offset() > record.offset()).map(i -> {
                i.setRolledBack(true);
                return i.getRecord().offset();
            }).collect(Collectors.toList())).size() > 0 && this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("Rolled back " + record + " later in-flight offsets " + rewound + " will also be re-fetched"));
            }
        }

        private void commitIfPossible(ConsumerRecord<K, V> record) {
            if (this.ackInfo.isRolledBack()) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn((Object)("Cannot commit offset for " + record + "; an earlier offset was rolled back"));
                }
            } else {
                Set<KafkaAckInfo<K, V>> candidates = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
                KafkaAckInfo ackInfo = null;
                if (candidates.iterator().next().equals(this.ackInfo)) {
                    ArrayList<KafkaAckInfo<K, V>> toCommit = new ArrayList<KafkaAckInfo<K, V>>();
                    for (KafkaAckInfo<K, V> info : candidates) {
                        if (info == this.ackInfo) continue;
                        if (!info.isAckDeferred()) break;
                        toCommit.add(info);
                    }
                    if (toCommit.size() > 0) {
                        ackInfo = (KafkaAckInfo)toCommit.get(toCommit.size() - 1);
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug((Object)("Committing pending offsets for " + record + " and all deferred to " + ackInfo.getRecord()));
                        }
                        candidates.removeAll(toCommit);
                    } else {
                        ackInfo = this.ackInfo;
                    }
                } else {
                    this.ackInfo.setAckDeferred(true);
                }
                if (ackInfo != null) {
                    ackInfo.getConsumer().commitSync(Collections.singletonMap(ackInfo.getTopicPartition(), new OffsetAndMetadata(ackInfo.getRecord().offset() + 1L)));
                } else if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)"Deferring commit offset; earlier messages are in flight.");
                }
            }
        }

        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        public void acknowledge() {
            this.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
        }

        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }
    }

    public static class KafkaAckCallbackFactory<K, V>
    implements AcknowledgmentCallbackFactory<KafkaAckInfo<K, V>> {
        public AcknowledgmentCallback createCallback(KafkaAckInfo<K, V> info) {
            return new KafkaAckCallback<K, V>(info);
        }
    }
}

