/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.command;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisExecutor<V, R> {
    static final Logger log = LoggerFactory.getLogger(RedisExecutor.class);
    final boolean readOnlyMode;
    final RedisCommand<V> command;
    final Object[] params;
    final RPromise<R> mainPromise;
    final boolean ignoreRedirect;
    final RedissonObjectBuilder objectBuilder;
    final ConnectionManager connectionManager;
    NodeSource source;
    Codec codec;
    volatile int attempt;
    volatile Timeout timeout;
    volatile BiConsumer<R, Throwable> mainPromiseListener;
    volatile ChannelFuture writeFuture;
    volatile RedisException exception;
    int attempts;
    long retryInterval;
    long responseTimeout;
    private static final Map<ClassLoader, Map<Codec, Codec>> CODECS = ReferenceCacheMap.soft(0L, 0L);

    public RedisExecutor(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
        this.readOnlyMode = readOnlyMode;
        this.source = source;
        this.codec = codec;
        this.command = command;
        this.params = params;
        this.mainPromise = mainPromise;
        this.ignoreRedirect = ignoreRedirect;
        this.connectionManager = connectionManager;
        this.objectBuilder = objectBuilder;
        this.attempts = connectionManager.getConfig().getRetryAttempts();
        this.retryInterval = connectionManager.getConfig().getRetryInterval();
        this.responseTimeout = connectionManager.getConfig().getTimeout();
    }

    public void execute() {
        if (this.mainPromise.isCancelled()) {
            this.free();
            return;
        }
        if (!this.connectionManager.getShutdownLatch().acquire()) {
            this.free();
            this.mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
            return;
        }
        this.codec = this.getCodec(this.codec);
        RFuture<RedisConnection> connectionFuture = this.getConnection();
        final RedissonPromise attemptPromise = new RedissonPromise();
        this.mainPromiseListener = (r, e) -> {
            if (this.mainPromise.isCancelled() && connectionFuture.cancel(false)) {
                log.debug("Connection obtaining canceled for {}", (Object)this.command);
                this.timeout.cancel();
                if (attemptPromise.cancel(false)) {
                    this.free();
                }
            }
        };
        if (this.attempt == 0) {
            this.mainPromise.onComplete((r, e) -> {
                if (this.mainPromiseListener != null) {
                    this.mainPromiseListener.accept((R)r, (Throwable)e);
                }
            });
        }
        this.scheduleRetryTimeout(connectionFuture, attemptPromise);
        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {
                this.connectionManager.getShutdownLatch().release();
                return;
            }
            if (!connectionFuture.isSuccess()) {
                this.connectionManager.getShutdownLatch().release();
                this.exception = this.convertException(connectionFuture);
                return;
            }
            if (attemptPromise.isDone() || this.mainPromise.isDone()) {
                this.releaseConnection(attemptPromise, connectionFuture);
                return;
            }
            this.sendCommand(attemptPromise, (RedisConnection)connection);
            this.writeFuture.addListener(new ChannelFutureListener((RedisConnection)connection){
                final /* synthetic */ RedisConnection val$connection;
                {
                    this.val$connection = redisConnection;
                }

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    RedisExecutor.this.checkWriteFuture(RedisExecutor.this.writeFuture, attemptPromise, this.val$connection);
                }
            });
            this.releaseConnection(attemptPromise, connectionFuture);
        });
        attemptPromise.onComplete((r, e) -> this.checkAttemptPromise(attemptPromise, connectionFuture));
    }

    private void scheduleRetryTimeout(final RFuture<RedisConnection> connectionFuture, final RPromise<R> attemptPromise) {
        if (this.retryInterval == 0L || this.attempts == 0) {
            this.timeout = MasterSlaveConnectionManager.DUMMY_TIMEOUT;
            return;
        }
        TimerTask retryTimerTask = new TimerTask(){

            @Override
            public void run(Timeout t) throws Exception {
                if (attemptPromise.isDone()) {
                    return;
                }
                if (connectionFuture.cancel(false)) {
                    if (RedisExecutor.this.exception == null) {
                        RedisExecutor.this.exception = new RedisTimeoutException("Unable to acquire connection! Increase connection pool size and/or retryIntervalNode source: " + RedisExecutor.this.source + ", command: " + LogHelper.toString(RedisExecutor.this.command, RedisExecutor.this.params) + " after " + RedisExecutor.this.attempt + " retry attempts");
                    }
                } else if (connectionFuture.isSuccess()) {
                    if (RedisExecutor.this.writeFuture == null || !RedisExecutor.this.writeFuture.isDone()) {
                        if (RedisExecutor.this.attempt == RedisExecutor.this.attempts) {
                            if (RedisExecutor.this.writeFuture != null && RedisExecutor.this.writeFuture.cancel(false)) {
                                if (RedisExecutor.this.exception == null) {
                                    RedisExecutor.this.exception = new RedisTimeoutException("Command still hasn't been written into connection! Increase nettyThreads and/or retryIntervalNode source: " + RedisExecutor.this.source + ", connection: " + connectionFuture.getNow() + ", command: " + LogHelper.toString(RedisExecutor.this.command, RedisExecutor.this.params) + " after " + RedisExecutor.this.attempt + " retry attempts");
                                }
                                attemptPromise.tryFailure(RedisExecutor.this.exception);
                            }
                            return;
                        }
                        ++RedisExecutor.this.attempt;
                        RedisExecutor.this.scheduleRetryTimeout(connectionFuture, attemptPromise);
                        return;
                    }
                    if (RedisExecutor.this.writeFuture.isSuccess()) {
                        return;
                    }
                }
                if (RedisExecutor.this.mainPromise.isCancelled()) {
                    if (attemptPromise.cancel(false)) {
                        RedisExecutor.this.free();
                    }
                    return;
                }
                if (RedisExecutor.this.attempt == RedisExecutor.this.attempts) {
                    attemptPromise.tryFailure(RedisExecutor.this.exception);
                    return;
                }
                if (!attemptPromise.cancel(false)) {
                    return;
                }
                ++RedisExecutor.this.attempt;
                if (log.isDebugEnabled()) {
                    log.debug("attempt {} for command {} and params {}", RedisExecutor.this.attempt, RedisExecutor.this.command, LogHelper.toString(RedisExecutor.this.params));
                }
                RedisExecutor.this.mainPromiseListener = null;
                RedisExecutor.this.execute();
            }
        };
        this.timeout = this.connectionManager.newTimeout(retryTimerTask, this.retryInterval, TimeUnit.MILLISECONDS);
    }

    protected void free() {
        this.free(this.params);
    }

    protected void free(Object[] params) {
        for (Object obj : params) {
            ReferenceCountUtil.safeRelease(obj);
        }
    }

    private void checkWriteFuture(ChannelFuture future, RPromise<R> attemptPromise, RedisConnection connection) {
        if (future.isCancelled() || attemptPromise.isDone()) {
            return;
        }
        if (!future.isSuccess()) {
            this.exception = new WriteRedisConnectionException("Unable to write command into connection! Node source: " + this.source + ", connection: " + connection + ", command: " + LogHelper.toString(this.command, this.params) + " after " + this.attempt + " retry attempts", future.cause());
            if (this.attempt == this.attempts) {
                attemptPromise.tryFailure(this.exception);
            }
            return;
        }
        this.timeout.cancel();
        this.scheduleResponseTimeout(attemptPromise, connection);
    }

    private void scheduleResponseTimeout(final RPromise<R> attemptPromise, final RedisConnection connection) {
        long timeoutTime = this.responseTimeout;
        if (this.command != null && (RedisCommands.BLOCKING_COMMAND_NAMES.contains(this.command.getName()) || RedisCommands.BLOCKING_COMMANDS.contains(this.command))) {
            Long popTimeout = null;
            if (RedisCommands.BLOCKING_COMMANDS.contains(this.command)) {
                boolean found = false;
                for (Object param : this.params) {
                    if (found) {
                        popTimeout = Long.valueOf(param.toString()) / 1000L;
                        break;
                    }
                    if (!"BLOCK".equals(param)) continue;
                    found = true;
                }
            } else {
                popTimeout = Long.valueOf(this.params[this.params.length - 1].toString());
            }
            this.handleBlockingOperations(attemptPromise, connection, popTimeout);
            if (popTimeout == 0L) {
                return;
            }
            timeoutTime += popTimeout * 1000L;
            timeoutTime += 1000L;
        }
        final long timeoutAmount = timeoutTime;
        TimerTask timeoutTask = new TimerTask(){

            @Override
            public void run(Timeout timeout) throws Exception {
                if (RedisExecutor.this.attempt < RedisExecutor.this.attempts) {
                    if (!attemptPromise.cancel(false)) {
                        return;
                    }
                    ++RedisExecutor.this.attempt;
                    if (log.isDebugEnabled()) {
                        log.debug("attempt {} for command {} and params {}", RedisExecutor.this.attempt, RedisExecutor.this.command, LogHelper.toString(RedisExecutor.this.params));
                    }
                    RedisExecutor.this.mainPromiseListener = null;
                    RedisExecutor.this.execute();
                    return;
                }
                attemptPromise.tryFailure(new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured after " + RedisExecutor.this.attempt + " retry attempts. Command: " + LogHelper.toString(RedisExecutor.this.command, RedisExecutor.this.params) + ", channel: " + connection.getChannel()));
            }
        };
        this.timeout = this.connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleBlockingOperations(final RPromise<R> attemptPromise, final RedisConnection connection, Long popTimeout) {
        FutureListener listener = f -> this.mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
        Timeout scheduledFuture = popTimeout != 0L ? this.connectionManager.newTimeout(new TimerTask(){

            @Override
            public void run(Timeout timeout) throws Exception {
                if (attemptPromise.trySuccess(null)) {
                    connection.forceFastReconnectAsync();
                }
            }
        }, popTimeout, TimeUnit.SECONDS) : null;
        this.mainPromise.onComplete((res, e) -> {
            if (scheduledFuture != null) {
                scheduledFuture.cancel();
            }
            FutureListener futureListener = listener;
            synchronized (futureListener) {
                this.connectionManager.getShutdownPromise().removeListener(listener);
            }
            if (this.mainPromise.isCancelled() && !attemptPromise.isDone()) {
                log.debug("Canceled blocking operation {} used {}", (Object)this.command, (Object)connection);
                connection.forceFastReconnectAsync().onComplete((r, ex) -> attemptPromise.cancel(true));
                return;
            }
            if (e instanceof RedissonShutdownException) {
                attemptPromise.tryFailure((Throwable)e);
            }
        });
        FutureListener futureListener = listener;
        synchronized (futureListener) {
            if (!this.mainPromise.isDone()) {
                this.connectionManager.getShutdownPromise().addListener(listener);
            }
        }
    }

    protected void checkAttemptPromise(RPromise<R> attemptFuture, RFuture<RedisConnection> connectionFuture) {
        this.timeout.cancel();
        if (attemptFuture.isCancelled()) {
            return;
        }
        try {
            this.mainPromiseListener = null;
            if (attemptFuture.cause() instanceof RedisMovedException && !this.ignoreRedirect) {
                RedisMovedException ex = (RedisMovedException)attemptFuture.cause();
                if (this.source.getRedirect() == NodeSource.Redirect.MOVED) {
                    this.mainPromise.tryFailure(new RedisException("MOVED redirection loop detected. Node " + this.source.getAddr() + " has further redirect to " + ex.getUrl()));
                    return;
                }
                this.onException();
                this.source = new NodeSource(ex.getSlot(), this.connectionManager.applyNatMap(ex.getUrl()), NodeSource.Redirect.MOVED);
                this.execute();
                return;
            }
            if (attemptFuture.cause() instanceof RedisAskException && !this.ignoreRedirect) {
                RedisAskException ex = (RedisAskException)attemptFuture.cause();
                this.onException();
                this.source = new NodeSource(ex.getSlot(), this.connectionManager.applyNatMap(ex.getUrl()), NodeSource.Redirect.ASK);
                this.execute();
                return;
            }
            if ((attemptFuture.cause() instanceof RedisLoadingException || attemptFuture.cause() instanceof RedisTryAgainException) && this.attempt < this.attempts) {
                this.onException();
                this.connectionManager.newTimeout(new TimerTask(){

                    @Override
                    public void run(Timeout timeout) throws Exception {
                        ++RedisExecutor.this.attempt;
                        RedisExecutor.this.execute();
                    }
                }, Math.min(this.responseTimeout, 1000L), TimeUnit.MILLISECONDS);
                return;
            }
            this.free();
            this.handleResult(attemptFuture, connectionFuture);
        }
        catch (Exception e) {
            this.handleError(connectionFuture, e);
        }
    }

    protected void handleResult(RPromise<R> attemptPromise, RFuture<RedisConnection> connectionFuture) throws ReflectiveOperationException {
        if (attemptPromise.isSuccess()) {
            Object res = attemptPromise.getNow();
            if (res instanceof ScanResult) {
                ((ScanResult)res).setRedisClient(connectionFuture.getNow().getRedisClient());
            }
            this.handleSuccess(this.mainPromise, connectionFuture, res);
        } else {
            this.handleError(connectionFuture, attemptPromise.cause());
        }
    }

    protected void onException() {
    }

    protected void handleError(RFuture<RedisConnection> connectionFuture, Throwable cause) {
        this.mainPromise.tryFailure(cause);
    }

    protected void handleSuccess(RPromise<R> promise, RFuture<RedisConnection> connectionFuture, R res) throws ReflectiveOperationException {
        if (this.objectBuilder != null) {
            this.handleReference(promise, res);
        } else {
            promise.trySuccess(res);
        }
    }

    private void handleReference(RPromise<R> promise, R res) throws ReflectiveOperationException {
        promise.trySuccess(RedisExecutor.tryHandleReference(this.objectBuilder, res));
    }

    public static Object tryHandleReference(RedissonObjectBuilder objectBuilder, Object o) throws ReflectiveOperationException {
        boolean hasConversion = false;
        if (o instanceof List) {
            List r = (List)o;
            for (int i = 0; i < r.size(); ++i) {
                Object ref = RedisExecutor.tryHandleReference0(objectBuilder, r.get(i));
                if (ref == r.get(i)) continue;
                r.set(i, ref);
            }
            return o;
        }
        if (o instanceof Set) {
            Set<Object> set = (LinkedHashSet)o;
            Set r = (Set)o;
            boolean useNewSet = o instanceof LinkedHashSet;
            try {
                set = (Set)o.getClass().getConstructor(new Class[0]).newInstance(new Object[0]);
            }
            catch (Exception exception) {
                set = new LinkedHashSet();
            }
            for (Object i : r) {
                Object ref = RedisExecutor.tryHandleReference0(objectBuilder, i);
                if (useNewSet) {
                    set.add(ref);
                } else {
                    try {
                        r.add(ref);
                        set.add(i);
                    }
                    catch (Exception e) {
                        useNewSet = true;
                        set.add(ref);
                    }
                }
                hasConversion |= ref != i;
            }
            if (!hasConversion) {
                return o;
            }
            if (useNewSet) {
                return set;
            }
            if (!set.isEmpty()) {
                r.removeAll(set);
            }
            return o;
        }
        if (o instanceof Map) {
            Map r = (Map)o;
            for (Map.Entry e : r.entrySet()) {
                if (!(e.getKey() instanceof RedissonReference) && !(e.getValue() instanceof RedissonReference)) continue;
                Object key = e.getKey();
                Object value = e.getValue();
                if (e.getKey() instanceof RedissonReference) {
                    key = RedisExecutor.fromReference(objectBuilder, e.getKey());
                    r.remove(e.getKey());
                }
                if (e.getValue() instanceof RedissonReference) {
                    value = RedisExecutor.fromReference(objectBuilder, e.getValue());
                }
                r.put(key, value);
            }
            return o;
        }
        if (o instanceof ListScanResult) {
            RedisExecutor.tryHandleReference(objectBuilder, ((ListScanResult)o).getValues());
            return o;
        }
        if (o instanceof MapScanResult) {
            MapScanResult scanResult = (MapScanResult)o;
            Map oldMap = ((MapScanResult)o).getMap();
            Map map = (Map)RedisExecutor.tryHandleReference(objectBuilder, oldMap);
            if (map != oldMap) {
                MapScanResult newScanResult = new MapScanResult(scanResult.getPos(), map);
                newScanResult.setRedisClient(scanResult.getRedisClient());
                return newScanResult;
            }
            return o;
        }
        return RedisExecutor.tryHandleReference0(objectBuilder, o);
    }

    private static Object tryHandleReference0(RedissonObjectBuilder objectBuilder, Object o) throws ReflectiveOperationException {
        if (o instanceof RedissonReference) {
            return RedisExecutor.fromReference(objectBuilder, o);
        }
        if (o instanceof ScoredEntry && ((ScoredEntry)o).getValue() instanceof RedissonReference) {
            ScoredEntry se = (ScoredEntry)o;
            return new ScoredEntry<Object>(se.getScore(), RedisExecutor.fromReference(objectBuilder, se.getValue()));
        }
        if (o instanceof Map.Entry) {
            Map.Entry old = (Map.Entry)o;
            Object key = RedisExecutor.tryHandleReference0(objectBuilder, old.getKey());
            Object value = RedisExecutor.tryHandleReference0(objectBuilder, old.getValue());
            if (value != old.getValue() || key != old.getKey()) {
                return new AbstractMap.SimpleEntry<Object, Object>(key, value);
            }
        }
        return o;
    }

    private static Object fromReference(RedissonObjectBuilder objectBuilder, Object res) throws ReflectiveOperationException {
        if (objectBuilder == null) {
            return res;
        }
        return objectBuilder.fromReference((RedissonReference)res);
    }

    protected void sendCommand(RPromise<R> attemptPromise, RedisConnection connection) {
        if (this.source.getRedirect() == NodeSource.Redirect.ASK) {
            ArrayList list = new ArrayList(2);
            RedissonPromise promise = new RedissonPromise();
            list.add(new CommandData(promise, this.codec, RedisCommands.ASKING, new Object[0]));
            list.add(new CommandData<V, R>(attemptPromise, this.codec, this.command, this.params));
            RedissonPromise<Void> main = new RedissonPromise<Void>();
            this.writeFuture = connection.send(new CommandsData(main, list, false));
        } else {
            if (log.isDebugEnabled()) {
                log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", this.command, LogHelper.toString(this.params), this.source, connection.getRedisClient().getAddr(), connection);
            }
            this.writeFuture = connection.send(new CommandData<V, R>(attemptPromise, this.codec, this.command, this.params));
        }
    }

    protected void releaseConnection(RPromise<R> attemptPromise, RFuture<RedisConnection> connectionFuture) {
        attemptPromise.onComplete((res, e) -> {
            if (!connectionFuture.isSuccess()) {
                return;
            }
            RedisConnection connection = (RedisConnection)connectionFuture.getNow();
            this.connectionManager.getShutdownLatch().release();
            if (this.readOnlyMode) {
                this.connectionManager.releaseRead(this.source, connection);
            } else {
                this.connectionManager.releaseWrite(this.source, connection);
            }
            if (log.isDebugEnabled()) {
                log.debug("connection released for command {} and params {} from slot {} using connection {}", this.command, LogHelper.toString(this.params), this.source, connection);
            }
        });
    }

    protected RFuture<RedisConnection> getConnection() {
        RFuture<RedisConnection> connectionFuture = this.readOnlyMode ? this.connectionManager.connectionReadOp(this.source, this.command) : this.connectionManager.connectionWriteOp(this.source, this.command);
        return connectionFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Codec getCodec(Codec codec) {
        if (codec == null) {
            return codec;
        }
        for (Class<?> clazz : BaseCodec.SKIPPED_CODECS) {
            if (!clazz.isAssignableFrom(codec.getClass())) continue;
            return codec;
        }
        Codec codecToUse = codec;
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        if (threadClassLoader != null) {
            Map<Codec, Codec> map = CODECS.get(threadClassLoader);
            if (map == null) {
                Map<ClassLoader, Map<Codec, Codec>> map2 = CODECS;
                synchronized (map2) {
                    map = CODECS.get(threadClassLoader);
                    if (map == null) {
                        map = new ConcurrentHashMap<Codec, Codec>();
                        CODECS.put(threadClassLoader, map);
                    }
                }
            }
            if ((codecToUse = map.get(codec)) == null) {
                try {
                    codecToUse = (Codec)codec.getClass().getConstructor(ClassLoader.class, codec.getClass()).newInstance(threadClassLoader, codec);
                }
                catch (NoSuchMethodException e) {
                    codecToUse = codec;
                }
                catch (Exception e) {
                    throw new IllegalStateException(e);
                }
                map.put(codec, codecToUse);
            }
        }
        return codecToUse;
    }

    protected <T> RedisException convertException(RFuture<T> future) {
        if (future.cause() instanceof RedisException) {
            return (RedisException)future.cause();
        }
        return new RedisException("Unexpected exception while processing command", future.cause());
    }
}

