/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.QueueTransferService;
import org.redisson.QueueTransferTask;
import org.redisson.Redisson;
import org.redisson.RedissonRemoteService;
import org.redisson.RedissonTopic;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.WorkerOptions;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonExecutorRemoteService;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemotePromise;
import org.redisson.executor.ScheduledTasksService;
import org.redisson.executor.TasksBatchService;
import org.redisson.executor.TasksRunnerService;
import org.redisson.executor.TasksService;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.Injector;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedissonExecutorService
implements RScheduledExecutorService {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class);
    // Invocation options for calls whose result is awaited: no ack, result expected within one hour.
    private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1L, TimeUnit.HOURS);
    // Status values; isShutdown() and isTerminated() check the stored status against 1 and 2 respectively.
    public static final int SHUTDOWN_STATE = 1;
    public static final int TERMINATED_STATE = 2;
    private final CommandExecutor commandExecutor;
    private final ConnectionManager connectionManager;
    private final Codec codec;
    private final Redisson redisson;
    // The following names are all derived from requestQueueName in the constructor.
    private final String tasksName;
    private final String schedulerQueueName;
    private final String schedulerChannelName;
    private final String tasksRetryIntervalName;
    private final String workersChannelName;
    private final String workersSemaphoreName;
    private final String workersCounterName;
    private final String tasksCounterName;
    private final String statusName;
    // Topic on which awaitTermination() listens for the terminated status value.
    private final RTopic terminationTopic;
    private final RRemoteService remoteService;
    // Topic used by countActiveWorkers() to poll registered workers.
    private final RTopic workersTopic;
    // Id returned by workersTopic.addListener in registerWorkers(WorkerOptions); removed in shutdown().
    private int workersGroupListenerId;
    private final RemoteExecutorServiceAsync asyncScheduledService;
    private final RemoteExecutorServiceAsync asyncScheduledServiceAtFixed;
    private final RemoteExecutorServiceAsync asyncService;
    private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
    private final ScheduledTasksService scheduledRemoteService;
    private final TasksService executorRemoteService;
    // Cache of class bodies keyed by task class; filled by getClassBody().
    private final Map<Class<?>, ClassBody> class2body = new ConcurrentHashMap();
    private final String name;
    private final String requestQueueName;
    private final String responseQueueName;
    private final QueueTransferService queueTransferService;
    private final String executorId;
    private final ConcurrentMap<String, ResponseEntry> responses;
    // Reference queue polled by storeReference(); the identifier spelling ("Dueue") is kept as-is.
    private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue();
    private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(new ConcurrentHashMap());

    /**
     * Wires up the executor service: derives the executor id, creates the remote service,
     * computes all Redis object names from the request queue name and configures the
     * immediate and scheduled task services with those names.
     *
     * @param codec codec used to encode tasks; compared by identity with the connection manager's codec
     * @param commandExecutor command executor used for all Redis calls
     * @param redisson client used to obtain topics, semaphores, counters and keys
     * @param name executor name
     * @param queueTransferService service that later hosts the scheduler transfer task
     * @param responses shared response registry
     * @param options supplies the task retry interval
     */
    public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, ExecutorOptions options) {
        this.codec = codec;
        this.commandExecutor = commandExecutor;
        this.connectionManager = commandExecutor.getConnectionManager();
        this.name = name;
        this.redisson = redisson;
        this.queueTransferService = queueTransferService;
        this.responses = responses;

        // With the connection manager's own codec the plain id is used; otherwise the id is qualified.
        if (codec == this.connectionManager.getCodec()) {
            this.executorId = this.connectionManager.getId();
        } else {
            this.executorId = this.connectionManager.getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
        }

        this.remoteService = new RedissonExecutorRemoteService(codec, name, this.connectionManager.getCommandExecutor(), this.executorId, responses);
        RedissonRemoteService queueNames = (RedissonRemoteService)this.remoteService;
        this.requestQueueName = queueNames.getRequestQueueName(RemoteExecutorService.class);
        this.responseQueueName = queueNames.getResponseQueueName(this.executorId);

        // Every auxiliary Redis object is named after the request queue.
        String prefix = this.requestQueueName;
        this.tasksCounterName = prefix + ":counter";
        this.tasksName = prefix + ":tasks";
        this.statusName = prefix + ":status";
        this.terminationTopic = redisson.getTopic(prefix + ":termination-topic", LongCodec.INSTANCE);
        this.tasksRetryIntervalName = prefix + ":retry-interval";
        this.schedulerChannelName = prefix + ":scheduler-channel";
        this.schedulerQueueName = prefix + ":scheduler";
        this.workersChannelName = prefix + ":workers-channel";
        this.workersSemaphoreName = prefix + ":workers-semaphore";
        this.workersCounterName = prefix + ":workers-counter";
        this.workersTopic = redisson.getTopic(this.workersChannelName);

        TasksService tasks = new TasksService(codec, name, commandExecutor, this.executorId, responses);
        tasks.setTerminationTopicName(this.terminationTopic.getChannelNames().get(0));
        tasks.setTasksCounterName(this.tasksCounterName);
        tasks.setStatusName(this.statusName);
        tasks.setTasksName(this.tasksName);
        tasks.setSchedulerChannelName(this.schedulerChannelName);
        tasks.setSchedulerQueueName(this.schedulerQueueName);
        tasks.setTasksRetryIntervalName(this.tasksRetryIntervalName);
        tasks.setTasksRetryInterval(options.getTaskRetryInterval());
        this.executorRemoteService = tasks;
        this.asyncService = tasks.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
        this.asyncServiceWithoutResult = tasks.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());

        ScheduledTasksService scheduled = new ScheduledTasksService(codec, name, commandExecutor, this.executorId, responses);
        scheduled.setTerminationTopicName(this.terminationTopic.getChannelNames().get(0));
        scheduled.setTasksCounterName(this.tasksCounterName);
        scheduled.setStatusName(this.statusName);
        scheduled.setSchedulerQueueName(this.schedulerQueueName);
        scheduled.setSchedulerChannelName(this.schedulerChannelName);
        scheduled.setTasksName(this.tasksName);
        scheduled.setTasksRetryIntervalName(this.tasksRetryIntervalName);
        scheduled.setTasksRetryInterval(options.getTaskRetryInterval());
        this.scheduledRemoteService = scheduled;
        this.asyncScheduledService = scheduled.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
        this.asyncScheduledServiceAtFixed = scheduled.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
    }

    /**
     * Produces a request id: 16 random bytes rendered as a hex string.
     *
     * @return hex dump of the random bytes
     */
    protected String generateRequestId() {
        byte[] randomBytes = new byte[16];
        ThreadLocalRandom random = ThreadLocalRandom.current();
        random.nextBytes(randomBytes);
        return ByteBufUtil.hexDump(randomBytes);
    }

    /**
     * Blocking variant of {@link #getTaskCountAsync()}.
     *
     * @return value of the task counter
     */
    @Override
    public int getTaskCount() {
        RFuture<Integer> pending = this.getTaskCountAsync();
        return this.commandExecutor.get(pending);
    }

    /**
     * Reads the task counter key asynchronously using the GET_INTEGER command.
     *
     * @return future holding the counter value
     */
    @Override
    public RFuture<Integer> getTaskCountAsync() {
        String key = this.getName();
        Codec longCodec = LongCodec.INSTANCE;
        return this.commandExecutor.readAsync(key, longCodec, RedisCommands.GET_INTEGER, this.tasksCounterName);
    }

    /**
     * Blocking variant of {@link #hasTaskAsync(String)}.
     *
     * @param taskId id of the task to look up
     * @return result of the asynchronous lookup
     */
    @Override
    public boolean hasTask(String taskId) {
        RFuture<Boolean> pending = this.hasTaskAsync(taskId);
        return this.commandExecutor.get(pending);
    }

    /**
     * Blocking variant of {@link #getTaskIdsAsync()}.
     *
     * @return ids returned by the asynchronous lookup
     */
    @Override
    public Set<String> getTaskIds() {
        RFuture<Set<String>> pending = this.getTaskIdsAsync();
        return this.commandExecutor.get(pending);
    }

    /**
     * Issues HKEYS against the tasks hash.
     *
     * @return future holding the hash keys
     */
    @Override
    public RFuture<Set<String>> getTaskIdsAsync() {
        String tasksKey = this.tasksName;
        Codec stringCodec = StringCodec.INSTANCE;
        return this.commandExecutor.writeAsync(tasksKey, stringCodec, RedisCommands.HKEYS, tasksKey);
    }

    /**
     * Issues HEXISTS against the tasks hash for the given task id.
     *
     * @param taskId hash field to test
     * @return future holding the HEXISTS result
     */
    @Override
    public RFuture<Boolean> hasTaskAsync(String taskId) {
        String tasksKey = this.tasksName;
        Codec longCodec = LongCodec.INSTANCE;
        return this.commandExecutor.writeAsync(tasksKey, longCodec, RedisCommands.HEXISTS, tasksKey, taskId);
    }

    /**
     * Counts workers by publishing a fresh request id on the workers topic and waiting
     * (up to 10 minutes) on a per-request semaphore for one permit per subscriber.
     * The per-request counter is then read and both helper objects are deleted.
     *
     * @return 0 when the publish reached no subscriber, otherwise the counter value
     */
    @Override
    public int countActiveWorkers() {
        String requestId = this.generateRequestId();
        int listeners = (int)this.workersTopic.publish(requestId);
        if (listeners == 0) {
            return 0;
        }
        String suffix = ":" + requestId;
        RSemaphore replies = this.redisson.getSemaphore(this.workersSemaphoreName + suffix);
        try {
            replies.tryAcquire(listeners, 10L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            // Keep the interrupt flag and fall through to read whatever has been counted so far.
            Thread.currentThread().interrupt();
        }
        RAtomicLong counter = this.redisson.getAtomicLong(this.workersCounterName + suffix);
        long total = counter.get();
        this.redisson.getKeys().delete(replies, counter);
        return (int)total;
    }

    /**
     * Registers workers using default options with the given worker count.
     *
     * @param workers number of workers
     */
    @Override
    public void registerWorkers(int workers) {
        WorkerOptions workerOptions = WorkerOptions.defaults().workers(workers);
        this.registerWorkers(workerOptions);
    }

    /**
     * Registers task workers described by {@code options}.
     *
     * <p>Steps: schedules a {@link QueueTransferTask} under this executor's name, builds a
     * {@link TasksRunnerService} configured with this executor's Redis names, registers it
     * on the remote service with {@code options.getWorkers()} workers, and subscribes a
     * listener on the workers topic that answers the polls sent by {@code countActiveWorkers()}.
     *
     * @param options worker settings; the worker count must not be zero
     * @throws IllegalArgumentException if {@code options.getWorkers()} is zero
     */
    @Override
    public void registerWorkers(final WorkerOptions options) {
        if (options.getWorkers() == 0) {
            throw new IllegalArgumentException("workers amount can't be zero");
        }
        QueueTransferTask task = new QueueTransferTask(this.connectionManager){

            @Override
            protected RTopic getTopic() {
                return new RedissonTopic(LongCodec.INSTANCE, RedissonExecutorService.this.commandExecutor, RedissonExecutorService.this.schedulerChannelName);
            }

            // Script summary (KEYS: request queue, scheduler set, scheduler channel, retry-interval key;
            // ARGV: current time in millis, batch limit 50):
            //  - takes up to ARGV[2] ids from the sorted set KEYS[2] with score <= ARGV[1] and removes them;
            //  - if KEYS[4] holds a retry interval, each id is re-added to KEYS[2] with an 'ff' prefix at
            //    (now + interval), and the un-prefixed id is placed in list KEYS[1] unless already present;
            //  - otherwise the ids are pushed to KEYS[1] as-is;
            //  - returns the score of the first remaining entry of KEYS[2], or nil.
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return RedissonExecutorService.this.commandExecutor.evalWriteAsync(RedissonExecutorService.this.name, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); local retryInterval = redis.call('get', KEYS[4]);if #expiredTaskIds > 0 then redis.call('zrem', KEYS[2], unpack(expiredTaskIds));if retryInterval ~= false then local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);for i = 1, #expiredTaskIds, 1 do local name = expiredTaskIds[i];local scheduledName = expiredTaskIds[i];if string.sub(scheduledName, 1, 2) ~= 'ff' then scheduledName = 'ff' .. scheduledName; else name = string.sub(name, 3, string.len(name)); end;redis.call('zadd', KEYS[2], startTime, scheduledName);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == expiredTaskIds[i] then redis.call('publish', KEYS[3], startTime); end;if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then redis.call('rpush', KEYS[1], name); else redis.call('lrem', KEYS[1], -1, name); end; end; else redis.call('rpush', KEYS[1], unpack(expiredTaskIds));end; end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;", Arrays.asList(RedissonExecutorService.this.requestQueueName, RedissonExecutorService.this.schedulerQueueName, RedissonExecutorService.this.schedulerChannelName, RedissonExecutorService.this.tasksRetryIntervalName), System.currentTimeMillis(), 50);
            }
        };
        this.queueTransferService.schedule(this.getName(), task);
        TasksRunnerService service = new TasksRunnerService(this.commandExecutor, this.redisson, this.codec, this.requestQueueName, this.responses);
        service.setStatusName(this.statusName);
        service.setTasksCounterName(this.tasksCounterName);
        service.setTasksName(this.tasksName);
        service.setTerminationTopicName(this.terminationTopic.getChannelNames().get(0));
        service.setSchedulerChannelName(this.schedulerChannelName);
        service.setSchedulerQueueName(this.schedulerQueueName);
        service.setTasksRetryIntervalName(this.tasksRetryIntervalName);
        service.setBeanFactory(options.getBeanFactory());
        // Default to the connection manager's executor unless the options supply one.
        ExecutorService es = this.commandExecutor.getConnectionManager().getExecutor();
        if (options.getExecutorService() != null) {
            es = options.getExecutorService();
        }
        this.remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es);
        // NOTE(review): the listener id is overwritten on every call, so shutdown() only
        // removes the listener added by the most recent registration.
        this.workersGroupListenerId = this.workersTopic.addListener(String.class, new MessageListener<String>(){

            // Replies to a worker-count poll: adds this registration's worker count to the
            // per-request counter, then releases one permit on the per-request semaphore.
            @Override
            public void onMessage(CharSequence channel, String id) {
                RedissonExecutorService.this.redisson.getAtomicLong(RedissonExecutorService.this.workersCounterName + ":" + id).getAndAdd(options.getWorkers());
                RedissonExecutorService.this.redisson.getSemaphore(RedissonExecutorService.this.workersSemaphoreName + ":" + id).release();
            }
        });
    }

    /**
     * Registers workers using default options with the given worker count and executor.
     *
     * @param workers number of workers
     * @param executor executor service stored in the worker options
     */
    @Override
    public void registerWorkers(int workers, ExecutorService executor) {
        WorkerOptions workerOptions = WorkerOptions.defaults().workers(workers).executorService(executor);
        this.registerWorkers(workerOptions);
    }

    /**
     * Validates the task, submits it through the no-result remote service and waits
     * until the add operation has completed.
     *
     * @param task task to run
     * @throws RejectedExecutionException if the add operation reports rejection
     */
    @Override
    public void execute(Runnable task) {
        this.check(task);
        TaskParameters params = this.createTaskParameters(task);
        RemotePromise promise = (RemotePromise)this.asyncServiceWithoutResult.executeRunnable(params);
        this.syncExecute(promise);
    }

    /**
     * Submits several tasks as one batch without expecting results.
     *
     * @param tasks tasks to run; must contain at least one element
     * @throws NullPointerException if {@code tasks} is empty
     * @throws RejectedExecutionException if the first add result is {@code false}
     */
    @Override
    public void execute(Runnable ... tasks) {
        if (tasks.length == 0) {
            throw new NullPointerException("Tasks are not defined");
        }
        TasksBatchService batchService = this.createBatchService();
        RemoteExecutorServiceAsync batchAsync = batchService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
        for (int i = 0; i < tasks.length; ++i) {
            Runnable task = tasks[i];
            this.check(task);
            TaskParameters params = this.createTaskParameters(task);
            batchAsync.executeRunnable(params);
        }
        List<Boolean> added = batchService.executeAdd();
        boolean accepted = added.get(0).booleanValue();
        if (!accepted) {
            throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
        }
    }

    /**
     * Builds a {@link TasksBatchService} configured with this executor's Redis names.
     *
     * @return a new, fully configured batch service
     */
    private TasksBatchService createBatchService() {
        TasksBatchService batch = new TasksBatchService(this.codec, this.name, this.commandExecutor, this.executorId, this.responses);
        batch.setTerminationTopicName(this.terminationTopic.getChannelNames().get(0));
        batch.setTasksCounterName(this.tasksCounterName);
        batch.setStatusName(this.statusName);
        batch.setTasksName(this.tasksName);
        batch.setSchedulerChannelName(this.schedulerChannelName);
        batch.setSchedulerQueueName(this.schedulerQueueName);
        batch.setTasksRetryIntervalName(this.tasksRetryIntervalName);
        return batch;
    }

    /**
     * Encodes the task with the value encoder of this executor's codec and copies the
     * readable bytes out of the resulting buffer, which is always released.
     *
     * @param task task to encode
     * @return encoded bytes
     * @throws IllegalArgumentException wrapping any {@link IOException} from the encoder
     */
    private byte[] encode(Object task) {
        Injector.inject(task, null);
        ByteBuf encoded = null;
        try {
            encoded = this.codec.getValueEncoder().encode(task);
            byte[] bytes = new byte[encoded.readableBytes()];
            encoded.readBytes(bytes);
            return bytes;
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        finally {
            if (encoded != null) {
                encoded.release();
            }
        }
    }

    /**
     * Returns the class bytes (and, for lambdas, the serialized lambda) for the task's class,
     * caching the result per class.
     *
     * <p>If the task's own {@code .class} resource cannot be found, the task is treated as a
     * lambda: it is Java-serialized, its {@code writeReplace} method is invoked to obtain the
     * {@link SerializedLambda}, and the capturing class's bytes are loaded instead.
     *
     * @param task task instance whose class definition is needed
     * @return class body descriptor for the task
     * @throws IllegalArgumentException if the lambda cannot be serialized, does not expose
     *         {@code writeReplace}, the class resource cannot be found, or reading it fails
     */
    private ClassBody getClassBody(Object task) {
        Class<?> c = task.getClass();
        ClassBody result = this.class2body.get(c);
        if (result != null) {
            return result;
        }
        String className = c.getName();
        String classAsPath = className.replace('.', '/') + ".class";
        InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath);
        byte[] lambdaBody = null;
        if (classStream == null) {
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            try (ObjectOutputStream oo = new ObjectOutputStream(os)) {
                oo.writeObject(task);
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Unable to serialize lambda", e);
            }
            lambdaBody = os.toByteArray();
            SerializedLambda lambda;
            try {
                Method writeReplace = c.getDeclaredMethod("writeReplace");
                writeReplace.setAccessible(true);
                lambda = (SerializedLambda)writeReplace.invoke(task);
            }
            catch (Exception ex) {
                throw new IllegalArgumentException("Lambda should implement java.io.Serializable interface", ex);
            }
            className = lambda.getCapturingClass().replace('/', '.');
            classStream = c.getClassLoader().getResourceAsStream(lambda.getCapturingClass() + ".class");
        }
        if (classStream == null) {
            // Fail with a clear message instead of an NPE when the resource is missing.
            throw new IllegalArgumentException("Unable to find class file for " + className);
        }
        // Read until EOF: InputStream.available() is only an estimate and must not be used
        // to size the buffer, otherwise the class body may be truncated.
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        try {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = classStream.read(chunk)) != -1) {
                body.write(chunk, 0, read);
            }
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        finally {
            try {
                classStream.close();
            }
            catch (IOException ignored) {
                // Best-effort close; the bytes have already been read or a failure is propagating.
            }
        }
        result = new ClassBody(lambdaBody, body.toByteArray(), className);
        this.class2body.put(c, result);
        return result;
    }

    /**
     * Stops local task processing and records the shutdown in Redis.
     *
     * <p>Removes the queue transfer task, deregisters the remote executor service, removes the
     * workers-topic listener, then runs a script that, only when no status is stored yet, sets
     * the status to terminated (and publishes it) if the task counter is absent or '0', or to
     * shutdown otherwise.
     */
    @Override
    public void shutdown() {
        this.queueTransferService.remove(this.getName());
        this.remoteService.deregister(RemoteExecutorService.class);
        this.workersTopic.removeListener(this.workersGroupListenerId);

        String script = "if redis.call('exists', KEYS[2]) == 0 then if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[2]);redis.call('publish', KEYS[3], ARGV[2]);else redis.call('set', KEYS[2], ARGV[1]);end;end;";
        String key = this.getName();
        List<Object> keys = Arrays.asList(this.tasksCounterName, this.statusName, this.terminationTopic.getChannelNames().get(0), this.tasksRetryIntervalName);
        this.commandExecutor.evalWrite(key, LongCodec.INSTANCE, RedisCommands.EVAL_VOID, script, keys, SHUTDOWN_STATE, TERMINATED_STATE);
    }

    /**
     * @return the executor name passed to the constructor
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Blocking variant of {@link #deleteAsync()}.
     *
     * @return result of the asynchronous delete
     */
    @Override
    public boolean delete() {
        RFuture<Boolean> pending = this.deleteAsync();
        return this.commandExecutor.get(pending);
    }

    /**
     * Deletes the request queue, status, task counter, scheduler queue, tasks and
     * retry-interval keys.
     *
     * @return future completing with {@code true} if at least one key was deleted
     */
    @Override
    public RFuture<Boolean> deleteAsync() {
        RedissonPromise<Boolean> promise = new RedissonPromise<Boolean>();
        RFuture<Long> removal = this.redisson.getKeys().deleteAsync(this.requestQueueName, this.statusName, this.tasksCounterName, this.schedulerQueueName, this.tasksName, this.tasksRetryIntervalName);
        removal.onComplete((deletedCount, failure) -> {
            if (failure == null) {
                promise.trySuccess(deletedCount > 0L);
            } else {
                promise.tryFailure((Throwable)failure);
            }
        });
        return promise;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException();
    }

    /**
     * @return {@code true} if the stored status is at least {@link #SHUTDOWN_STATE}
     */
    @Override
    public boolean isShutdown() {
        return this.checkState(SHUTDOWN_STATE);
    }

    /**
     * Runs a script that reports whether the status key exists and its numeric value is
     * greater than or equal to {@code state}.
     *
     * @param state minimum status value to test for
     * @return the script result
     */
    private boolean checkState(int state) {
        String script = "if redis.call('exists', KEYS[1]) == 1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then return 1;end;return 0;";
        Object reached = this.commandExecutor.evalWrite(this.getName(), this.codec, RedisCommands.EVAL_BOOLEAN, script, Arrays.asList(this.statusName), state);
        return (Boolean)reached;
    }

    /**
     * @return {@code true} if the stored status is at least {@link #TERMINATED_STATE}
     */
    @Override
    public boolean isTerminated() {
        return this.checkState(TERMINATED_STATE);
    }

    /**
     * Waits until the executor reports the terminated status or the timeout elapses.
     *
     * <p>A listener on the termination topic releases a latch when the terminated value is
     * published. The status is re-checked after subscribing so a termination that happened
     * between the first check and the subscription is not missed.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if terminated, {@code false} if the timeout elapsed first
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        if (this.isTerminated()) {
            return true;
        }
        final CountDownLatch latch = new CountDownLatch(1);
        MessageListener<Long> listener = new MessageListener<Long>(){

            @Override
            public void onMessage(CharSequence channel, Long msg) {
                if (msg == TERMINATED_STATE) {
                    latch.countDown();
                }
            }
        };
        int listenerId = this.terminationTopic.addListener(Long.class, listener);
        try {
            if (this.isTerminated()) {
                return true;
            }
            return latch.await(timeout, unit);
        }
        finally {
            // Always unsubscribe, including on interruption or a failed status check,
            // so the listener does not leak on the topic.
            this.terminationTopic.removeListener(listenerId);
        }
    }

    /**
     * Submits a callable and blocks until the add operation has completed.
     *
     * @param task task to run
     * @return future for the task's result
     * @throws RejectedExecutionException if the add operation reports rejection
     */
    @Override
    public <T> RExecutorFuture<T> submit(Callable<T> task) {
        RExecutorFuture<T> submitted = this.submitAsync(task);
        PromiseDelegator delegator = (PromiseDelegator)((Object)submitted);
        RemotePromise promise = (RemotePromise)delegator.getInnerPromise();
        this.syncExecute(promise);
        return this.createFuture(promise);
    }

    /**
     * Validates and submits a callable without waiting for the add operation; a rejection
     * or add failure is propagated to the returned future via {@code addListener}.
     *
     * @param task task to run
     * @return future for the task's result
     */
    @Override
    public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
        this.check(task);
        TaskParameters params = this.createTaskParameters(task);
        RemotePromise promise = (RemotePromise)this.asyncService.executeCallable(params);
        this.addListener(promise);
        return this.createFuture(promise);
    }

    /**
     * Submits several callables as one batch and waits for the add operation.
     *
     * @param tasks tasks to run; must contain at least one element
     * @return batch future wrapping one future per task
     * @throws NullPointerException if {@code tasks} is empty
     * @throws RejectedExecutionException if the first add result is {@code false}
     */
    @Override
    public RExecutorBatchFuture submit(Callable<?> ... tasks) {
        if (tasks.length == 0) {
            throw new NullPointerException("Tasks are not defined");
        }
        List<RExecutorFuture<?>> futures = new ArrayList<RExecutorFuture<?>>();
        TasksBatchService batchService = this.createBatchService();
        RemoteExecutorServiceAsync batchAsync = batchService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
        for (int i = 0; i < tasks.length; ++i) {
            Callable<?> task = tasks[i];
            this.check(task);
            TaskParameters params = this.createTaskParameters(task);
            RemotePromise promise = (RemotePromise)batchAsync.executeCallable(params);
            futures.add(new RedissonExecutorFuture(promise));
        }
        List<Boolean> added = batchService.executeAdd();
        boolean accepted = added.get(0).booleanValue();
        if (!accepted) {
            throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
        }
        return new RedissonExecutorBatchFuture(futures);
    }

    /**
     * Packages a callable for remote execution: class name, class bytes, serialized lambda
     * (if any) and the encoded task state.
     *
     * @param task task to package
     * @return parameters describing the task
     */
    protected TaskParameters createTaskParameters(Callable<?> task) {
        ClassBody body = this.getClassBody(task);
        byte[] encodedState = this.encode(task);
        String clazzName = body.getClazzName();
        return new TaskParameters(clazzName, body.getClazz(), body.getLambda(), encodedState);
    }

    /**
     * Packages a runnable for remote execution: class name, class bytes, serialized lambda
     * (if any) and the encoded task state.
     *
     * @param task task to package
     * @return parameters describing the task
     */
    protected TaskParameters createTaskParameters(Runnable task) {
        ClassBody body = this.getClassBody(task);
        byte[] encodedState = this.encode(task);
        String clazzName = body.getClazzName();
        return new TaskParameters(clazzName, body.getClazz(), body.getLambda(), encodedState);
    }

    /**
     * Submits several callables as one batch without waiting for the add operation.
     *
     * <p>When the add operation fails, every returned future is failed with that cause. When
     * any add result is {@code false}, every returned future is failed with a
     * {@link RejectedExecutionException}.
     *
     * @param tasks tasks to run; must contain at least one element
     * @return batch future wrapping one future per task
     * @throws NullPointerException if {@code tasks} is empty
     */
    @Override
    public RExecutorBatchFuture submitAsync(Callable<?> ... tasks) {
        if (tasks.length == 0) {
            throw new NullPointerException("Tasks are not defined");
        }
        TasksBatchService executorRemoteService = this.createBatchService();
        RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
        // Typed list so the completion callback can iterate the futures without raw-type errors.
        List<RExecutorFuture<?>> result = new ArrayList<>();
        for (Callable<?> task : tasks) {
            this.check(task);
            RemotePromise promise = (RemotePromise)asyncService.executeCallable(this.createTaskParameters(task));
            result.add(new RedissonExecutorFuture(promise));
        }
        executorRemoteService.executeAddAsync().onComplete((res, e) -> {
            if (e != null) {
                for (RExecutorFuture<?> executorFuture : result) {
                    ((RPromise)executorFuture).tryFailure((Throwable)e);
                }
                return;
            }
            for (Boolean bool : res) {
                if (bool.booleanValue()) continue;
                RejectedExecutionException ex = new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
                for (RExecutorFuture<?> executorFuture : result) {
                    ((RPromise)executorFuture).tryFailure(ex);
                }
                // All futures are failed now; further rejected entries would only repeat no-op failures.
                break;
            }
        });
        return new RedissonExecutorBatchFuture(result);
    }

    /**
     * Fails the promise when its add operation fails or reports rejection.
     *
     * @param result promise whose add future is observed
     */
    private <T> void addListener(RemotePromise<T> result) {
        result.getAddFuture().onComplete((added, failure) -> {
            if (failure != null) {
                result.tryFailure((Throwable)failure);
            } else if (!added.booleanValue()) {
                result.tryFailure(new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"));
            }
        });
    }

    /**
     * Validates that a task object is acceptable for submission.
     *
     * @param task task to validate
     * @throws NullPointerException if {@code task} is null
     * @throws IllegalArgumentException if the task's class is anonymous or a non-static member class
     */
    private void check(Object task) {
        if (task == null) {
            throw new NullPointerException("Task is not defined");
        }
        Class<?> taskClass = task.getClass();
        if (taskClass.isAnonymousClass()) {
            throw new IllegalArgumentException("Task can't be created using anonymous class");
        }
        boolean nonStaticMember = taskClass.isMemberClass() && !Modifier.isStatic(taskClass.getModifiers());
        if (nonStaticMember) {
            throw new IllegalArgumentException("Task class is an inner class and it should be static");
        }
    }

    /**
     * Blocks until the promise's add operation has completed and checks its outcome.
     *
     * @param promise promise whose add future is awaited
     * @throws RejectedExecutionException if the add result is {@code false}
     */
    private <T> void syncExecute(RemotePromise<T> promise) {
        RFuture<Boolean> pendingAdd = promise.getAddFuture();
        pendingAdd.syncUninterruptibly();
        Boolean accepted = pendingAdd.getNow();
        if (accepted.booleanValue()) {
            return;
        }
        throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
    }

    /**
     * Submits a runnable and returns a future that completes with the supplied
     * {@code result} once the task's own future succeeds, or fails with the same cause.
     *
     * @param task task to run
     * @param result value to complete the returned future with
     * @return future carrying {@code result}, bound to the submitted task's request id
     */
    @Override
    public <T> RExecutorFuture<T> submit(Runnable task, T result) {
        RedissonPromise resultFuture = new RedissonPromise();
        PromiseDelegator delegator = (PromiseDelegator)this.submit(task);
        RemotePromise future = (RemotePromise)delegator.getInnerPromise();
        future.onComplete((ignoredValue, failure) -> {
            if (failure == null) {
                resultFuture.trySuccess(result);
            } else {
                resultFuture.tryFailure((Throwable)failure);
            }
        });
        return new RedissonExecutorFuture(resultFuture, future.getRequestId());
    }

    /**
     * Submits several runnables as one batch and waits for the add operation.
     *
     * @param tasks tasks to run; must contain at least one element
     * @return batch future wrapping one future per task
     * @throws NullPointerException if {@code tasks} is empty
     * @throws RejectedExecutionException if the first add result is {@code false}
     */
    @Override
    public RExecutorBatchFuture submit(Runnable ... tasks) {
        if (tasks.length == 0) {
            throw new NullPointerException("Tasks are not defined");
        }
        List<RExecutorFuture<?>> futures = new ArrayList<RExecutorFuture<?>>();
        TasksBatchService batchService = this.createBatchService();
        RemoteExecutorServiceAsync batchAsync = batchService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
        for (int i = 0; i < tasks.length; ++i) {
            Runnable task = tasks[i];
            this.check(task);
            TaskParameters params = this.createTaskParameters(task);
            RemotePromise promise = (RemotePromise)batchAsync.executeRunnable(params);
            futures.add(new RedissonExecutorFuture(promise));
        }
        List<Boolean> added = batchService.executeAdd();
        boolean accepted = added.get(0).booleanValue();
        if (!accepted) {
            throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
        }
        return new RedissonExecutorBatchFuture(futures);
    }

    /**
     * Submits several runnables as one batch without waiting for the add operation.
     *
     * <p>When the add operation fails, every returned future is failed with that cause. When
     * any add result is {@code false}, every returned future is failed with a
     * {@link RejectedExecutionException}.
     *
     * @param tasks tasks to run; must contain at least one element
     * @return batch future wrapping one future per task
     * @throws NullPointerException if {@code tasks} is empty
     */
    @Override
    public RExecutorBatchFuture submitAsync(Runnable ... tasks) {
        if (tasks.length == 0) {
            throw new NullPointerException("Tasks are not defined");
        }
        TasksBatchService executorRemoteService = this.createBatchService();
        RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
        // Typed list so the completion callback can iterate the futures without raw-type errors.
        List<RExecutorFuture<?>> result = new ArrayList<>();
        for (Runnable task : tasks) {
            this.check(task);
            RemotePromise promise = (RemotePromise)asyncService.executeRunnable(this.createTaskParameters(task));
            result.add(new RedissonExecutorFuture(promise));
        }
        executorRemoteService.executeAddAsync().onComplete((res, e) -> {
            if (e != null) {
                for (RExecutorFuture<?> executorFuture : result) {
                    ((RPromise)executorFuture).tryFailure((Throwable)e);
                }
                return;
            }
            for (Boolean bool : res) {
                if (bool.booleanValue()) continue;
                RejectedExecutionException ex = new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
                for (RExecutorFuture<?> executorFuture : result) {
                    ((RPromise)executorFuture).tryFailure(ex);
                }
                // All futures are failed now; further rejected entries would only repeat no-op failures.
                break;
            }
        });
        return new RedissonExecutorBatchFuture(result);
    }

    /**
     * Submits a runnable and blocks until the add operation has completed.
     *
     * @param task task to run
     * @return future for the submitted task
     * @throws RejectedExecutionException if the add operation reports rejection
     */
    @Override
    public RExecutorFuture<?> submit(Runnable task) {
        RExecutorFuture<?> submitted = this.submitAsync(task);
        PromiseDelegator delegator = (PromiseDelegator)((Object)submitted);
        RemotePromise promise = (RemotePromise)delegator.getInnerPromise();
        this.syncExecute(promise);
        return this.createFuture(promise);
    }

    /**
     * Validates and submits a runnable without waiting for the add operation; a rejection
     * or add failure is propagated to the returned future via {@code addListener}.
     *
     * @param task task to run
     * @return future for the submitted task
     */
    @Override
    public RExecutorFuture<?> submitAsync(Runnable task) {
        this.check(task);
        TaskParameters params = this.createTaskParameters(task);
        RemotePromise promise = (RemotePromise)this.asyncService.executeRunnable(params);
        this.addListener(promise);
        return this.createFuture(promise);
    }

    /**
     * Drops the pending response handlers registered for {@code requestId}
     * in this executor's response queue entry.
     *
     * <p>While holding the monitor of the shared {@code responses} map, the
     * handlers for the request are removed and their scheduled futures are
     * cancelled; if the entry has no handlers left afterwards, the entry
     * itself is removed from the map.
     *
     * @param requestId id of the request whose response handling is cancelled
     */
    private void cancelResponseHandling(RequestId requestId) {
        synchronized (this.responses) {
            ResponseEntry entry = this.responses.get(this.responseQueueName);
            if (entry != null) {
                List<ResponseEntry.Result> pending = entry.getResponses().remove(requestId);
                if (pending != null) {
                    for (ResponseEntry.Result handler : pending) {
                        handler.getScheduledFuture().cancel(true);
                    }
                }
                if (entry.getResponses().isEmpty()) {
                    // Conditional remove: only drops the mapping if it still points at this entry.
                    this.responses.remove(this.responseQueueName, entry);
                }
            }
        }
    }

    /**
     * Schedules a one-shot runnable via
     * {@link #scheduleAsync(Runnable, long, TimeUnit)} and passes the
     * underlying remote promise to {@code syncExecute} before returning.
     *
     * @param task  task to run
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return the future produced by the asynchronous variant
     */
    @Override
    public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        RScheduledFuture<?> scheduled = this.scheduleAsync(task, delay, unit);
        RedissonScheduledFuture<?> delegate = (RedissonScheduledFuture<?>) scheduled;
        this.syncExecute((RemotePromise<?>) delegate.getInnerPromise());
        return scheduled;
    }

    /**
     * Wraps the remote promise in an executor future and records a reference
     * to it under the promise's request id.
     *
     * @param promise remote promise of a submitted task
     * @return the newly created, tracked future
     */
    private <T> RExecutorFuture<T> createFuture(RemotePromise<T> promise) {
        RedissonExecutorFuture<T> tracked = new RedissonExecutorFuture<>(promise);
        this.storeReference(tracked, promise.getRequestId());
        return tracked;
    }

    /**
     * Wraps the remote promise in a scheduled future carrying the given
     * execution time and records a reference to it under the promise's
     * request id.
     *
     * @param promise                remote promise of a scheduled task
     * @param scheduledExecutionTime execution time passed to the scheduled future
     * @return the newly created, tracked scheduled future
     */
    private <T> RScheduledFuture<T> createFuture(RemotePromise<T> promise, long scheduledExecutionTime) {
        RedissonScheduledFuture<T> tracked = new RedissonScheduledFuture<>(promise, scheduledExecutionTime);
        this.storeReference(tracked, promise.getRequestId());
        return tracked;
    }

    /**
     * Registers a reference object for {@code future} and, before doing so,
     * drains the reference queue.
     *
     * <p>Every reference polled from the queue is removed from
     * {@code references}; if its promise has no listeners, response handling
     * for its request id is cancelled as well.
     *
     * @param future    future to track
     * @param requestId request id associated with the future
     */
    private void storeReference(RExecutorFuture<?> future, RequestId requestId) {
        for (Object polled = this.referenceDueue.poll(); polled != null; polled = this.referenceDueue.poll()) {
            RedissonExecutorFutureReference stale = (RedissonExecutorFutureReference) polled;
            this.references.remove(stale);
            if (!stale.getPromise().hasListeners()) {
                this.cancelResponseHandling(stale.getRequestId());
            }
        }
        RPromise<?> innerPromise = ((PromiseDelegator<?>) future).getInnerPromise();
        this.references.add(new RedissonExecutorFutureReference(requestId, future, this.referenceDueue, innerPromise));
    }

    /**
     * Schedules a one-shot runnable without waiting for the registration.
     *
     * <p>The start time is computed as the current wall-clock time plus the
     * delay converted to milliseconds and is sent, together with the task's
     * class body and encoded state, to the scheduled remote service.
     *
     * @param task  task to run
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return tracked scheduled future for the task
     */
    @Override
    public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
        this.check(task);
        ClassBody body = this.getClassBody(task);
        byte[] encodedTask = this.encode(task);
        long scheduledAt = System.currentTimeMillis() + unit.toMillis(delay);
        ScheduledParameters params = new ScheduledParameters(
                body.getClazzName(), body.getClazz(), body.getLambda(), encodedTask, scheduledAt);
        RemotePromise<?> promise = (RemotePromise<?>) this.asyncScheduledService.scheduleRunnable(params);
        this.addListener(promise);
        return this.createFuture(promise, scheduledAt);
    }

    /**
     * Schedules a one-shot callable via
     * {@link #scheduleAsync(Callable, long, TimeUnit)} and passes the
     * underlying remote promise to {@code syncExecute} before returning.
     *
     * @param task  task to call
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return the future produced by the asynchronous variant
     */
    @Override
    public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
        RScheduledFuture<V> scheduled = this.scheduleAsync(task, delay, unit);
        RedissonScheduledFuture<V> delegate = (RedissonScheduledFuture<V>) scheduled;
        this.syncExecute((RemotePromise<V>) delegate.getInnerPromise());
        return scheduled;
    }

    /**
     * Schedules a one-shot callable without waiting for the registration.
     *
     * <p>The start time is computed as the current wall-clock time plus the
     * delay converted to milliseconds and is sent, together with the task's
     * class body and encoded state, to the scheduled remote service.
     *
     * @param task  task to call
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return tracked scheduled future for the task
     */
    @Override
    public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) {
        this.check(task);
        ClassBody body = this.getClassBody(task);
        byte[] encodedTask = this.encode(task);
        long scheduledAt = System.currentTimeMillis() + unit.toMillis(delay);
        ScheduledParameters params = new ScheduledParameters(
                body.getClazzName(), body.getClazz(), body.getLambda(), encodedTask, scheduledAt);
        @SuppressWarnings("unchecked")
        RemotePromise<V> promise = (RemotePromise<V>) this.asyncScheduledService.scheduleCallable(params);
        this.addListener(promise);
        return this.createFuture(promise, scheduledAt);
    }

    /**
     * Schedules a fixed-rate task via
     * {@link #scheduleAtFixedRateAsync(Runnable, long, long, TimeUnit)} and
     * passes the underlying remote promise to {@code syncExecute} before
     * returning.
     *
     * @param task         task to run
     * @param initialDelay delay before the first execution
     * @param period       period between executions
     * @param unit         unit of {@code initialDelay} and {@code period}
     * @return the future produced by the asynchronous variant
     */
    @Override
    public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
        RScheduledFuture<?> scheduled = this.scheduleAtFixedRateAsync(task, initialDelay, period, unit);
        RedissonScheduledFuture<?> delegate = (RedissonScheduledFuture<?>) scheduled;
        this.syncExecute((RemotePromise<?>) delegate.getInnerPromise());
        return scheduled;
    }

    /**
     * Schedules a fixed-rate task without waiting for the registration.
     *
     * <p>Start time is the current wall-clock time plus the initial delay in
     * milliseconds; the period is likewise converted to milliseconds. Both
     * are sent with the task's class body, encoded state and this executor's
     * id to the fixed-schedule remote service.
     *
     * @param task         task to run
     * @param initialDelay delay before the first execution
     * @param period       period between executions
     * @param unit         unit of {@code initialDelay} and {@code period}
     * @return tracked scheduled future for the task
     */
    @Override
    public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit) {
        this.check(task);
        ClassBody body = this.getClassBody(task);
        byte[] encodedTask = this.encode(task);
        long firstRunAt = System.currentTimeMillis() + unit.toMillis(initialDelay);
        long periodMillis = unit.toMillis(period);

        ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters();
        // Task identity and payload.
        params.setClassName(body.getClazzName());
        params.setClassBody(body.getClazz());
        params.setLambdaBody(body.getLambda());
        params.setState(encodedTask);
        // Timing and owner.
        params.setStartTime(firstRunAt);
        params.setPeriod(periodMillis);
        params.setExecutorId(this.executorId);

        RemotePromise<?> promise = (RemotePromise<?>) this.asyncScheduledServiceAtFixed.scheduleAtFixedRate(params);
        this.addListener(promise);
        return this.createFuture(promise, firstRunAt);
    }

    /**
     * Schedules a cron-driven task via
     * {@link #scheduleAsync(Runnable, CronSchedule)} and passes the
     * underlying remote promise to {@code syncExecute} before returning.
     *
     * @param task         task to run
     * @param cronSchedule cron schedule describing the execution times
     * @return the future produced by the asynchronous variant
     */
    @Override
    public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) {
        RScheduledFuture<?> scheduled = this.scheduleAsync(task, cronSchedule);
        RedissonScheduledFuture<?> delegate = (RedissonScheduledFuture<?>) scheduled;
        this.syncExecute((RemotePromise<?>) delegate.getInnerPromise());
        return scheduled;
    }

    /**
     * Schedules a cron-driven task without waiting for the registration.
     *
     * <p>The first execution time is the next valid time of the cron
     * expression after "now". The cron expression string, its time zone id,
     * the task's class body and encoded state, and this executor's id are
     * sent to the fixed-schedule remote service. The returned future reports
     * its delay relative to that first execution time.
     *
     * @param task         task to run
     * @param cronSchedule cron schedule describing the execution times
     * @return tracked scheduled future for the task
     * @throws IllegalArgumentException if the expression yields no next valid time
     */
    @Override
    public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedule) {
        this.check(task);
        ClassBody body = this.getClassBody(task);
        byte[] encodedTask = this.encode(task);

        Date firstFire = cronSchedule.getExpression().getNextValidTimeAfter(new Date());
        if (firstFire == null) {
            throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date");
        }
        final long firstFireMillis = firstFire.getTime();

        ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters();
        params.setClassName(body.getClazzName());
        params.setClassBody(body.getClazz());
        params.setLambdaBody(body.getLambda());
        params.setState(encodedTask);
        params.setStartTime(firstFireMillis);
        params.setCronExpression(cronSchedule.getExpression().getCronExpression());
        params.setTimezone(cronSchedule.getExpression().getTimeZone().getID());
        params.setExecutorId(this.executorId);

        RemotePromise<Void> promise = (RemotePromise<Void>) this.asyncScheduledServiceAtFixed.schedule(params);
        this.addListener(promise);
        RedissonScheduledFuture<Void> tracked = new RedissonScheduledFuture<Void>(promise, firstFireMillis) {

            @Override
            public long getDelay(TimeUnit unit) {
                // Remaining time until the first fire, measured against the wall clock.
                long remainingMillis = firstFireMillis - System.currentTimeMillis();
                return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
            }
        };
        this.storeReference(tracked, promise.getRequestId());
        return tracked;
    }

    /**
     * Schedules a fixed-delay task via
     * {@link #scheduleWithFixedDelayAsync(Runnable, long, long, TimeUnit)}
     * and passes the underlying remote promise to {@code syncExecute} before
     * returning.
     *
     * @param task         task to run
     * @param initialDelay delay before the first execution
     * @param delay        delay between executions
     * @param unit         unit of {@code initialDelay} and {@code delay}
     * @return the future produced by the asynchronous variant
     */
    @Override
    public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
        RScheduledFuture<?> scheduled = this.scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
        RedissonScheduledFuture<?> delegate = (RedissonScheduledFuture<?>) scheduled;
        this.syncExecute((RemotePromise<?>) delegate.getInnerPromise());
        return scheduled;
    }

    /**
     * Schedules a fixed-delay task without waiting for the registration.
     *
     * <p>Start time is the current wall-clock time plus the initial delay in
     * milliseconds; the delay is likewise converted to milliseconds. Both
     * are sent with the task's class body, encoded state and this executor's
     * id to the fixed-schedule remote service.
     *
     * @param task         task to run
     * @param initialDelay delay before the first execution
     * @param delay        delay between executions
     * @param unit         unit of {@code initialDelay} and {@code delay}
     * @return tracked scheduled future for the task
     */
    @Override
    public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit) {
        this.check(task);
        ClassBody body = this.getClassBody(task);
        byte[] encodedTask = this.encode(task);
        long firstRunAt = System.currentTimeMillis() + unit.toMillis(initialDelay);
        long delayMillis = unit.toMillis(delay);

        ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters();
        // Task identity and payload.
        params.setClassName(body.getClazzName());
        params.setClassBody(body.getClazz());
        params.setLambdaBody(body.getLambda());
        params.setState(encodedTask);
        // Timing and owner.
        params.setStartTime(firstRunAt);
        params.setDelay(delayMillis);
        params.setExecutorId(this.executorId);

        RemotePromise<?> promise = (RemotePromise<?>) this.asyncScheduledServiceAtFixed.scheduleWithFixedDelay(params);
        this.addListener(promise);
        return this.createFuture(promise, firstRunAt);
    }

    /**
     * Cancels the task with the given id, obtaining the outcome of
     * {@link #cancelTaskAsync(String)} through the command executor.
     *
     * @param taskId id of the task to cancel
     * @return the value produced by the asynchronous cancellation
     */
    @Override
    public boolean cancelTask(String taskId) {
        RFuture<Boolean> cancellation = this.cancelTaskAsync(taskId);
        return this.commandExecutor.get(cancellation);
    }

    /**
     * Requests cancellation of the task with the given id.
     *
     * <p>Ids starting with {@code "01"} are routed to the scheduled remote
     * service; all other ids go to the plain executor remote service.
     *
     * @param taskId id of the task to cancel
     * @return future holding the cancellation result
     */
    @Override
    public RFuture<Boolean> cancelTaskAsync(String taskId) {
        boolean routedToScheduler = taskId.startsWith("01");
        RequestId requestId = new RequestId(taskId);
        if (!routedToScheduler) {
            return this.executorRemoteService.cancelExecutionAsync(requestId);
        }
        return this.scheduledRemoteService.cancelExecutionAsync(requestId);
    }

    /**
     * Waits until any of the given futures completes and returns the first
     * one recorded as completed.
     *
     * <p>A completion listener is attached to every future. The listener
     * first publishes the completed future and only then releases the latch:
     * with the opposite order the waiting thread could wake up, read a
     * still-empty reference and report a timeout although a task had
     * finished.
     *
     * @param futures  futures to watch
     * @param timeout  maximum time to wait, or {@code -1} to wait indefinitely
     * @param timeUnit unit of {@code timeout}; not used when {@code timeout} is {@code -1}
     * @return the first completed future, or {@code null} if none completed in time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private <T> RFuture<T> poll(List<RExecutorFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<RFuture<T>> result = new AtomicReference<>();
        for (RExecutorFuture<?> future : futures) {
            // Caller guarantees the futures produce T; the element type is erased by the wildcard list.
            @SuppressWarnings("unchecked")
            RFuture<T> f = (RFuture<T>) future;
            f.onComplete((r, e) -> {
                // Publish before countDown() so the awakened waiter always sees a result.
                result.compareAndSet(null, f);
                latch.countDown();
            });
        }
        if (timeout == -1L) {
            latch.await();
        } else {
            latch.await(timeout, timeUnit);
        }
        return result.get();
    }

    /**
     * Untimed variant of
     * {@link #invokeAny(Collection, long, TimeUnit)}: delegates with a
     * timeout of {@code -1} and a {@code null} unit.
     *
     * @param tasks tasks to submit
     * @return the value returned by the timed variant, or {@code null} if it
     *         unexpectedly reports a timeout
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        T outcome;
        try {
            outcome = this.invokeAny(tasks, -1L, null);
        }
        catch (TimeoutException cannotHappen) {
            // Not expected: the delegate is called with the "no timeout" marker.
            outcome = null;
        }
        return outcome;
    }

    /**
     * Submits all tasks and returns the result of the first one observed to
     * complete, cancelling every submitted task before returning.
     *
     * <p>Differences from a plain "return whatever finished first":
     * <ul>
     *   <li>an empty collection is rejected up front instead of waiting for a
     *       completion that can never happen;</li>
     *   <li>if the first completed task failed, its failure is reported as an
     *       {@link ExecutionException} (via {@code Future.get()}) instead of
     *       being turned into a {@code null} result;</li>
     *   <li>submitted tasks are cancelled on every exit path, including
     *       timeout and interruption.</li>
     * </ul>
     * NOTE(review): only the first completed task is inspected; later
     * successful completions are not awaited when the first one failed.
     *
     * @param tasks   tasks to submit; must be non-null and non-empty
     * @param timeout maximum time to wait, or {@code -1} to wait indefinitely
     * @param unit    unit of {@code timeout}; may be {@code null} when {@code timeout} is {@code -1}
     * @return result of the first completed task
     * @throws NullPointerException     if {@code tasks} is {@code null}
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException       if the first completed task failed
     * @throws TimeoutException         if no task completed within the timeout
     * @throws InterruptedException     if interrupted while waiting
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        if (tasks.isEmpty()) {
            throw new IllegalArgumentException("Tasks are not defined");
        }
        List<RExecutorFuture<?>> futures = new ArrayList<>(tasks.size());
        try {
            for (Callable<T> task : tasks) {
                futures.add(this.submit(task));
            }
            RFuture<T> first = this.poll(futures, timeout, unit);
            if (first == null) {
                throw new TimeoutException();
            }
            // The future is already complete, so get() does not wait; it
            // rethrows a task failure wrapped in ExecutionException.
            return first.get();
        } finally {
            for (RExecutorFuture<?> future : futures) {
                future.cancel(true);
            }
        }
    }

    /**
     * Submits all tasks as one batch, waits for the batch future and returns
     * the per-task futures.
     *
     * <p>An empty collection yields an empty list without contacting the
     * executor; batch submission presumably rejects an empty task array, as
     * {@code submitAsync(Runnable...)} does.
     *
     * @param tasks tasks to submit; must be non-null
     * @return one future per task, as reported by the batch future
     * @throws NullPointerException if {@code tasks} is {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        if (tasks.isEmpty()) {
            return new ArrayList<>();
        }
        RExecutorBatchFuture future = this.submit(tasks.toArray(new Callable[tasks.size()]));
        future.await();
        // The batch future exposes its task futures without the element type T;
        // they originate from the Callable<T> instances submitted above.
        List<?> taskFutures = future.getTaskFutures();
        @SuppressWarnings("unchecked")
        List<Future<T>> futures = (List<Future<T>>) taskFutures;
        return futures;
    }

    /**
     * Submits all tasks as one batch, waits up to the given timeout for the
     * batch future and returns the per-task futures.
     *
     * <p>If the wait times out, tasks that are not yet done are cancelled,
     * as required by {@link java.util.concurrent.ExecutorService#invokeAll(Collection, long, TimeUnit)}.
     * An empty collection yields an empty list without contacting the
     * executor; batch submission presumably rejects an empty task array, as
     * {@code submitAsync(Runnable...)} does.
     *
     * @param tasks   tasks to submit; must be non-null
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}; must be non-null
     * @return one future per task, as reported by the batch future
     * @throws NullPointerException if {@code tasks} or {@code unit} is {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        if (tasks == null || unit == null) {
            throw new NullPointerException();
        }
        if (tasks.isEmpty()) {
            return new ArrayList<>();
        }
        RExecutorBatchFuture future = this.submit(tasks.toArray(new Callable[tasks.size()]));
        boolean finishedInTime = future.await(timeout, unit);
        // The batch future exposes its task futures without the element type T;
        // they originate from the Callable<T> instances submitted above.
        List<?> taskFutures = future.getTaskFutures();
        @SuppressWarnings("unchecked")
        List<Future<T>> futures = (List<Future<T>>) taskFutures;
        if (!finishedInTime) {
            for (Future<T> taskFuture : futures) {
                if (!taskFuture.isDone()) {
                    taskFuture.cancel(true);
                }
            }
        }
        return futures;
    }

    /**
     * Immutable holder for the three pieces describing a task's code: a
     * lambda body, a class body and the class name.
     *
     * <p>The byte arrays are stored and returned by reference (no defensive
     * copies), so callers share the same array instances.
     */
    public static class ClassBody {
        private final byte[] lambda;
        private final byte[] clazz;
        private final String clazzName;

        /**
         * @param lambda    lambda body bytes; stored as given
         * @param clazz     class body bytes; stored as given
         * @param clazzName class name; stored as given
         */
        public ClassBody(byte[] lambda, byte[] clazz, String clazzName) {
            this.lambda = lambda;
            this.clazz = clazz;
            this.clazzName = clazzName;
        }

        /** @return the class name passed to the constructor */
        public String getClazzName() {
            return this.clazzName;
        }

        /** @return the class body array passed to the constructor (same instance) */
        public byte[] getClazz() {
            return this.clazz;
        }

        /** @return the lambda body array passed to the constructor (same instance) */
        public byte[] getLambda() {
            return this.lambda;
        }
    }
}

