/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RemoteServiceTimeoutException;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side base of a Redis-backed remote service.
 *
 * <p>Invocations made on the proxies returned by {@link #get(Class)} are wrapped into
 * {@link RemoteServiceRequest} objects and pushed onto a request queue; ACKs and results
 * are read back from a response queue whose name is derived from {@link #executorId}.
 * Two proxy flavours exist:
 * <ul>
 *   <li>a blocking one, used for plain interfaces, and</li>
 *   <li>a non-blocking one, used for interfaces annotated with {@link RRemoteAsync},
 *       whose methods return a {@link RemotePromise}.</li>
 * </ul>
 *
 * <p>Pending asynchronous responses are tracked in the {@code responses} map that is
 * handed to the constructor; it is keyed by response queue name.
 */
public abstract class BaseRemoteService {
    private static final Logger log = LoggerFactory.getLogger(BaseRemoteService.class);
    protected final Codec codec;
    protected final RedissonClient redisson;
    protected final String name;
    protected final CommandAsyncExecutor commandExecutor;
    protected final String executorId;
    /** Pending response promises, keyed by response queue name (see {@code poll}). */
    private final ConcurrentMap<String, ResponseEntry> responses;

    /**
     * @param codec           codec used for the request/response queues and cancel maps
     * @param redisson        client used to obtain queues, buckets and maps
     * @param name            service name, embedded into the generated Redis key names
     * @param commandExecutor executor used for scripts, timers and blocking waits
     * @param executorId      id embedded into the response queue name and into each request
     * @param responses       registry of pending response promises, keyed by response queue name
     */
    public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
        this.codec = codec;
        this.redisson = redisson;
        this.name = name;
        this.commandExecutor = commandExecutor;
        this.executorId = executorId;
        this.responses = responses;
    }

    /**
     * Returns the name of the map holding cancel requests.
     *
     * @param remoteInterface not used by this implementation
     */
    protected String getCancelRequestMapName(Class<?> remoteInterface) {
        return "{" + this.name + ":remote}:cancel-request";
    }

    /**
     * Returns the name of the map holding cancel responses.
     *
     * @param remoteInterface not used by this implementation
     */
    protected String getCancelResponseMapName(Class<?> remoteInterface) {
        return "{" + this.name + ":remote}:cancel-response";
    }

    /**
     * Returns the name of the per-request ACK marker key.
     *
     * @param remoteInterface not used by this implementation
     * @param requestId       id of the request the ACK belongs to
     */
    protected String getAckName(Class<?> remoteInterface, String requestId) {
        return "{" + this.name + ":remote}:" + requestId + ":ack";
    }

    /**
     * Returns the name of the queue from which responses for {@code executorId} are read.
     *
     * @param remoteInterface not used by this implementation
     * @param executorId      id the queue name is derived from
     */
    protected String getResponseQueueName(Class<?> remoteInterface, String executorId) {
        return "{remote_response}:" + executorId;
    }

    /**
     * Returns the name of the queue onto which requests for {@code remoteInterface} are pushed.
     */
    protected String getRequestQueueName(Class<?> remoteInterface) {
        return "{" + this.name + ":" + remoteInterface.getName() + "}";
    }

    /**
     * Encodes {@code obj} with the value encoder of {@link #codec}.
     *
     * @throws IllegalArgumentException wrapping the {@link IOException} if encoding fails
     */
    protected ByteBuf encode(Object obj) {
        try {
            return this.codec.getValueEncoder().encode(obj);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Creates a proxy for {@code remoteInterface} using {@link RemoteInvocationOptions#defaults()}.
     */
    public <T> T get(Class<T> remoteInterface) {
        return this.get(remoteInterface, RemoteInvocationOptions.defaults());
    }

    /**
     * Creates a proxy for {@code remoteInterface} using the default options with the given
     * execution timeout.
     */
    public <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit) {
        return this.get(remoteInterface, RemoteInvocationOptions.defaults().expectResultWithin(executionTimeout, executionTimeUnit));
    }

    /**
     * Creates a proxy for {@code remoteInterface} using the default options with the given
     * execution and ACK timeouts.
     */
    public <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit) {
        return this.get(remoteInterface, RemoteInvocationOptions.defaults().expectAckWithin(ackTimeout, ackTimeUnit).expectResultWithin(executionTimeout, executionTimeUnit));
    }

    /**
     * Creates a proxy for {@code remoteInterface}.
     *
     * <p>If the interface carries {@link RRemoteAsync}, every one of its methods must also
     * exist (same name and parameter types) on the referenced synchronous interface, and
     * must declare a return type able to hold the {@link RemotePromise} produced by the
     * asynchronous proxy (for example {@link RFuture}), or {@code void}. A non-blocking proxy
     * is returned in that case; otherwise a blocking proxy is returned.
     *
     * @param remoteInterface interface to proxy
     * @param options         invocation options; they are copied, later changes have no effect
     * @throws IllegalArgumentException if an {@link RRemoteAsync} interface declares a method that
     *                                  is missing from the synchronous interface or has an
     *                                  unsupported return type
     */
    public <T> T get(Class<T> remoteInterface, RemoteInvocationOptions options) {
        for (Annotation annotation : remoteInterface.getAnnotations()) {
            if (annotation.annotationType() != RRemoteAsync.class) continue;
            Class<?> syncInterface = ((RRemoteAsync)annotation).value();
            for (Method m : remoteInterface.getMethods()) {
                try {
                    syncInterface.getMethod(m.getName(), m.getParameterTypes());
                }
                catch (NoSuchMethodException e) {
                    throw new IllegalArgumentException("Method '" + m.getName() + "' with params '" + Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface);
                }
                catch (SecurityException e) {
                    throw new IllegalArgumentException(e);
                }
                // The async handler always returns a RemotePromise, so the declared return type
                // must be able to receive one. void is fine: the proxy discards the value.
                // (The previous check tested whether RFuture.class is an instance of
                // java.lang.Class, which is always true, so nothing was ever rejected.)
                Class<?> returnType = m.getReturnType();
                if (returnType != Void.TYPE && !returnType.isAssignableFrom(RemotePromise.class)) {
                    throw new IllegalArgumentException(returnType + " isn't allowed as return type");
                }
            }
            return this.async(remoteInterface, options, syncInterface);
        }
        return this.sync(remoteInterface, options);
    }

    /**
     * Builds the non-blocking proxy. Each invocation enqueues a request and returns a
     * {@link RemotePromise} that is completed from the response queue.
     *
     * @param remoteInterface asynchronous interface implemented by the proxy
     * @param options         options to copy for this proxy
     * @param syncInterface   synchronous counterpart; its name determines the Redis key names
     */
    private <T> T async(Class<T> remoteInterface, RemoteInvocationOptions options, final Class<?> syncInterface) {
        final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
        final String toString = this.getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + this.generateRequestId();
        InvocationHandler handler = new InvocationHandler(){

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Object methods are answered locally and never sent to the remote side.
                if (method.getName().equals("toString")) {
                    return toString;
                }
                if (method.getName().equals("equals")) {
                    return proxy == args[0];
                }
                if (method.getName().equals("hashCode")) {
                    return toString.hashCode();
                }
                if (!(optionsCopy.isResultExpected() || method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE) || method.getReturnType().equals(RFuture.class))) {
                    throw new IllegalArgumentException("The noResult option only supports void return value");
                }
                final String requestId = BaseRemoteService.this.generateRequestId();
                final String requestQueueName = BaseRemoteService.this.getRequestQueueName(syncInterface);
                final String responseName = BaseRemoteService.this.getResponseQueueName(syncInterface, BaseRemoteService.this.executorId);
                final String ackName = BaseRemoteService.this.getAckName(syncInterface, requestId);
                final RBlockingQueue<RemoteServiceRequest> requestQueue = BaseRemoteService.this.redisson.getBlockingQueue(requestQueueName, BaseRemoteService.this.codec);
                final RemoteServiceRequest request = new RemoteServiceRequest(BaseRemoteService.this.executorId, requestId, method.getName(), BaseRemoteService.this.getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis());
                final RemotePromise<Object> result = new RemotePromise<Object>(requestId){

                    /**
                     * Tries to withdraw the request before it is picked up; if that is no longer
                     * possible, asks the remote side to cancel it. Blocks the calling thread.
                     */
                    @Override
                    public boolean cancel(boolean mayInterruptIfRunning) {
                        if (this.isCancelled()) {
                            return true;
                        }
                        if (this.isDone()) {
                            return false;
                        }
                        if (optionsCopy.isAckExpected()) {
                            // Script: if the ACK key can still be created (setnx), give it the ACK
                            // timeout as TTL, remove the request from the request queue and
                            // return 1; otherwise return 0.
                            RFuture future = BaseRemoteService.this.commandExecutor.evalWriteAsync(responseName, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then redis.call('pexpire', KEYS[1], ARGV[2]);redis.call('lrem', KEYS[3], 1, ARGV[1]);return 1;end;return 0;", Arrays.<Object>asList(ackName, responseName, requestQueueName), BaseRemoteService.this.encode(request), request.getOptions().getAckTimeoutInMillis());
                            boolean ackNotSent = (Boolean)BaseRemoteService.this.commandExecutor.get(future);
                            if (ackNotSent) {
                                super.cancel(mayInterruptIfRunning);
                                return true;
                            }
                            return this.cancel(syncInterface, requestId, request, mayInterruptIfRunning);
                        }
                        boolean removed = BaseRemoteService.this.commandExecutor.get(BaseRemoteService.this.removeAsync(requestQueue, request));
                        if (removed) {
                            super.cancel(mayInterruptIfRunning);
                            return true;
                        }
                        return this.cancel(syncInterface, requestId, request, mayInterruptIfRunning);
                    }

                    /**
                     * Publishes a cancel request and waits up to 60 seconds for this promise to
                     * complete.
                     *
                     * @return whether the promise ended up cancelled
                     */
                    private boolean cancel(Class<?> remoteInterface, String requestId2, RemoteServiceRequest request2, boolean mayInterruptIfRunning) {
                        if (this.isCancelled()) {
                            return true;
                        }
                        if (this.isDone()) {
                            return false;
                        }
                        String canceRequestName = BaseRemoteService.this.getCancelRequestMapName(remoteInterface);
                        BaseRemoteService.this.cancelExecution(optionsCopy, responseName, request2, mayInterruptIfRunning, canceRequestName, this);
                        try {
                            this.awaitUninterruptibly(60L, TimeUnit.SECONDS);
                        }
                        catch (CancellationException ignored) {
                            // A cancellation is the outcome being waited for; it is reported
                            // through isCancelled() below.
                        }
                        return this.isCancelled();
                    }
                };
                RFuture<Boolean> addFuture = BaseRemoteService.this.addAsync(requestQueue, request, result);
                addFuture.addListener(new FutureListener<Boolean>(){

                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        if (!future.isSuccess()) {
                            result.tryFailure(future.cause());
                            return;
                        }
                        if (optionsCopy.isAckExpected()) {
                            RPromise ackFuture = BaseRemoteService.this.poll(optionsCopy.getAckTimeoutInMillis(), request.getId(), responseName);
                            ackFuture.addListener(new FutureListener<RemoteServiceAck>(){

                                @Override
                                public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
                                    if (!future.isSuccess()) {
                                        result.tryFailure(future.cause());
                                        return;
                                    }
                                    RemoteServiceAck ack = future.getNow();
                                    if (ack == null) {
                                        // The ACK wait timed out: register a fresh promise for a
                                        // possibly late ACK. The registry is keyed by response
                                        // queue name (see poll), and the entry exists because
                                        // poll(...) has already run for responseName.
                                        RedissonPromise retryAckFuture = new RedissonPromise();
                                        ResponseEntry entry = BaseRemoteService.this.responses.get(responseName);
                                        entry.getResponses().put(request.getId(), retryAckFuture);
                                        // Make sure the queue is being drained for this late
                                        // registration, exactly as poll(...) does.
                                        BaseRemoteService.this.pollTasks(entry, responseName);
                                        RFuture ackFutureAttempt = BaseRemoteService.this.tryPollAckAgainAsync(optionsCopy, retryAckFuture, ackName);
                                        ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>(){

                                            @Override
                                            public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
                                                if (!future.isSuccess()) {
                                                    result.tryFailure(future.cause());
                                                    return;
                                                }
                                                if (future.getNow() == null) {
                                                    RemoteServiceAckTimeoutException ex = new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
                                                    result.tryFailure(ex);
                                                    return;
                                                }
                                                BaseRemoteService.this.awaitResultAsync(optionsCopy, result, request, responseName, ackName);
                                            }
                                        });
                                    } else {
                                        BaseRemoteService.this.awaitResultAsync(optionsCopy, result, request, responseName);
                                    }
                                }
                            });
                        } else {
                            BaseRemoteService.this.awaitResultAsync(optionsCopy, result, request, responseName);
                        }
                    }
                });
                return result;
            }
        };
        return (T)Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
    }

    /**
     * Deletes the ACK marker key and then waits for the result via
     * {@link #awaitResultAsync(RemoteInvocationOptions, RemotePromise, RemoteServiceRequest, String)}.
     * A failed delete fails {@code result}.
     */
    private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result, final RemoteServiceRequest request, final String responseName, String ackName) {
        RFuture<Boolean> deleteFuture = this.redisson.getBucket(ackName).deleteAsync();
        deleteFuture.addListener(new FutureListener<Boolean>(){

            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    result.tryFailure(future.cause());
                    return;
                }
                BaseRemoteService.this.awaitResultAsync(optionsCopy, result, request, responseName);
            }
        });
    }

    /**
     * Waits asynchronously for the response to {@code request} and completes {@code result}:
     * failure on transport error, timeout or remote error; cancellation on a
     * {@link RemoteServiceCancelResponse}; success otherwise.
     *
     * <p>Does nothing when no result is expected; {@code result} is left untouched in that case.
     */
    protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result, final RemoteServiceRequest request, String responseName) {
        if (!optionsCopy.isResultExpected()) {
            return;
        }
        RPromise responseFuture = this.poll(optionsCopy.getExecutionTimeoutInMillis(), request.getId(), responseName);
        responseFuture.addListener(new FutureListener<RRemoteServiceResponse>(){

            @Override
            public void operationComplete(Future<RRemoteServiceResponse> future) throws Exception {
                if (!future.isSuccess()) {
                    result.tryFailure(future.cause());
                    return;
                }
                // poll(...) signals a timeout by succeeding with null.
                if (future.getNow() == null) {
                    RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
                    result.tryFailure(e);
                    return;
                }
                if (future.getNow() instanceof RemoteServiceCancelResponse) {
                    result.doCancel();
                    return;
                }
                RemoteServiceResponse response = (RemoteServiceResponse)future.getNow();
                if (response.getError() != null) {
                    result.tryFailure(response.getError());
                    return;
                }
                result.trySuccess(response.getResult());
            }
        });
    }

    /**
     * Registers a promise for the response with id {@code requestId} on the queue
     * {@code responseName} and makes sure that queue is being drained.
     *
     * @param timeout milliseconds after which the promise, if still registered, is removed and
     *                completed successfully with {@code null}
     * @return promise completed with the matching response, or with {@code null} on timeout
     */
    private <T extends RRemoteServiceResponse> RPromise<T> poll(long timeout, final String requestId, String responseName) {
        final RedissonPromise responseFuture = new RedissonPromise();
        ResponseEntry entry = this.responses.get(responseName);
        if (entry == null) {
            entry = new ResponseEntry();
            ResponseEntry oldEntry = this.responses.putIfAbsent(responseName, entry);
            if (oldEntry != null) {
                entry = oldEntry;
            }
        }
        final ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> pending = entry.getResponses();
        // Register before arming the timer: if the timer ran first, its conditional remove would
        // miss the promise and the promise would never be completed.
        pending.put(requestId, responseFuture);
        // NOTE(review): the scheduled task is not cancelled when the response arrives early; it
        // then runs as a no-op once the timeout elapses.
        this.commandExecutor.getConnectionManager().getGroup().schedule(new Runnable(){

            @Override
            public void run() {
                if (!responseFuture.isDone() && pending.remove(requestId, responseFuture)) {
                    responseFuture.trySuccess(null);
                }
            }
        }, timeout, TimeUnit.MILLISECONDS);
        this.pollTasks(entry, responseName);
        return responseFuture;
    }

    /**
     * Starts draining the response queue {@code responseName} unless a drain loop is already
     * active for {@code entry}. Each response taken from the queue completes the promise
     * registered under its id; the loop keeps taking while promises remain registered.
     */
    private void pollTasks(final ResponseEntry entry, final String responseName) {
        // Only one drain loop per entry.
        if (!entry.getStarted().compareAndSet(false, true)) {
            return;
        }
        final RBlockingQueue responseQueue = this.redisson.getBlockingQueue(responseName, this.codec);
        RFuture<RRemoteServiceResponse> future = responseQueue.takeAsync();
        future.addListener(new FutureListener<RRemoteServiceResponse>(){

            @Override
            public void operationComplete(Future<RRemoteServiceResponse> future) throws Exception {
                if (!future.isSuccess()) {
                    log.error("Can't get response from " + responseName, future.cause());
                    // Release the flag so that the next poll(...) can start a new loop instead
                    // of the queue staying undrained forever.
                    entry.getStarted().set(false);
                    return;
                }
                RRemoteServiceResponse response = future.getNow();
                RPromise promise = (RPromise)entry.getResponses().remove(response.getId());
                if (promise != null) {
                    promise.trySuccess(response);
                }
                // A response without a registered promise (e.g. its waiter already timed out)
                // is dropped, but the loop must go on for the remaining waiters.
                if (!entry.getResponses().isEmpty()) {
                    responseQueue.takeAsync().addListener(this);
                    return;
                }
                entry.getStarted().set(false);
                // Re-check: a promise may have been registered between the emptiness test and
                // the flag reset, in which case its pollTasks call was a no-op.
                if (!entry.getResponses().isEmpty()) {
                    BaseRemoteService.this.pollTasks(entry, responseName);
                }
            }
        });
    }

    /**
     * Builds the blocking proxy. Each invocation enqueues a request and then blocks on the
     * response queue for the ACK and/or the result, as configured by the options.
     *
     * <p>NOTE(review): responses are taken from the response queue without comparing their id
     * to the request id, and the same queue name is used by the asynchronous proxies — confirm
     * that concurrent use cannot deliver a response to the wrong caller.
     */
    private <T> T sync(final Class<T> remoteInterface, RemoteInvocationOptions options) {
        final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
        final String toString = this.getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + this.generateRequestId();
        InvocationHandler handler = new InvocationHandler(){

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Object methods are answered locally and never sent to the remote side.
                if (method.getName().equals("toString")) {
                    return toString;
                }
                if (method.getName().equals("equals")) {
                    return proxy == args[0];
                }
                if (method.getName().equals("hashCode")) {
                    return toString.hashCode();
                }
                if (!(optionsCopy.isResultExpected() || method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE))) {
                    throw new IllegalArgumentException("The noResult option only supports void return value");
                }
                String requestId = BaseRemoteService.this.generateRequestId();
                String requestQueueName = BaseRemoteService.this.getRequestQueueName(remoteInterface);
                RBlockingQueue requestQueue = BaseRemoteService.this.redisson.getBlockingQueue(requestQueueName, BaseRemoteService.this.codec);
                RemoteServiceRequest request = new RemoteServiceRequest(BaseRemoteService.this.executorId, requestId, method.getName(), BaseRemoteService.this.getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis());
                requestQueue.add(request);
                // The response queue is only needed when something is expected back.
                RBlockingQueue responseQueue = null;
                if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
                    String responseName = BaseRemoteService.this.getResponseQueueName(remoteInterface, BaseRemoteService.this.executorId);
                    responseQueue = BaseRemoteService.this.redisson.getBlockingQueue(responseName, BaseRemoteService.this.codec);
                }
                if (optionsCopy.isAckExpected()) {
                    String ackName = BaseRemoteService.this.getAckName(remoteInterface, requestId);
                    RemoteServiceAck ack = (RemoteServiceAck)responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
                    if (ack == null && (ack = BaseRemoteService.this.tryPollAckAgain(optionsCopy, responseQueue, ackName)) == null) {
                        throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
                    }
                    BaseRemoteService.this.redisson.getBucket(ackName).delete();
                }
                if (optionsCopy.isResultExpected()) {
                    RemoteServiceResponse response = (RemoteServiceResponse)responseQueue.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
                    if (response == null) {
                        throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
                    }
                    if (response.getError() != null) {
                        throw response.getError();
                    }
                    return response.getResult();
                }
                return null;
            }
        };
        return (T)Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
    }

    /**
     * Second, blocking attempt to obtain an ACK after the first wait timed out.
     *
     * <p>The script tries to create the ACK key: if it did not exist it is created with the ACK
     * timeout as TTL and 0 is returned; if it already existed it is deleted and 1 is returned.
     * Only in the latter case the response queue is polled once more (without waiting).
     *
     * @return the ACK, or {@code null} if none is available
     */
    private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName) throws InterruptedException {
        RFuture ackClientsFuture = this.commandExecutor.evalWriteAsync(ackName, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then redis.call('pexpire', KEYS[1], ARGV[1]);return 0;end;redis.call('del', KEYS[1]);return 1;", Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis());
        ackClientsFuture.sync();
        if (((Boolean)ackClientsFuture.getNow()).booleanValue()) {
            return (RemoteServiceAck)responseQueue.poll();
        }
        return null;
    }

    /**
     * Non-blocking counterpart of {@link #tryPollAckAgain}: runs the same script and, if it
     * reports that the ACK key already existed, forwards the outcome of {@code pollFuture};
     * otherwise completes with {@code null}.
     *
     * @param pollFuture promise that is completed when the late ACK is read from the queue
     * @return future holding the ACK, or {@code null} if no ACK is to be expected
     */
    private RFuture<RemoteServiceAck> tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy, final RPromise<RemoteServiceAck> pollFuture, String ackName) throws InterruptedException {
        final RedissonPromise<RemoteServiceAck> promise = new RedissonPromise<RemoteServiceAck>();
        RFuture ackClientsFuture = this.commandExecutor.evalWriteAsync(ackName, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then redis.call('pexpire', KEYS[1], ARGV[1]);return 0;end;redis.call('del', KEYS[1]);return 1;", Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis());
        ackClientsFuture.addListener(new FutureListener<Boolean>(){

            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    promise.tryFailure(future.cause());
                    return;
                }
                if (future.getNow().booleanValue()) {
                    pollFuture.addListener(new FutureListener<RemoteServiceAck>(){

                        @Override
                        public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
                            if (!future.isSuccess()) {
                                promise.tryFailure(future.cause());
                                return;
                            }
                            promise.trySuccess(future.getNow());
                        }
                    });
                } else {
                    promise.trySuccess(null);
                }
            }
        });
        return promise;
    }

    /**
     * Looks up {@code requestId} in the map {@code mapName} after 3000 ms and completes
     * {@code cancelRequest} with the value found. While the lookup fails or yields nothing, and
     * {@code cancelRequest} is not done, the check re-schedules itself.
     */
    protected <T> void scheduleCheck(final String mapName, final String requestId, final RPromise<T> cancelRequest) {
        this.commandExecutor.getConnectionManager().newTimeout(new TimerTask(){

            @Override
            public void run(Timeout timeout) throws Exception {
                if (cancelRequest.isDone()) {
                    return;
                }
                RMap canceledRequests = BaseRemoteService.this.redisson.getMap(mapName, BaseRemoteService.this.codec);
                RFuture future = canceledRequests.getAsync(requestId);
                future.addListener(new FutureListener<T>(){

                    @Override
                    public void operationComplete(Future<T> future) throws Exception {
                        if (cancelRequest.isDone()) {
                            return;
                        }
                        if (!future.isSuccess()) {
                            BaseRemoteService.this.scheduleCheck(mapName, requestId, cancelRequest);
                            return;
                        }
                        Object request = future.getNow();
                        if (request == null) {
                            BaseRemoteService.this.scheduleCheck(mapName, requestId, cancelRequest);
                        } else {
                            cancelRequest.trySuccess(request);
                        }
                    }
                });
            }
        }, 3000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Generates a request id: 16 random bytes rendered as a hex string.
     */
    protected String generateRequestId() {
        byte[] id = new byte[16];
        ThreadLocalRandom.current().nextBytes(id);
        return ByteBufUtil.hexDump(id);
    }

    /**
     * Enqueues {@code request} asynchronously and records the enqueue future on {@code result}.
     *
     * @return the future of the enqueue operation
     */
    protected RFuture<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request, RemotePromise<Object> result) {
        RFuture<Boolean> future = requestQueue.addAsync(request);
        result.setAddFuture(future);
        return future;
    }

    /**
     * Removes {@code request} from {@code requestQueue} asynchronously.
     */
    protected RFuture<Boolean> removeAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
        return requestQueue.removeAsync(request);
    }

    /**
     * Publishes a cancel request for {@code request} into the map {@code canceRequestName} and
     * gives that map a 60 second TTL. If the invocation was not expecting a result, a result
     * wait with a 60 second timeout is started so that {@code remotePromise} can still be
     * completed by the response to the cancellation.
     *
     * <p>The outcomes of the two map operations are not observed.
     */
    private void cancelExecution(RemoteInvocationOptions optionsCopy, String responseName, RemoteServiceRequest request, boolean mayInterruptIfRunning, String canceRequestName, RemotePromise<Object> remotePromise) {
        RMap<String, RemoteServiceCancelRequest> canceledRequests = this.redisson.getMap(canceRequestName, this.codec);
        canceledRequests.putAsync(request.getId(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
        canceledRequests.expireAsync(60L, TimeUnit.SECONDS);
        if (!optionsCopy.isResultExpected()) {
            RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy);
            options.expectResultWithin(60L, TimeUnit.SECONDS);
            this.awaitResultAsync(options, remotePromise, request, responseName);
        }
    }

    /**
     * Returns the class names of the parameter types of {@code method}, in declaration order.
     */
    protected List<String> getMethodSignatures(Method method) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        List<String> list = new ArrayList<String>(parameterTypes.length);
        for (Class<?> t : parameterTypes) {
            list.add(t.getName());
        }
        return list;
    }
}

