package org.mule.extension.redis.internal.service;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.mule.extension.redis.internal.error.exceptions.RedisConnectionException;
import org.mule.extension.redis.internal.error.exceptions.UnableToUnsubscribeException;
import org.mule.extension.redis.internal.service.dto.MessageDTO;
import org.mule.extension.redis.internal.utils.RedisUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.util.SafeEncoder;

/* loaded from: input_file:org/mule/extension/redis/internal/service/RedisMessagingAPIService.class */
public class RedisMessagingAPIService implements MessagingAPIService<MessageDTO> {
    private static final Logger logger = LoggerFactory.getLogger(RedisMessagingAPIService.class);
    private RedisClientAdapter client;

    /* loaded from: input_file:org/mule/extension/redis/internal/service/RedisMessagingAPIService$ChannelMessageListener.class */
    public static class ChannelMessageListener extends BinaryJedisPubSub {
        private Consumer<MessageDTO> consumer;

        public ChannelMessageListener(Consumer<MessageDTO> consumer) {
            this.consumer = consumer;
        }

        public void onPMessage(byte[] bArr, byte[] bArr2, byte[] bArr3) {
            this.consumer.accept(new MessageDTO(SafeEncoder.encode(bArr2), SafeEncoder.encode(bArr3)));
        }
    }

    public RedisMessagingAPIService(RedisClientAdapter redisClientAdapter) {
        Objects.requireNonNull(redisClientAdapter, "Invalid client. Client cannot be null.");
        this.client = redisClientAdapter;
    }

    @Override // org.mule.extension.redis.internal.service.MessagingAPIService
    public ChannelSubscription subscribeToChannel(Consumer<MessageDTO> consumer, Consumer<RuntimeException> consumer2, List<String> list) {
        checkPreconditionsForChannelList(list);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ChannelMessageListener channelMessageListener = new ChannelMessageListener(consumer);
        newSingleThreadExecutor.submit(() -> {
            executeWithExceptionHandling(this.client, redisClientAdapter -> {
                redisClientAdapter.psubscribe(channelMessageListener, RedisUtils.getPatternsFromChannels(list));
            }, consumer2);
        });
        return () -> {
            try {
                if (channelMessageListener.isSubscribed()) {
                    logger.debug("Unsubscribing channel listener.");
                    channelMessageListener.punsubscribe();
                    logger.debug("Successfully unsubscribed from channel.");
                }
            } catch (JedisConnectionException e) {
                consumer2.accept(new UnableToUnsubscribeException("Unknown error while trying to unsubscribe.", e));
            }
            try {
                newSingleThreadExecutor.shutdown();
                newSingleThreadExecutor.awaitTermination(3L, TimeUnit.SECONDS);
            } catch (InterruptedException e2) {
                logger.error("Unable to gracefully shutdown executor service.", e2);
            }
        };
    }

    private void checkPreconditionsForChannelList(List<String> list) {
        Objects.requireNonNull(list, "The list of channels cannot be null.");
        if (list.isEmpty()) {
            throw new IllegalArgumentException("The list of channels cannot be empty.");
        }
    }

    private void executeWithExceptionHandling(RedisClientAdapter redisClientAdapter, Consumer<RedisClientAdapter> consumer, Consumer<RuntimeException> consumer2) {
        try {
            consumer.accept(redisClientAdapter);
        } catch (JedisConnectionException e) {
            consumer2.accept(new RedisConnectionException("Unable to establish connection with server.", e));
        }
    }
}
