/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.QueueTransferTask;
import org.redisson.Redisson;
import org.redisson.RedissonTopic;
import org.redisson.api.CronSchedule;
import org.redisson.api.RFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.ExecutorRemoteService;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemoteExecutorServiceImpl;
import org.redisson.executor.RemotePromise;
import org.redisson.executor.ScheduledExecutorRemoteService;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedissonExecutorService
implements RScheduledExecutorService {
    private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class);
    // Status value the shutdown() script stores when the tasks counter is still non-zero.
    public static final int SHUTDOWN_STATE = 1;
    // Status value the shutdown() script stores (and publishes on the termination topic)
    // when the tasks counter is '0' or absent.
    public static final int TERMINATED_STATE = 2;
    private final CommandExecutor commandExecutor;
    // Obtained from commandExecutor in the constructor.
    private final ConnectionManager connectionManager;
    private final Codec codec;
    private final Redisson redisson;
    // The Redis names below are all derived in the constructor from the prefix
    // "{" + name + ":" + RemoteExecutorService.class.getName() + "}".
    private final String schedulerTasksName;
    private final String schedulerQueueName;
    private final String schedulerChannelName;
    private final String tasksCounterName;
    private final String statusName;
    // Topic on which awaitTermination() listens for the TERMINATED_STATE message.
    private final RTopic<Integer> terminationTopic;
    // Proxies created in the constructor; the "AtFixed" and "WithoutResult" variants are
    // built with noResult() options, the other two expect a result within one day.
    private final RemoteExecutorServiceAsync asyncScheduledService;
    private final RemoteExecutorServiceAsync asyncScheduledServiceAtFixed;
    private final RemoteExecutorServiceAsync asyncService;
    private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
    private final ScheduledExecutorRemoteService scheduledRemoteService;
    // Cache of class-file bytes keyed by task class, filled by getClassBody().
    private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
    private final String name;
    // Equal to the bare prefix described above (no suffix).
    private final String requestQueueName;

    /**
     * Builds the executor service facade for the given name: derives every Redis
     * key/channel name from a common prefix and creates the remote-service proxies
     * used for plain and scheduled task submission.
     *
     * @param codec           codec used for task state and for the termination topic
     * @param commandExecutor executor of Redis commands; also supplies the connection manager
     * @param redisson        owning Redisson instance
     * @param name            name of this executor service
     */
    public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name) {
        this.codec = codec;
        this.commandExecutor = commandExecutor;
        this.connectionManager = commandExecutor.getConnectionManager();
        this.redisson = redisson;
        this.name = name;

        // All names share one hash-tagged prefix; the request queue is the prefix itself.
        final String prefix = "{" + name + ":" + RemoteExecutorService.class.getName() + "}";
        this.requestQueueName = prefix;
        this.tasksCounterName = prefix + ":counter";
        this.statusName = prefix + ":status";
        this.schedulerChannelName = prefix + ":scheduler-channel";
        this.schedulerQueueName = prefix + ":scheduler";
        this.schedulerTasksName = prefix + ":scheduler-tasks";
        this.terminationTopic = redisson.getTopic(prefix + ":termination-topic", codec);

        final String terminationChannel = this.terminationTopic.getChannelNames().get(0);

        // Plain (non-scheduled) submission.
        ExecutorRemoteService plainService = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
        plainService.setTerminationTopicName(terminationChannel);
        plainService.setTasksCounterName(this.tasksCounterName);
        plainService.setStatusName(this.statusName);

        RemoteInvocationOptions plainWithResult = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1L, TimeUnit.DAYS);
        this.asyncService = plainService.get(RemoteExecutorServiceAsync.class, plainWithResult);
        RemoteInvocationOptions plainNoResult = RemoteInvocationOptions.defaults().noAck().noResult();
        this.asyncServiceWithoutResult = plainService.get(RemoteExecutorServiceAsync.class, plainNoResult);

        // Scheduled submission.
        this.scheduledRemoteService = new ScheduledExecutorRemoteService(codec, redisson, name, commandExecutor);
        this.scheduledRemoteService.setTerminationTopicName(terminationChannel);
        this.scheduledRemoteService.setTasksCounterName(this.tasksCounterName);
        this.scheduledRemoteService.setStatusName(this.statusName);
        this.scheduledRemoteService.setSchedulerQueueName(this.schedulerQueueName);
        this.scheduledRemoteService.setSchedulerChannelName(this.schedulerChannelName);
        this.scheduledRemoteService.setSchedulerTasksName(this.schedulerTasksName);

        RemoteInvocationOptions scheduledWithResult = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1L, TimeUnit.DAYS);
        this.asyncScheduledService = this.scheduledRemoteService.get(RemoteExecutorServiceAsync.class, scheduledWithResult);
        RemoteInvocationOptions scheduledNoResult = RemoteInvocationOptions.defaults().noAck().noResult();
        this.asyncScheduledServiceAtFixed = this.scheduledRemoteService.get(RemoteExecutorServiceAsync.class, scheduledNoResult);
    }

    /**
     * Registers the given number of workers, running them on the executor
     * supplied by the connection manager.
     *
     * @param workers number of workers to register
     */
    @Override
    public void registerWorkers(int workers) {
        ExecutorService defaultExecutor = commandExecutor.getConnectionManager().getExecutor();
        registerWorkers(workers, defaultExecutor);
    }

    /**
     * Starts a queue-transfer task that moves due scheduled tasks onto the request
     * queue, then registers a {@link RemoteExecutorServiceImpl} with the remote
     * service under this executor's name.
     *
     * @param workers  number of workers to register
     * @param executor executor handed to the remote service registration
     */
    @Override
    public void registerWorkers(int workers, ExecutorService executor) {
        QueueTransferTask transferTask = new QueueTransferTask(this.connectionManager) {

            @Override
            protected RTopic<Long> getTopic() {
                return new RedissonTopic<Long>(LongCodec.INSTANCE, RedissonExecutorService.this.commandExecutor, RedissonExecutorService.this.schedulerChannelName);
            }

            @Override
            protected RFuture<Long> pushTaskAsync() {
                // Moves up to ARGV[2] tasks whose score is <= ARGV[1] from the scheduler
                // sorted set to the request queue and returns the score of the next entry.
                final String script = "local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredTaskIds > 0 then redis.call('zrem', KEYS[2], unpack(expiredTaskIds));local expiredTasks = redis.call('hmget', KEYS[3], unpack(expiredTaskIds));redis.call('rpush', KEYS[1], unpack(expiredTasks));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;";
                return RedissonExecutorService.this.commandExecutor.evalWriteAsync(
                        RedissonExecutorService.this.name,
                        (Codec)LongCodec.INSTANCE,
                        RedisCommands.EVAL_LONG,
                        script,
                        Arrays.asList(RedissonExecutorService.this.requestQueueName, RedissonExecutorService.this.schedulerQueueName, RedissonExecutorService.this.schedulerTasksName),
                        System.currentTimeMillis(), 100);
            }
        };
        transferTask.start();

        RemoteExecutorServiceImpl worker = new RemoteExecutorServiceImpl(this.commandExecutor, this.redisson, this.codec, this.requestQueueName);
        worker.setStatusName(this.statusName);
        worker.setTasksCounterName(this.tasksCounterName);
        worker.setTerminationTopicName(this.terminationTopic.getChannelNames().get(0));
        worker.setSchedulerTasksName(this.schedulerTasksName);
        worker.setSchedulerChannelName(this.schedulerChannelName);
        worker.setSchedulerQueueName(this.schedulerQueueName);

        this.redisson.getRemoteService(this.name, this.codec)
                .register(RemoteExecutorService.class, worker, workers, executor);
    }

    /**
     * Submits a runnable through the no-result proxy and waits for the add
     * operation to complete.
     *
     * @param task task to run; validated by {@code check}
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public void execute(Runnable task) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final String className = task.getClass().getName();
        RemotePromise<?> addPromise = (RemotePromise<?>) asyncServiceWithoutResult.executeRunnable(className, definition, payload);
        execute(addPromise);
    }

    /**
     * Serialises the task with the configured codec. Before encoding, every field
     * declared on the task's class that is a {@link RedissonClient} (or subtype)
     * and carries {@link RInject} is set to {@code null} on the task itself.
     *
     * @param task task instance to encode (mutated as described above)
     * @return encoded task state
     * @throws IllegalStateException    if an injected field cannot be cleared
     * @throws IllegalArgumentException if the codec fails with an I/O error
     */
    private byte[] encode(Object task) {
        Field[] declared = task.getClass().getDeclaredFields();
        for (int i = 0; i < declared.length; i++) {
            Field candidate = declared[i];
            boolean injectedClient = RedissonClient.class.isAssignableFrom(candidate.getType())
                    && candidate.isAnnotationPresent(RInject.class);
            if (injectedClient) {
                candidate.setAccessible(true);
                try {
                    candidate.set(task, null);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }

        final byte[] encoded;
        try {
            encoded = codec.getValueEncoder().encode(task);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        return encoded;
    }

    /**
     * Returns the bytes of the {@code .class} resource for the task's class,
     * caching the result per class in {@code class2bytes}.
     *
     * <p>The resource is read until end-of-stream rather than trusting
     * {@link InputStream#available()}, and the stream is always closed.
     *
     * @param task task whose class definition is required
     * @return class-file bytes for {@code task.getClass()}
     * @throws IllegalArgumentException if the class resource cannot be found or read
     */
    private byte[] getClassBody(Object task) {
        Class<?> c = task.getClass();
        byte[] cached = this.class2bytes.get(c);
        if (cached != null) {
            return cached;
        }

        String className = c.getName();
        String classAsPath = className.replace('.', '/') + ".class";

        // getClassLoader() is null for classes loaded by the bootstrap loader.
        ClassLoader loader = c.getClassLoader();
        InputStream classStream = loader != null
                ? loader.getResourceAsStream(classAsPath)
                : ClassLoader.getSystemResourceAsStream(classAsPath);
        if (classStream == null) {
            throw new IllegalArgumentException("Class definition not found: " + className);
        }

        byte[] classBody;
        try {
            java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = classStream.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            classBody = out.toByteArray();
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        } finally {
            try {
                classStream.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing a read-only resource stream fails.
            }
        }

        this.class2bytes.put(c, classBody);
        return classBody;
    }

    /**
     * Marks the executor as shut down. If no status is stored yet, the script
     * stores {@link #TERMINATED_STATE} and publishes it on the termination channel
     * when the tasks counter is {@code '0'} or missing; otherwise it stores
     * {@link #SHUTDOWN_STATE}.
     */
    @Override
    public void shutdown() {
        final String script = "if redis.call('exists', KEYS[2]) == 0 then if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[2]);redis.call('publish', KEYS[3], ARGV[2]);else redis.call('set', KEYS[2], ARGV[1]);end;end;";
        final String terminationChannel = terminationTopic.getChannelNames().get(0);
        commandExecutor.evalWrite(
                getName(),
                LongCodec.INSTANCE,
                RedisCommands.EVAL_VOID,
                script,
                Arrays.asList(tasksCounterName, statusName, terminationChannel),
                SHUTDOWN_STATE, TERMINATED_STATE);
    }

    /**
     * @return the name this executor service was created with
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Blocking variant of {@link #deleteAsync()}.
     *
     * @return the value produced by {@link #deleteAsync()}
     */
    @Override
    public boolean delete() {
        RFuture<Boolean> pending = deleteAsync();
        return commandExecutor.get(pending);
    }

    /**
     * Deletes the request queue, status, tasks counter, scheduler queue and
     * scheduler tasks keys.
     *
     * @return future completing with {@code true} when the delete call reports a
     *         count greater than zero, or failing with the cause of the delete failure
     */
    @Override
    public RFuture<Boolean> deleteAsync() {
        final RPromise<Boolean> outcome = connectionManager.newPromise();
        RFuture<Long> removal = redisson.getKeys().deleteAsync(
                requestQueueName, statusName, tasksCounterName, schedulerQueueName, schedulerTasksName);
        removal.addListener(new FutureListener<Long>() {

            @Override
            public void operationComplete(Future<Long> completed) throws Exception {
                if (completed.isSuccess()) {
                    outcome.trySuccess(completed.getNow() > 0L);
                } else {
                    outcome.tryFailure(completed.cause());
                }
            }
        });
        return outcome;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException();
    }

    /**
     * @return {@code true} if the stored status is at least {@link #SHUTDOWN_STATE}
     */
    @Override
    public boolean isShutdown() {
        return checkState(SHUTDOWN_STATE);
    }

    /**
     * Checks whether the status key exists and holds a number greater than or
     * equal to the given state.
     *
     * @param state minimum status value to test for
     * @return {@code true} if the stored status is at least {@code state}
     */
    private boolean checkState(int state) {
        final String script = "if redis.call('exists', KEYS[1]) == 1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1]) then return 1;end;return 0;";
        Boolean reached = (Boolean)commandExecutor.evalWrite(
                getName(),
                codec,
                RedisCommands.EVAL_BOOLEAN,
                script,
                Arrays.asList(statusName),
                state);
        return reached;
    }

    /**
     * @return {@code true} if the stored status is at least {@link #TERMINATED_STATE}
     */
    @Override
    public boolean isTerminated() {
        return checkState(TERMINATED_STATE);
    }

    /**
     * Waits until the executor reaches the terminated state or the timeout elapses.
     *
     * <p>A listener is registered on the termination topic and the state is
     * re-checked afterwards, so a termination published between the first check
     * and the registration is not missed. The listener is removed on every exit
     * path, including interruption.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the executor terminated, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        if (this.isTerminated()) {
            return true;
        }
        final CountDownLatch latch = new CountDownLatch(1);
        MessageListener<Integer> listener = new MessageListener<Integer>(){

            @Override
            public void onMessage(String channel, Integer msg) {
                // Guard against a null payload before unboxing.
                if (msg != null && msg == TERMINATED_STATE) {
                    latch.countDown();
                }
            }
        };
        int listenerId = this.terminationTopic.addListener(listener);
        try {
            if (this.isTerminated()) {
                return true;
            }
            return latch.await(timeout, unit);
        } finally {
            // Always deregister, otherwise an interrupted wait leaks the listener.
            this.terminationTopic.removeListener(listenerId);
        }
    }

    /**
     * Submits a callable and waits for the add operation to complete.
     *
     * @param task task to run
     * @return future for the task result
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public <T> RFuture<T> submit(Callable<T> task) {
        RemotePromise<T> submitted = (RemotePromise<T>) submitAsync(task);
        execute(submitted);
        return submitted;
    }

    /**
     * Submits a callable without waiting for the add operation; a rejected or
     * failed add is reported through the returned future.
     *
     * @param task task to run; validated by {@code check}
     * @return future for the task result
     */
    @Override
    public <T> RFuture<T> submitAsync(Callable<T> task) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final String className = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncService.executeCallable(className, definition, payload);
        addListener(promise);
        return promise;
    }

    /**
     * Propagates the outcome of the add operation to the task promise: a failed
     * add fails the promise with the same cause, and an add that completes with
     * {@code false} fails it with a {@link RejectedExecutionException}.
     *
     * @param result promise whose add future is observed
     */
    private <T> void addListener(final RemotePromise<T> result) {
        FutureListener<Boolean> onAdded = new FutureListener<Boolean>() {

            @Override
            public void operationComplete(Future<Boolean> addFuture) throws Exception {
                if (addFuture.isSuccess()) {
                    boolean accepted = addFuture.getNow().booleanValue();
                    if (!accepted) {
                        result.tryFailure(new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"));
                    }
                } else {
                    result.tryFailure(addFuture.cause());
                }
            }
        };
        result.getAddFuture().addListener(onAdded);
    }

    /**
     * Validates that a task can be submitted: it must be non-null, must not be an
     * anonymous class, and if it is a member class it must be static.
     *
     * @param task task to validate
     * @throws NullPointerException     if {@code task} is {@code null}
     * @throws IllegalArgumentException if the task class is anonymous or a non-static member class
     */
    private void check(Object task) {
        if (task == null) {
            throw new NullPointerException("Task is not defined");
        }
        final Class<?> taskClass = task.getClass();
        if (taskClass.isAnonymousClass()) {
            throw new IllegalArgumentException("Task can't be created using anonymous class");
        }
        boolean nonStaticMember = taskClass.isMemberClass() && !Modifier.isStatic(taskClass.getModifiers());
        if (nonStaticMember) {
            throw new IllegalArgumentException("Task class is an inner class and it should be static");
        }
    }

    /**
     * Blocks (uninterruptibly) until the promise's add future completes and
     * rejects the task if the add result is {@code false}.
     *
     * @param promise promise whose add future is awaited
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    private <T> void execute(RemotePromise<T> promise) {
        RFuture<Boolean> pendingAdd = promise.getAddFuture();
        pendingAdd.syncUninterruptibly();
        boolean accepted = pendingAdd.getNow().booleanValue();
        if (accepted) {
            return;
        }
        throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
    }

    /**
     * Submits a runnable and returns a future that completes with the supplied
     * value once the task finishes successfully, or fails with the task's cause.
     *
     * @param task   task to run
     * @param result value the returned future completes with on success
     * @return future completing with {@code result}
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public <T> RFuture<T> submit(Runnable task, final T result) {
        final RPromise<T> resultFuture = this.connectionManager.newPromise();
        // The task future must be typed as RFuture: java.util.concurrent.Future has no addListener.
        @SuppressWarnings("unchecked")
        RFuture<Object> future = (RFuture<Object>) this.submit(task);
        future.addListener(new FutureListener<Object>(){

            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (!future.isSuccess()) {
                    resultFuture.tryFailure(future.cause());
                    return;
                }
                resultFuture.trySuccess(result);
            }
        });
        return resultFuture;
    }

    /**
     * Submits a runnable and waits for the add operation to complete.
     *
     * @param task task to run
     * @return future tracking the task
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public RFuture<?> submit(Runnable task) {
        RemotePromise<?> submitted = (RemotePromise<?>) submitAsync(task);
        execute(submitted);
        return submitted;
    }

    /**
     * Submits a runnable without waiting for the add operation; a rejected or
     * failed add is reported through the returned future.
     *
     * @param task task to run; validated by {@code check}
     * @return future tracking the task
     */
    @Override
    public RFuture<?> submitAsync(Runnable task) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final String className = task.getClass().getName();
        RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeRunnable(className, definition, payload);
        addListener(promise);
        return promise;
    }

    /**
     * Schedules a runnable after the given delay and waits for the add operation
     * to complete.
     *
     * @param task  task to run
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return scheduled future for the task
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        RedissonScheduledFuture<?> scheduled = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit);
        RemotePromise<?> inner = (RemotePromise<?>) scheduled.getInnerPromise();
        execute(inner);
        return scheduled;
    }

    /**
     * Schedules a runnable to start at {@code now + delay} without waiting for
     * the add operation.
     *
     * @param task  task to run; validated by {@code check}
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return scheduled future carrying the computed start time
     */
    @Override
    public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final long startAt = System.currentTimeMillis() + unit.toMillis(delay);
        final String className = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncScheduledService.scheduleRunnable(className, definition, payload, startAt);
        addListener(promise);
        return new RedissonScheduledFuture(promise, startAt);
    }

    /**
     * Schedules a callable after the given delay and waits for the add operation
     * to complete.
     *
     * @param task  task to run
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return scheduled future for the task result
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
        RedissonScheduledFuture<V> scheduled = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit);
        RemotePromise<?> inner = (RemotePromise<?>) scheduled.getInnerPromise();
        execute(inner);
        return scheduled;
    }

    /**
     * Schedules a callable to start at {@code now + delay} without waiting for
     * the add operation.
     *
     * @param task  task to run; validated by {@code check}
     * @param delay delay before execution
     * @param unit  unit of {@code delay}
     * @return scheduled future carrying the computed start time
     */
    @Override
    public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final long startAt = System.currentTimeMillis() + unit.toMillis(delay);
        final String className = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncScheduledService.scheduleCallable(className, definition, payload, startAt);
        addListener(promise);
        return new RedissonScheduledFuture(promise, startAt);
    }

    /**
     * Schedules a fixed-rate task and waits for the add operation to complete.
     *
     * @param task         task to run
     * @param initialDelay delay before the first execution
     * @param period       period passed to the scheduler
     * @param unit         unit of {@code initialDelay} and {@code period}
     * @return scheduled future for the task
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
        RedissonScheduledFuture<?> scheduled = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
        RemotePromise<?> inner = (RemotePromise<?>) scheduled.getInnerPromise();
        execute(inner);
        return scheduled;
    }

    /**
     * Schedules a fixed-rate task starting at {@code now + initialDelay} without
     * waiting for the add operation. The period is sent in milliseconds.
     *
     * @param task         task to run; validated by {@code check}
     * @param initialDelay delay before the first execution
     * @param period       period passed to the scheduler
     * @param unit         unit of {@code initialDelay} and {@code period}
     * @return scheduled future carrying the computed start time
     */
    @Override
    public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final long startAt = System.currentTimeMillis() + unit.toMillis(initialDelay);
        final String className = task.getClass().getName();
        final long periodMillis = unit.toMillis(period);
        RemotePromise promise = (RemotePromise)asyncScheduledServiceAtFixed.scheduleAtFixedRate(className, definition, payload, startAt, periodMillis);
        addListener(promise);
        return new RedissonScheduledFuture(promise, startAt);
    }

    /**
     * Schedules a cron-driven task and waits for the add operation to complete.
     *
     * @param task         task to run
     * @param cronSchedule cron schedule for the task
     * @return scheduled future for the task
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) {
        RedissonScheduledFuture<?> scheduled = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule);
        RemotePromise<?> inner = (RemotePromise<?>) scheduled.getInnerPromise();
        execute(inner);
        return scheduled;
    }

    /**
     * Schedules a cron-driven task without waiting for the add operation. The
     * first start time is the next valid time of the cron expression after now.
     *
     * @param task         task to run; validated by {@code check}
     * @param cronSchedule cron schedule for the task
     * @return scheduled future whose delay is measured against that first start time
     */
    @Override
    public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedule) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final Date firstRun = cronSchedule.getExpression().getNextValidTimeAfter(new Date());
        final long startAt = firstRun.getTime();
        final String className = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncScheduledServiceAtFixed.schedule(className, definition, payload, startAt, cronSchedule.getExpression().getCronExpression());
        addListener(promise);
        return new RedissonScheduledFuture<Void>(promise, startAt) {

            @Override
            public long getDelay(TimeUnit unit) {
                long remainingMillis = firstRun.getTime() - System.currentTimeMillis();
                return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
            }
        };
    }

    /**
     * Schedules a fixed-delay task and waits for the add operation to complete.
     *
     * @param task         task to run
     * @param initialDelay delay before the first execution
     * @param delay        delay passed to the scheduler
     * @param unit         unit of {@code initialDelay} and {@code delay}
     * @return scheduled future for the task
     * @throws RejectedExecutionException if the add operation reports {@code false}
     */
    @Override
    public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
        RedissonScheduledFuture<?> scheduled = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
        RemotePromise<?> inner = (RemotePromise<?>) scheduled.getInnerPromise();
        execute(inner);
        return scheduled;
    }

    /**
     * Schedules a fixed-delay task starting at {@code now + initialDelay} without
     * waiting for the add operation. The delay is sent in milliseconds.
     *
     * @param task         task to run; validated by {@code check}
     * @param initialDelay delay before the first execution
     * @param delay        delay passed to the scheduler
     * @param unit         unit of {@code initialDelay} and {@code delay}
     * @return scheduled future carrying the computed start time
     */
    @Override
    public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit) {
        check(task);
        final byte[] definition = getClassBody(task);
        final byte[] payload = encode(task);
        final long startAt = System.currentTimeMillis() + unit.toMillis(initialDelay);
        final String className = task.getClass().getName();
        final long delayMillis = unit.toMillis(delay);
        RemotePromise promise = (RemotePromise)asyncScheduledServiceAtFixed.scheduleWithFixedDelay(className, definition, payload, startAt, delayMillis);
        addListener(promise);
        return new RedissonScheduledFuture(promise, startAt);
    }

    /**
     * Delegates cancellation of the given task to the scheduled remote service.
     *
     * @param taskId identifier of the task to cancel
     * @return the result reported by the scheduled remote service
     */
    @Override
    public boolean cancelScheduledTask(String taskId) {
        boolean cancelled = scheduledRemoteService.cancelExecution(taskId);
        return cancelled;
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long millis) throws InterruptedException, ExecutionException, TimeoutException {
        ExecutionException ee;
        ArrayList<java.util.concurrent.Future<T>> futures;
        block17: {
            if (tasks == null) {
                throw new NullPointerException();
            }
            int ntasks = tasks.size();
            if (ntasks == 0) {
                throw new IllegalArgumentException();
            }
            futures = new ArrayList<java.util.concurrent.Future<T>>(ntasks);
            ee = null;
            long lastTime = timed ? System.currentTimeMillis() : 0L;
            Iterator<Callable<T>> it = tasks.iterator();
            futures.add(this.submit((Callable)it.next()));
            --ntasks;
            int active = 1;
            while (true) {
                T now22;
                java.util.concurrent.Future<T> f;
                if ((f = this.poll(futures)) == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(this.submit((Callable)it.next()));
                        ++active;
                    } else {
                        if (active == 0) break;
                        if (timed) {
                            f = this.poll(futures, millis, TimeUnit.MILLISECONDS);
                            if (f == null) {
                                throw new TimeoutException();
                            }
                            long now22 = System.currentTimeMillis();
                            millis -= now22 - lastTime;
                            lastTime = now22;
                        } else {
                            f = this.poll(futures, -1L, null);
                        }
                    }
                }
                if (f == null) continue;
                --active;
                try {
                    now22 = f.get();
                }
                catch (ExecutionException eex) {
                    ee = eex;
                    continue;
                }
                catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                    continue;
                }
                return now22;
                break;
            }
            if (ee != null) break block17;
            ee = new ExecutionException("No tasks were finised", null);
        }
        throw ee;
        finally {
            for (java.util.concurrent.Future future : futures) {
                future.cancel(true);
            }
        }
    }

    /**
     * Waits until one of the given futures completes and returns it.
     *
     * <p>The completed future is published before the latch is released, so the
     * waiting thread never observes a released latch with an empty result. The
     * listener is removed from every future on all exit paths.
     *
     * @param futures  futures to watch; each element is cast to {@link RFuture}
     * @param timeout  maximum wait, or {@code -1} to wait indefinitely
     * @param timeUnit unit of {@code timeout} (unused when {@code timeout} is {@code -1})
     * @return the first future seen completing, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    private <T> java.util.concurrent.Future<T> poll(List<java.util.concurrent.Future<T>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Future<T>> result = new AtomicReference<Future<T>>();
        FutureListener<T> listener = new FutureListener<T>(){

            @Override
            public void operationComplete(Future<T> future) throws Exception {
                // Publish first, then wake the waiter.
                result.compareAndSet(null, future);
                latch.countDown();
            }
        };
        try {
            for (java.util.concurrent.Future<T> future : futures) {
                RFuture<T> f = (RFuture<T>)future;
                f.addListener(listener);
            }
            if (timeout == -1L) {
                latch.await();
            } else {
                latch.await(timeout, timeUnit);
            }
        } finally {
            for (java.util.concurrent.Future<T> future : futures) {
                RFuture<T> f = (RFuture<T>)future;
                f.removeListener(listener);
            }
        }
        return result.get();
    }

    /**
     * Non-blocking scan for a completed future.
     *
     * @param futures futures to inspect, in iteration order
     * @return the first future for which {@code isDone()} is true, or {@code null} if none
     */
    private <T> java.util.concurrent.Future<T> poll(List<java.util.concurrent.Future<T>> futures) {
        java.util.concurrent.Future<T> completed = null;
        Iterator<java.util.concurrent.Future<T>> cursor = futures.iterator();
        while (completed == null && cursor.hasNext()) {
            java.util.concurrent.Future<T> candidate = cursor.next();
            if (candidate.isDone()) {
                completed = candidate;
            }
        }
        return completed;
    }

    /**
     * Runs the tasks and returns the result of the first one that completes
     * successfully, without a time limit.
     *
     * @param tasks tasks to run
     * @return result of a successfully completed task
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no task completed successfully
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        T outcome;
        try {
            outcome = doInvokeAny(tasks, false, 0L);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never waits with a timeout.
            outcome = null;
        }
        return outcome;
    }

    /**
     * Runs the tasks and returns the result of the first one that completes
     * successfully within the given timeout.
     *
     * @param tasks   tasks to run
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return result of a successfully completed task
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no task completed successfully
     * @throws TimeoutException     if the timeout elapsed first
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        final long timeoutMillis = unit.toMillis(timeout);
        return doInvokeAny(tasks, true, timeoutMillis);
    }

    /**
     * Submits every task and waits until each of them has completed.
     *
     * <p>Task failures and cancellations are not rethrown; they remain visible
     * through the returned futures. If this method exits abnormally (for example
     * on interruption), all submitted futures are cancelled.
     *
     * @param tasks tasks to run
     * @return futures of the submitted tasks, in iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                java.util.concurrent.Future<T> future = this.submit(t);
                futures.add(future);
            }
            for (java.util.concurrent.Future<T> f : futures) {
                if (f.isDone()) {
                    continue;
                }
                try {
                    f.get();
                } catch (CancellationException ignored) {
                    // Outcome is reported through the future itself.
                } catch (ExecutionException ignored) {
                    // Outcome is reported through the future itself.
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) {
                for (java.util.concurrent.Future<T> f : futures) {
                    f.cancel(true);
                }
            }
        }
    }

    /**
     * Submits every task and waits, up to the given timeout, until each of them
     * has completed.
     *
     * <p>Time spent submitting tasks counts against the timeout. If the timeout
     * expires during submission, the result list is padded with already-cancelled
     * futures so it has one entry per task. Whenever this method returns before
     * all tasks completed, every future in the list is cancelled.
     *
     * @param tasks   tasks to run
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return one future per task, in iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} or {@code unit} is {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        if (tasks == null || unit == null) {
            throw new NullPointerException();
        }
        long millis = unit.toMillis(timeout);
        List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>(tasks.size());
        boolean done = false;
        try {
            long lastTime = System.currentTimeMillis();
            for (Callable<T> callable : tasks) {
                java.util.concurrent.Future<T> submitted = this.submit(callable);
                futures.add(submitted);

                // Charge the elapsed time BEFORE moving the reference point forward.
                long now = System.currentTimeMillis();
                millis -= now - lastTime;
                lastTime = now;
                if (millis > 0L) {
                    continue;
                }

                // Out of time: represent the tasks that were never submitted as cancelled.
                int remaining = tasks.size() - futures.size();
                for (int i = 0; i < remaining; ++i) {
                    RPromise<T> cancelledFuture = this.connectionManager.newPromise();
                    cancelledFuture.cancel(true);
                    futures.add(cancelledFuture);
                }
                return futures;
            }

            for (java.util.concurrent.Future<T> future : futures) {
                if (future.isDone()) {
                    continue;
                }
                if (millis <= 0L) {
                    return futures;
                }
                try {
                    future.get(millis, TimeUnit.MILLISECONDS);
                } catch (CancellationException ignored) {
                    // Outcome is reported through the future itself.
                } catch (ExecutionException ignored) {
                    // Outcome is reported through the future itself.
                } catch (TimeoutException toe) {
                    // 'done' is still false, so the finally block cancels everything.
                    return futures;
                }
                long now = System.currentTimeMillis();
                millis -= now - lastTime;
                lastTime = now;
            }
            done = true;
            return futures;
        } finally {
            if (!done) {
                for (java.util.concurrent.Future<T> future : futures) {
                    future.cancel(true);
                }
            }
        }
    }
}

