/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.channel;

import java.util.Objects;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.kafka.channel.AbstractKafkaChannel;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import org.springframework.integration.support.management.ManageableSmartLifecycle;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

public class SubscribableKafkaChannel
extends AbstractKafkaChannel
implements SubscribableChannel,
ManageableSmartLifecycle {
    private static final int DEFAULT_PHASE = 0x3FFFFFFF;
    private final KafkaListenerContainerFactory<?> factory;
    private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
    private MessageDispatcher dispatcher;
    private MessageListenerContainer container;
    private boolean autoStartup = true;
    private int phase = 0x3FFFFFFF;
    private volatile boolean running;

    public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerContainerFactory<?> factory, String channelTopic) {
        super(template, channelTopic);
        Assert.notNull(factory, (String)"'factory' cannot be null");
        this.factory = factory;
        if (JacksonPresent.isJackson2Present()) {
            MessagingMessageConverter messageConverter = new MessagingMessageConverter();
            DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
            headerMapper.addTrustedPackages(JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messageConverter.setHeaderMapper((KafkaHeaderMapper)headerMapper);
            this.recordListener.setMessageConverter((RecordMessageConverter)messageConverter);
        }
    }

    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.recordListener.setMessageConverter(messageConverter);
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public boolean isRunning() {
        return this.running;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    protected void onInit() {
        this.dispatcher = this.createDispatcher();
        this.container = this.factory.createContainer(new String[]{this.topic});
        String groupId = this.getGroupId();
        ContainerProperties containerProperties = this.container.getContainerProperties();
        containerProperties.setGroupId(groupId != null ? groupId : Objects.requireNonNull(this.getBeanName()));
        containerProperties.setMessageListener((Object)this.recordListener);
    }

    protected MessageDispatcher createDispatcher() {
        UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher();
        unicastingDispatcher.setLoadBalancingStrategy((LoadBalancingStrategy)new RoundRobinLoadBalancingStrategy());
        return unicastingDispatcher;
    }

    public void start() {
        this.container.start();
        this.running = true;
    }

    public void stop() {
        this.container.stop();
        this.running = false;
    }

    public void stop(Runnable callback) {
        this.container.stop(() -> {
            callback.run();
            this.running = false;
        });
    }

    public boolean subscribe(MessageHandler handler) {
        return this.dispatcher.addHandler(handler);
    }

    public boolean unsubscribe(MessageHandler handler) {
        return this.dispatcher.removeHandler(handler);
    }

    private class IntegrationRecordMessageListener
    extends RecordMessagingMessageListenerAdapter<Object, Object> {
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        public void onMessage(ConsumerRecord<Object, Object> record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
            SubscribableKafkaChannel.this.dispatcher.dispatch(this.toMessagingMessage(record, acknowledgment, consumer));
        }
    }
}

