/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.primitive.proxy.impl;

import com.google.common.base.Defaults;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveId;
import io.atomix.primitive.PrimitiveState;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.event.EventType;
import io.atomix.primitive.event.Events;
import io.atomix.primitive.event.PrimitiveEvent;
import io.atomix.primitive.log.LogRecord;
import io.atomix.primitive.log.LogSession;
import io.atomix.primitive.operation.OperationId;
import io.atomix.primitive.operation.OperationType;
import io.atomix.primitive.operation.Operations;
import io.atomix.primitive.operation.impl.DefaultOperationId;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.proxy.ProxySession;
import io.atomix.primitive.proxy.impl.LogOperation;
import io.atomix.primitive.service.PrimitiveService;
import io.atomix.primitive.service.ServiceConfig;
import io.atomix.primitive.service.ServiceContext;
import io.atomix.primitive.service.impl.DefaultCommit;
import io.atomix.primitive.session.Session;
import io.atomix.primitive.session.SessionId;
import io.atomix.primitive.session.impl.AbstractSession;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalClock;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.WallClock;
import io.atomix.utils.time.WallClockTimestamp;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogProxySession<S>
implements ProxySession<S> {
    private static final Serializer INTERNAL_SERIALIZER = Serializer.using((Namespace)Namespace.builder().register(Namespaces.BASIC).nextId(500).register(new Class[]{LogOperation.class}).register(new Class[]{DefaultOperationId.class}).register(new Class[]{OperationType.class}).register(new Class[]{SessionId.class}).build());
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final String name;
    private final PrimitiveType type;
    private final PrimitiveService service;
    private final ServiceConfig serviceConfig;
    private final ServiceContext context = new LogServiceContext();
    private final Serializer userSerializer;
    private final LogSession session;
    private final ServiceProxy<S> proxy;
    private volatile Object client;
    private volatile CompletableFuture<ProxySession<S>> connectFuture;
    private final AtomicLong operationIndex = new AtomicLong();
    private final Map<EventType, Method> eventMethods = Maps.newConcurrentMap();
    private final Map<Long, CompletableFuture> writeFutures = Maps.newConcurrentMap();
    private final Queue<PendingRead> pendingReads = new LinkedList<PendingRead>();
    private final Map<SessionId, Session> sessions = Maps.newConcurrentMap();
    private long lastIndex;
    private long currentIndex;
    private Session currentSession;
    private OperationType currentOperation;
    private long currentTimestamp;

    public LogProxySession(String name, PrimitiveType type, Class<S> serviceType, ServiceConfig serviceConfig, Serializer serializer, LogSession session) {
        this.name = (String)Preconditions.checkNotNull((Object)name, (Object)"name cannot be null");
        this.type = (PrimitiveType)Preconditions.checkNotNull((Object)type, (Object)"type cannot be null");
        this.service = type.newService(serviceConfig);
        this.serviceConfig = serviceConfig;
        this.userSerializer = (Serializer)Preconditions.checkNotNull((Object)serializer, (Object)"serializer cannot be null");
        this.session = (LogSession)Preconditions.checkNotNull((Object)session, (Object)"session cannot be null");
        ServiceProxyHandler serviceProxyHandler = new ServiceProxyHandler(serviceType);
        Object serviceProxy = Proxy.newProxyInstance(serviceType.getClassLoader(), new Class[]{serviceType}, (InvocationHandler)serviceProxyHandler);
        this.proxy = new ServiceProxy<Object>(serviceProxy, serviceProxyHandler);
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public PrimitiveType type() {
        return this.type;
    }

    @Override
    public PartitionId partitionId() {
        return this.session.partitionId();
    }

    @Override
    public ThreadContext context() {
        return this.session.context();
    }

    @Override
    public PrimitiveState getState() {
        return this.session.getState();
    }

    @Override
    public void register(Object client) {
        this.client = client;
        Events.getEventMap(client.getClass()).forEach((eventType, method) -> this.eventMethods.put((EventType)eventType, (Method)method));
    }

    @Override
    public CompletableFuture<Void> accept(Consumer<S> operation) {
        if (this.session.getState() == PrimitiveState.CLOSED) {
            return Futures.exceptionalFuture((Throwable)((Object)new PrimitiveException.ClosedSession()));
        }
        return this.proxy.accept(operation);
    }

    @Override
    public <R> CompletableFuture<R> apply(Function<S, R> operation) {
        if (this.session.getState() == PrimitiveState.CLOSED) {
            return Futures.exceptionalFuture((Throwable)((Object)new PrimitiveException.ClosedSession()));
        }
        return this.proxy.apply(operation);
    }

    private Session getOrCreateSession(SessionId sessionId) {
        Session session = this.sessions.get((Object)sessionId);
        if (session == null) {
            session = new LocalSession(sessionId, this.name(), this.type(), null, this.service.serializer());
            this.sessions.put(session.sessionId(), session);
            this.service.register(session);
        }
        return session;
    }

    private void consume(LogRecord record) {
        CompletableFuture future;
        LogOperation operation = (LogOperation)this.decodeInternal(record.value());
        if (!operation.primitive().equals(this.name())) {
            return;
        }
        Session session = this.getOrCreateSession(operation.sessionId());
        this.currentIndex = record.index();
        this.currentSession = session;
        this.currentOperation = operation.operationId().type();
        this.currentTimestamp = record.timestamp();
        byte[] output = this.service.apply(new DefaultCommit<byte[]>(this.currentIndex, operation.operationId(), operation.operation(), this.currentSession, this.currentTimestamp));
        if (operation.sessionId().equals((Object)this.session.sessionId()) && (future = this.writeFutures.remove(operation.operationIndex())) != null) {
            future.complete(this.decode(output));
        }
        PendingRead pendingRead = this.pendingReads.peek();
        while (pendingRead != null && pendingRead.index <= record.index()) {
            this.currentSession = session = this.getOrCreateSession(this.session.sessionId());
            this.currentOperation = OperationType.QUERY;
            try {
                output = this.service.apply(new DefaultCommit<byte[]>(this.currentIndex, pendingRead.operationId, pendingRead.bytes, session, this.currentTimestamp));
                pendingRead.future.complete(output);
            }
            catch (Exception e) {
                pendingRead.future.completeExceptionally((Throwable)((Object)new PrimitiveException.ServiceException()));
            }
            this.pendingReads.remove();
            pendingRead = this.pendingReads.peek();
        }
    }

    @Override
    public void addStateChangeListener(Consumer<PrimitiveState> listener) {
        this.session.addStateChangeListener(listener);
    }

    @Override
    public void removeStateChangeListener(Consumer<PrimitiveState> listener) {
        this.session.removeStateChangeListener(listener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<ProxySession<S>> connect() {
        if (this.connectFuture == null) {
            LogProxySession logProxySession = this;
            synchronized (logProxySession) {
                if (this.connectFuture == null) {
                    this.session.consumer().consume(1L, this::consume);
                    this.service.init(this.context);
                    this.connectFuture = this.session.connect().thenApply(v -> this);
                }
            }
        }
        return this.connectFuture;
    }

    @Override
    public CompletableFuture<Void> close() {
        return this.session.close();
    }

    @Override
    public CompletableFuture<Void> delete() {
        return this.close();
    }

    protected <T> byte[] encode(T object) {
        return object != null ? this.userSerializer.encode(object) : null;
    }

    protected <T> T decode(byte[] bytes) {
        return (T)(bytes != null ? this.userSerializer.decode(bytes) : null);
    }

    private <T> byte[] encodeInternal(T object) {
        return INTERNAL_SERIALIZER.encode(object);
    }

    private <T> T decodeInternal(byte[] bytes) {
        return (T)INTERNAL_SERIALIZER.decode(bytes);
    }

    private static class PendingRead {
        private final long index;
        private final OperationId operationId;
        private final byte[] bytes;
        private final CompletableFuture future;

        PendingRead(long index, OperationId operationId, byte[] bytes, CompletableFuture future) {
            this.index = index;
            this.operationId = operationId;
            this.bytes = bytes;
            this.future = future;
        }
    }

    private class ServiceProxyHandler
    implements InvocationHandler {
        private final ThreadLocal<CompletableFuture> future = new ThreadLocal();
        private final Map<Method, OperationId> operations = new ConcurrentHashMap<Method, OperationId>();

        private ServiceProxyHandler(Class<?> type) {
            this.operations.putAll(Operations.getMethodMap(type));
        }

        @Override
        public Object invoke(Object object, Method method, Object[] args) throws Throwable {
            OperationId operationId = this.operations.get(method);
            if (operationId != null) {
                CompletableFuture future = new CompletableFuture();
                this.future.set(future);
                byte[] bytes = LogProxySession.this.encode(args);
                if (operationId.type() == OperationType.COMMAND) {
                    long index = LogProxySession.this.operationIndex.incrementAndGet();
                    LogProxySession.this.writeFutures.put(index, future);
                    LogOperation operation = new LogOperation(LogProxySession.this.session.sessionId(), LogProxySession.this.name, index, operationId, bytes);
                    LogProxySession.this.connect().thenRun(() -> LogProxySession.this.session.producer().append(LogProxySession.this.encodeInternal(operation)).whenComplete((result, error) -> {
                        if (error == null) {
                            LogProxySession.this.lastIndex = result;
                        }
                    }));
                } else {
                    LogProxySession.this.connect().thenRun(() -> {
                        if (LogProxySession.this.currentIndex >= LogProxySession.this.lastIndex) {
                            SessionId sessionId = LogProxySession.this.session.sessionId();
                            Session session = LogProxySession.this.getOrCreateSession(sessionId);
                            byte[] output = LogProxySession.this.service.apply(new DefaultCommit<byte[]>(LogProxySession.this.currentIndex, operationId, bytes, session, LogProxySession.this.currentTimestamp));
                            future.complete(LogProxySession.this.decode(output));
                            LogProxySession.this.lastIndex = LogProxySession.this.currentIndex;
                        } else {
                            LogProxySession.this.pendingReads.add(new PendingRead(LogProxySession.this.lastIndex, operationId, bytes, future));
                        }
                    });
                }
            } else {
                throw new PrimitiveException("Unknown primitive operation: " + method.getName());
            }
            return Defaults.defaultValue(method.getReturnType());
        }

        <T> CompletableFuture<T> getResultFuture() {
            return this.future.get();
        }
    }

    private class ServiceProxy<S> {
        private final S proxy;
        private final ServiceProxyHandler handler;

        ServiceProxy(S proxy, ServiceProxyHandler handler) {
            this.proxy = proxy;
            this.handler = handler;
        }

        CompletableFuture<Void> accept(Consumer<S> operation) {
            operation.accept(this.proxy);
            return this.handler.getResultFuture();
        }

        <T> CompletableFuture<T> apply(Function<S, T> operation) {
            operation.apply(this.proxy);
            return this.handler.getResultFuture();
        }
    }

    private class LocalSession
    extends AbstractSession {
        LocalSession(SessionId sessionId, String primitiveName, PrimitiveType primitiveType, MemberId memberId, Serializer serializer) {
            super(sessionId, primitiveName, primitiveType, memberId, serializer);
        }

        @Override
        public Session.State getState() {
            return Session.State.OPEN;
        }

        @Override
        public void publish(PrimitiveEvent event) {
            Method method;
            if (this.sessionId().equals((Object)LogProxySession.this.session.sessionId()) && (method = (Method)LogProxySession.this.eventMethods.get(event.type())) != null) {
                try {
                    method.invoke(LogProxySession.this.client, (Object[])this.decode(event.value()));
                }
                catch (IllegalAccessException | InvocationTargetException e) {
                    LogProxySession.this.log.warn("Failed to handle event", (Throwable)e);
                }
            }
        }
    }

    private class LogServiceContext
    implements ServiceContext {
        private LogServiceContext() {
        }

        @Override
        public PrimitiveId serviceId() {
            return PrimitiveId.from((Long)LogProxySession.this.session.sessionId().id());
        }

        @Override
        public String serviceName() {
            return LogProxySession.this.name;
        }

        @Override
        public PrimitiveType serviceType() {
            return LogProxySession.this.type;
        }

        @Override
        public MemberId localMemberId() {
            return null;
        }

        @Override
        public <C extends ServiceConfig> C serviceConfig() {
            return (C)LogProxySession.this.serviceConfig;
        }

        @Override
        public long currentIndex() {
            return LogProxySession.this.currentIndex;
        }

        @Override
        public Session currentSession() {
            return LogProxySession.this.currentSession;
        }

        @Override
        public OperationType currentOperation() {
            return LogProxySession.this.currentOperation;
        }

        @Override
        public LogicalClock logicalClock() {
            return new LogicalClock(){

                public LogicalTimestamp getTime() {
                    return LogicalTimestamp.of((long)LogProxySession.this.currentIndex);
                }
            };
        }

        @Override
        public WallClock wallClock() {
            return new WallClock(){

                public WallClockTimestamp getTime() {
                    return WallClockTimestamp.from((long)LogProxySession.this.currentTimestamp);
                }
            };
        }
    }
}

