/*
 * Decompiled with CFR 0.152.
 */
package com.alicp.jetcache.redis.springdata;

import com.alicp.jetcache.CacheConfigException;
import com.alicp.jetcache.CacheManager;
import com.alicp.jetcache.CacheResult;
import com.alicp.jetcache.redis.springdata.RedisSpringDataCacheConfig;
import com.alicp.jetcache.support.BroadcastManager;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.SquashedLogger;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

public class SpringDataBroadcastManager
extends BroadcastManager {
    private static final Logger logger = LoggerFactory.getLogger(SpringDataBroadcastManager.class);
    private final RedisSpringDataCacheConfig config;
    private final MessageListener listener = this::onMessage;
    private final byte[] channel;
    private volatile RedisMessageListenerContainer listenerContainer;

    public SpringDataBroadcastManager(CacheManager cacheManager, RedisSpringDataCacheConfig config) {
        super(cacheManager);
        this.config = config;
        if (config.getConnectionFactory() == null) {
            throw new CacheConfigException("connectionFactory is required");
        }
        if (config.getBroadcastChannel() == null) {
            throw new CacheConfigException("broadcastChannel is required");
        }
        this.channel = config.getBroadcastChannel().getBytes(StandardCharsets.UTF_8);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public CacheResult publish(CacheMessage cacheMessage) {
        try (RedisConnection con = this.config.getConnectionFactory().getConnection();){
            byte[] body = (byte[])this.config.getValueEncoder().apply(cacheMessage);
            con.publish(this.channel, body);
            CacheResult cacheResult = CacheResult.SUCCESS_WITHOUT_MSG;
            return cacheResult;
        }
        catch (Exception ex) {
            SquashedLogger.getLogger((Logger)logger).error((CharSequence)"jetcache publish error", (Throwable)ex);
            return new CacheResult((Throwable)ex);
        }
    }

    public synchronized void startSubscribe() {
        if (this.listenerContainer != null) {
            throw new IllegalStateException("subscribe thread is started");
        }
        ChannelTopic topic = new ChannelTopic(this.config.getBroadcastChannel());
        if (this.config.getListenerContainer() == null) {
            RedisMessageListenerContainer c = new RedisMessageListenerContainer();
            c.setConnectionFactory(this.config.getConnectionFactory());
            c.afterPropertiesSet();
            c.start();
            this.listenerContainer = c;
            logger.info("create RedisMessageListenerContainer instance");
        } else {
            this.listenerContainer = this.config.getListenerContainer();
        }
        this.listenerContainer.addMessageListener(this.listener, (Topic)topic);
        logger.info("subscribe jetcache invalidate notification. channel={}", (Object)this.config.getBroadcastChannel());
    }

    private void onMessage(Message message, byte[] pattern) {
        this.processNotification(message.getBody(), this.config.getValueDecoder());
    }

    public synchronized void close() throws Exception {
        if (this.listenerContainer != null) {
            this.listenerContainer.removeMessageListener(this.listener);
            if (this.config.getListenerContainer() == null) {
                this.listenerContainer.destroy();
            }
        }
        this.listenerContainer = null;
    }
}

