package com.alibaba.mqtt.server;

import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.callback.StatusListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;
import com.alibaba.mqtt.server.model.StatusNotice;
import com.alibaba.mqtt.server.network.AbstractChannel;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:com/alibaba/mqtt/server/ServerConsumer.class */
public class ServerConsumer extends AbstractChannel {
    private ConsumerConfig consumerConfig;
    private ExecutorService msgExecutor;
    private ExecutorService statusExecutor;

    public ServerConsumer(ChannelConfig channelConfig, ConsumerConfig consumerConfig) {
        super(channelConfig);
        this.consumerConfig = consumerConfig;
    }

    @Override // com.alibaba.mqtt.server.network.AbstractChannel
    public void start() throws IOException, TimeoutException {
        super.start();
        this.msgExecutor = new ThreadPoolExecutor(this.consumerConfig.getMinConsumeThreadNum(), this.consumerConfig.getMaxConsumeThreadNum(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue());
        this.statusExecutor = new ThreadPoolExecutor(this.consumerConfig.getMinConsumeThreadNum(), this.consumerConfig.getMaxConsumeThreadNum(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue());
    }

    @Override // com.alibaba.mqtt.server.network.AbstractChannel
    public void stop() throws IOException {
        super.stop();
    }

    public void subscribeTopic(String str, final MessageListener messageListener) throws IOException {
        this.channel.basicConsume(str, false, new DefaultConsumer(this.channel) { // from class: com.alibaba.mqtt.server.ServerConsumer.1
            public void handleDelivery(String str2, final Envelope envelope, final AMQP.BasicProperties basicProperties, final byte[] bArr) {
                ServerConsumer.this.msgExecutor.submit(new Runnable() { // from class: com.alibaba.mqtt.server.ServerConsumer.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            messageListener.process(basicProperties.getMessageId(), new MessageProperties(basicProperties), bArr);
                            ServerConsumer.this.channel.basicAck(envelope.getDeliveryTag(), false);
                        } catch (Throwable th) {
                            try {
                                ServerConsumer.this.channel.basicNack(envelope.getDeliveryTag(), false, false);
                            } catch (IOException e) {
                            }
                        }
                    }
                });
            }
        });
    }

    public void subscribeStatus(String str, final StatusListener statusListener) throws IOException {
        HashMap hashMap = new HashMap(4);
        hashMap.put("GROUP_ID", str);
        this.channel.basicConsume("STATUS", false, hashMap, new DefaultConsumer(this.channel) { // from class: com.alibaba.mqtt.server.ServerConsumer.2
            public void handleDelivery(String str2, final Envelope envelope, AMQP.BasicProperties basicProperties, final byte[] bArr) throws IOException {
                ServerConsumer.this.statusExecutor.submit(new Runnable() { // from class: com.alibaba.mqtt.server.ServerConsumer.2.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            statusListener.process(new StatusNotice(bArr));
                            ServerConsumer.this.channel.basicAck(envelope.getDeliveryTag(), false);
                        } catch (Throwable th) {
                            try {
                                ServerConsumer.this.channel.basicNack(envelope.getDeliveryTag(), false, false);
                            } catch (IOException e) {
                            }
                        }
                    }
                });
            }
        });
    }
}
