/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.redis.inbound;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.redis.event.RedisExceptionEvent;
import org.springframework.integration.support.channel.ChannelResolverUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

@ManagedResource
@IntegrationManagedResource
public class RedisQueueMessageDrivenEndpoint
extends MessageProducerSupport
implements ApplicationEventPublisherAware,
BeanClassLoaderAware {
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000L;
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    private final BoundListOperations<String, byte[]> boundListOperations;
    private ApplicationEventPublisher applicationEventPublisher;
    private Executor taskExecutor;
    private RedisSerializer<?> serializer;
    private boolean serializerExplicitlySet;
    private boolean expectMessage = false;
    private long receiveTimeout = 1000L;
    private long recoveryInterval = 5000L;
    private boolean rightPop = true;
    private volatile boolean active;
    private volatile boolean listening;
    private volatile Runnable stopCallback;

    public RedisQueueMessageDrivenEndpoint(String queueName, RedisConnectionFactory connectionFactory) {
        Assert.hasText((String)queueName, (String)"'queueName' is required");
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' must not be null");
        RedisTemplate template = new RedisTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setEnableDefaultSerializer(false);
        template.setKeySerializer((RedisSerializer)new StringRedisSerializer());
        template.afterPropertiesSet();
        this.boundListOperations = template.boundListOps((Object)queueName);
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void setBeanClassLoader(ClassLoader beanClassLoader) {
        if (!this.serializerExplicitlySet) {
            this.serializer = new JdkSerializationRedisSerializer(beanClassLoader);
        }
    }

    public void setSerializer(RedisSerializer<?> serializer) {
        this.serializer = serializer;
        this.serializerExplicitlySet = true;
    }

    public void setExpectMessage(boolean expectMessage) {
        this.expectMessage = expectMessage;
    }

    public void setReceiveTimeout(long receiveTimeout) {
        Assert.isTrue((receiveTimeout >= 0L ? 1 : 0) != 0, (String)"'receiveTimeout' must be >= 0.");
        this.receiveTimeout = receiveTimeout;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    public void setRightPop(boolean rightPop) {
        this.rightPop = rightPop;
    }

    protected void onInit() {
        super.onInit();
        if (this.expectMessage) {
            Assert.notNull(this.serializer, (String)"'serializer' has to be provided where 'expectMessage == true'.");
        }
        if (this.taskExecutor == null) {
            String beanName = this.getComponentName();
            this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-") + this.getComponentType());
        }
        BeanFactory beanFactory = this.getBeanFactory();
        if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && beanFactory != null) {
            MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler(ChannelResolverUtils.getChannelResolver((BeanFactory)beanFactory));
            errorHandler.setDefaultErrorChannel(this.getErrorChannel());
            this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, (ErrorHandler)errorHandler);
        }
    }

    public String getComponentType() {
        return "redis:queue-inbound-channel-adapter";
    }

    private void popMessageAndSend() {
        byte[] value = this.popForValue();
        Message message = null;
        if (value != null) {
            if (this.expectMessage) {
                try {
                    message = (Message)this.serializer.deserialize(value);
                }
                catch (Exception e) {
                    throw new MessagingException("Deserialization of Message failed.", (Throwable)e);
                }
            } else {
                Object payload = value;
                if (this.serializer != null) {
                    payload = this.serializer.deserialize(value);
                }
                if (payload != null) {
                    message = this.getMessageBuilderFactory().withPayload(payload).build();
                }
            }
        }
        if (message != null) {
            if (this.listening) {
                this.sendMessage(message);
            } else if (this.rightPop) {
                this.boundListOperations.rightPush((Object)value);
            } else {
                this.boundListOperations.leftPush((Object)value);
            }
        }
    }

    private byte[] popForValue() {
        byte[] value = null;
        try {
            value = this.rightPop ? (byte[])this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS) : (byte[])this.boundListOperations.leftPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this.listening = false;
            if (this.active) {
                this.logger.error((Object)("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval + " milliseconds."), (Throwable)e);
                this.publishException(e);
                this.sleepBeforeRecoveryAttempt();
            }
            this.logger.debug((Object)("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage()));
        }
        return value;
    }

    protected void doStart() {
        if (!this.active) {
            this.active = true;
            this.restart();
        }
    }

    private void sleepBeforeRecoveryAttempt() {
        if (this.recoveryInterval > 0L) {
            try {
                Thread.sleep(this.recoveryInterval);
            }
            catch (InterruptedException e) {
                this.logger.debug((Object)"Thread interrupted while sleeping the recovery interval");
                Thread.currentThread().interrupt();
            }
        }
    }

    private void publishException(Exception e) {
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent((ApplicationEvent)new RedisExceptionEvent((Object)this, e));
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("No application event publisher for exception: " + e.getMessage()));
        }
    }

    private void restart() {
        this.taskExecutor.execute((Runnable)((Object)new ListenerTask()));
    }

    protected void doStop(Runnable callback) {
        this.stopCallback = callback;
        this.doStop();
    }

    protected void doStop() {
        super.doStop();
        this.active = false;
        this.listening = false;
    }

    public boolean isListening() {
        return this.listening;
    }

    @ManagedMetric
    public long getQueueSize() {
        return Optional.ofNullable(this.boundListOperations.size()).orElse(0L);
    }

    @ManagedOperation
    public void clearQueue() {
        this.boundListOperations.getOperations().delete(this.boundListOperations.getKey());
    }

    private class ListenerTask
    implements SchedulingAwareRunnable {
        ListenerTask() {
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            try {
                while (RedisQueueMessageDrivenEndpoint.this.active) {
                    RedisQueueMessageDrivenEndpoint.this.listening = true;
                    RedisQueueMessageDrivenEndpoint.this.popMessageAndSend();
                }
            }
            finally {
                if (RedisQueueMessageDrivenEndpoint.this.active) {
                    RedisQueueMessageDrivenEndpoint.this.restart();
                } else if (RedisQueueMessageDrivenEndpoint.this.stopCallback != null) {
                    RedisQueueMessageDrivenEndpoint.this.stopCallback.run();
                    RedisQueueMessageDrivenEndpoint.this.stopCallback = null;
                }
            }
        }
    }
}

