/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.web.handler.graphql.impl.ws;

import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.graphql.ExecutionInputBuilderWithContext;
import io.vertx.ext.web.handler.graphql.impl.ErrorUtil;
import io.vertx.ext.web.handler.graphql.impl.GraphQLQuery;
import io.vertx.ext.web.handler.graphql.impl.ws.ConnectionInitEventImpl;
import io.vertx.ext.web.handler.graphql.impl.ws.MessageImpl;
import io.vertx.ext.web.handler.graphql.ws.ConnectionInitEvent;
import io.vertx.ext.web.handler.graphql.ws.Message;
import io.vertx.ext.web.handler.graphql.ws.MessageType;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ConnectionHandler {
    // Class-wide logger; used to trace failed connection initialisation.
    private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
    // GraphQL engine that executes the operation carried by each subscribe message.
    private final GraphQL graphQL;
    // Delay handed to the context timer started by InitialState; when the timer fires the socket
    // is closed with code 4408. Presumably milliseconds (Vert.x timer convention) - confirm.
    private final long connectionInitWaitTimeout;
    // Optional (may be null). When set, it receives the connection-init event together with a
    // promise; the connection is acknowledged only once that promise succeeds.
    private final Handler<ConnectionInitEvent> connectionInitHandler;
    // Optional (may be null). Invoked with the execution input builder right before a subscribe
    // message is executed.
    private final Handler<ExecutionInputBuilderWithContext<Message>> beforeExecuteHandler;
    // Optional (may be null). Invoked for every message with a recognised type, before the
    // current state processes it.
    private final Handler<Message> messageHandler;
    // Optional (may be null). Invoked with the socket from the socket's close handler.
    private final Handler<ServerWebSocket> endHandler;
    // Vert.x context used for the init timer, the init promise and for dispatching GraphQL results.
    private final ContextInternal context;
    // The WebSocket this handler serves.
    private final ServerWebSocket socket;
    // Single-entry map (RoutingContext.class -> the routing context) installed as the GraphQL
    // context of every execution.
    private final Map<Class<RoutingContext>, Object> mapOfContext;
    // Current protocol state: starts as InitialState and is replaced by InitializingState and/or
    // ReadyState as the handshake progresses.
    private ConnectionState state;
    // Placeholder registered under a subscription id between the moment a subscribe message is
    // accepted and the moment the real Subscription is known (or the operation finishes).
    // Cancelling it does nothing; requesting from it is a programming error.
    private static final Subscription TRANSIENT_SUBSCRIPTION = new Subscription(){

        // The placeholder has no upstream, so demand must never be signalled to it.
        public void request(long l) {
            throw new IllegalStateException();
        }

        // Nothing to release: there is no real subscription behind the placeholder.
        public void cancel() {
        }
    };

    /**
     * Creates the handler for a single WebSocket connection and puts it into its initial state.
     *
     * @param graphQL engine used to execute subscribe requests
     * @param connectionInitWaitTimeout delay of the timer that closes the socket when no connection-init message arrives
     * @param connectionInitHandler optional callback deciding whether the connection is accepted; may be {@code null}
     * @param beforeExecuteHandler optional callback invoked with the execution input builder; may be {@code null}
     * @param messageHandler optional callback invoked for every recognised message; may be {@code null}
     * @param endHandler optional callback invoked when the socket is closed; may be {@code null}
     * @param routingContext routing context of the upgrade request; exposed to GraphQL executions
     * @param socket the WebSocket to serve
     */
    public ConnectionHandler(
            GraphQL graphQL,
            long connectionInitWaitTimeout,
            Handler<ConnectionInitEvent> connectionInitHandler,
            Handler<ExecutionInputBuilderWithContext<Message>> beforeExecuteHandler,
            Handler<Message> messageHandler,
            Handler<ServerWebSocket> endHandler,
            RoutingContext routingContext,
            ServerWebSocket socket) {
        // Collaborators.
        this.graphQL = graphQL;
        this.socket = socket;

        // User-supplied callbacks (each of them is optional).
        this.connectionInitHandler = connectionInitHandler;
        this.beforeExecuteHandler = beforeExecuteHandler;
        this.messageHandler = messageHandler;
        this.endHandler = endHandler;

        // Everything derived from the routing context.
        this.context = (ContextInternal) routingContext.vertx().getOrCreateContext();
        this.mapOfContext = Collections.singletonMap(RoutingContext.class, routingContext);

        // Keep these two last and in this order: InitialState's constructor starts the
        // init timer and therefore reads both the context and the timeout.
        this.connectionInitWaitTimeout = connectionInitWaitTimeout;
        this.state = new InitialState();
    }

    /**
     * Installs this handler's callbacks on the socket: one for socket closure and one for each
     * kind of incoming message (binary and text).
     */
    public void handleConnection() {
        ServerWebSocket ws = this.socket;
        ws.closeHandler(this::close);
        ws.binaryMessageHandler(this::handleBinaryMessage);
        ws.textMessageHandler(this::handleTextMessage);
    }

    /**
     * Decodes a binary message as a JSON object and dispatches it.
     * <p>
     * Content that cannot be decoded as a JSON object closes the socket with code 4400 instead of
     * letting the decoding exception escape into the socket's message handler.
     *
     * @param buffer content of the binary WebSocket message
     */
    private void handleBinaryMessage(Buffer buffer) {
        JsonObject json;
        try {
            json = new JsonObject(buffer);
        } catch (DecodeException e) {
            this.socket.close((short)4400, "Invalid message");
            return;
        }
        // Dispatch outside the try block so that failures raised by handlers are not
        // mistaken for decoding errors.
        this.handleMessage(json);
    }

    /**
     * Decodes a text message as a JSON object and dispatches it.
     * <p>
     * Text that cannot be decoded as a JSON object closes the socket with code 4400 instead of
     * letting the decoding exception escape into the socket's message handler.
     *
     * @param text content of the text WebSocket message
     */
    private void handleTextMessage(String text) {
        JsonObject json;
        try {
            json = new JsonObject(text);
        } catch (DecodeException e) {
            this.socket.close((short)4400, "Invalid message");
            return;
        }
        // Dispatch outside the try block so that failures raised by handlers are not
        // mistaken for decoding errors.
        this.handleMessage(json);
    }

    /**
     * Routes a decoded message to the current connection state.
     * <p>
     * The {@code "type"} entry selects the message type; an unrecognised type closes the socket
     * with code 4400. Otherwise the current state builds the message, the optional message
     * handler is notified, and the current state then processes the message.
     *
     * @param json the decoded message
     */
    private void handleMessage(JsonObject json) {
        String typeStr = json.getString("type");
        MessageType type = MessageType.from(typeStr);
        if (type == null) {
            this.socket.close((short)4400, "Unknown message type: " + typeStr);
            return;
        }
        MessageImpl message = this.state.createMessage(type, json);
        if (this.messageHandler != null) {
            // No cast: Handler<Message>.handle expects a Message, which MessageImpl is passed as.
            this.messageHandler.handle(message);
        }
        // The state is read again on purpose: it is the state current at this point that
        // processes the message, exactly as before.
        this.state.handleMessage(message);
    }

    /**
     * Answers a ping with a pong that carries the ping's {@code "payload"} object, if any.
     *
     * @param msg the ping message being answered
     */
    private void sendPong(MessageImpl msg) {
        JsonObject pingPayload = msg.message().getJsonObject("payload");
        this.sendMessage(null, MessageType.PONG, pingPayload);
    }

    /**
     * Builds a protocol message and writes it to the socket as a text message.
     * <p>
     * Entries are added in the order {@code id}, {@code type}, {@code payload}; {@code id} and
     * {@code payload} are left out when {@code null}.
     *
     * @param id operation id, or {@code null} for messages not tied to an operation
     * @param type message type whose text form is written under {@code "type"}
     * @param payload value written under {@code "payload"}, or {@code null} for none
     */
    private void sendMessage(String id, MessageType type, Object payload) {
        JsonObject frame = new JsonObject();
        if (id != null) {
            frame.put("id", id);
        }
        frame.put("type", type.getText());
        if (payload != null) {
            frame.put("payload", payload);
        }
        String encoded = frame.encode();
        this.socket.writeTextMessage(encoded);
    }

    /**
     * Close handler of the socket: lets the current state release what it holds, then notifies
     * the optional end handler.
     *
     * @param unused ignored; present only to match the close handler signature
     */
    private void close(Void unused) {
        try {
            this.state.close();
        } finally {
            // Notify the end handler even when state cleanup fails, so that user-side cleanup
            // is not skipped because of a misbehaving subscription.
            if (this.endHandler != null) {
                this.endHandler.handle(this.socket);
            }
        }
    }

    private class ReadyState
    implements ConnectionState {
        final Object connectionParams;
        final Executor executor;
        final ConcurrentMap<String, Subscription> subscriptions;

        ReadyState(Object connectionParams) {
            this.connectionParams = connectionParams;
            this.executor = task -> ConnectionHandler.this.context.runOnContext(v -> task.run());
            this.subscriptions = new ConcurrentHashMap<String, Subscription>();
        }

        @Override
        public MessageImpl createMessage(MessageType type, JsonObject message) {
            return new MessageImpl(ConnectionHandler.this.socket, type, message, this.connectionParams);
        }

        @Override
        public void handleMessage(MessageImpl msg) {
            switch (msg.type()) {
                case PING: {
                    ConnectionHandler.this.sendPong(msg);
                    break;
                }
                case PONG: {
                    break;
                }
                case SUBSCRIBE: {
                    this.subscribe(msg);
                    break;
                }
                case COMPLETE: {
                    this.unsubscribe(msg);
                    break;
                }
                default: {
                    ConnectionHandler.this.socket.close((short)4400, "Unexpected message type: " + String.valueOf((Object)msg.type()));
                }
            }
        }

        void subscribe(final MessageImpl msg) {
            String query;
            Map<String, Object> extensions;
            Object initialValue;
            Map<String, Object> variables;
            String id = msg.id();
            if (id == null) {
                ConnectionHandler.this.socket.close((short)4400, "Subscribe message must have an ID");
                return;
            }
            if (this.subscriptions.putIfAbsent(id, TRANSIENT_SUBSCRIPTION) != null) {
                ConnectionHandler.this.socket.close((short)4409, "Subscriber for " + id + " already exists");
                return;
            }
            GraphQLQuery payload = new GraphQLQuery(msg.message().getJsonObject("payload"));
            final ExecutionInput.Builder builder = ExecutionInput.newExecutionInput();
            String operationName = payload.getOperationName();
            if (operationName != null) {
                builder.operationName(operationName);
            }
            if ((variables = payload.getVariables()) != null) {
                builder.variables(variables);
            }
            if ((initialValue = payload.getInitialValue()) != null) {
                builder.root(initialValue);
            }
            if ((extensions = payload.getExtensions()) != null) {
                builder.extensions(extensions);
            }
            if ((query = payload.getQuery()) != null) {
                builder.query(query);
            } else if (extensions != null && extensions.containsKey("persistedQuery")) {
                builder.query("PersistedQueryMarker");
            }
            builder.graphQLContext(ConnectionHandler.this.mapOfContext);
            Handler<ExecutionInputBuilderWithContext<Message>> beforeExecute = ConnectionHandler.this.beforeExecuteHandler;
            if (beforeExecute != null) {
                beforeExecute.handle((Object)new ExecutionInputBuilderWithContext<Message>(){

                    @Override
                    public Message context() {
                        return msg;
                    }

                    @Override
                    public ExecutionInput.Builder builder() {
                        return builder;
                    }
                });
            }
            ConnectionHandler.this.graphQL.executeAsync(builder).whenCompleteAsync((executionResult, throwable) -> {
                if (throwable == null) {
                    if (executionResult.getData() instanceof Publisher) {
                        Publisher data = (Publisher)executionResult.getData();
                        data.subscribe((org.reactivestreams.Subscriber)new Subscriber(id));
                    } else {
                        this.subscriptions.remove(id);
                        ConnectionHandler.this.sendMessage(id, MessageType.NEXT, new JsonObject(executionResult.toSpecification()));
                        ConnectionHandler.this.sendMessage(id, MessageType.COMPLETE, null);
                    }
                } else {
                    this.subscriptions.remove(id);
                    ConnectionHandler.this.sendMessage(id, MessageType.ERROR, new JsonArray().add((Object)ErrorUtil.toJsonObject(throwable)));
                }
            }, this.executor);
        }

        void unsubscribe(MessageImpl msg) {
            String id = msg.id();
            if (id == null) {
                ConnectionHandler.this.socket.close((short)4400, "Complete message must have an ID");
                return;
            }
            Subscription s = (Subscription)this.subscriptions.remove(id);
            if (s != null) {
                s.cancel();
            }
        }

        @Override
        public void close() {
            this.subscriptions.values().forEach(Subscription::cancel);
        }

        class Subscriber
        implements org.reactivestreams.Subscriber<ExecutionResult> {
            final String id;
            volatile Subscription subscription;

            Subscriber(String id) {
                this.id = id;
            }

            public void onSubscribe(Subscription s) {
                this.subscription = s;
                if (!ReadyState.this.subscriptions.replace(this.id, TRANSIENT_SUBSCRIPTION, s)) {
                    s.cancel();
                } else {
                    s.request(1L);
                }
            }

            public void onNext(ExecutionResult er) {
                ConnectionHandler.this.sendMessage(this.id, MessageType.NEXT, new JsonObject(er.toSpecification()));
                this.subscription.request(1L);
            }

            public void onError(Throwable t) {
                ConnectionHandler.this.sendMessage(this.id, MessageType.ERROR, new JsonArray().add((Object)ErrorUtil.toJsonObject(t)));
                ReadyState.this.subscriptions.remove(this.id);
            }

            public void onComplete() {
                ConnectionHandler.this.sendMessage(this.id, MessageType.COMPLETE, null);
                ReadyState.this.subscriptions.remove(this.id);
            }
        }
    }

    /**
     * State held while the connection-init handler decides whether the connection is accepted.
     * It listens to the future passed at construction: success acknowledges the connection and
     * switches to {@code ReadyState}, failure closes the socket with code 4401.
     */
    private class InitializingState implements ConnectionState, Handler<AsyncResult<Object>> {
        InitializingState(Future<Object> connectionFuture) {
            connectionFuture.onComplete(this);
        }

        /** Outcome of connection initialisation. */
        @Override
        public void handle(AsyncResult<Object> ar) {
            if (ar.succeeded()) {
                this.connect(ar.result());
            } else {
                log.trace("Failed to initialize GraphQLWS socket", ar.cause());
                ConnectionHandler.this.socket.close((short)4401, "Unauthorized");
            }
        }

        /**
         * Sends the connection acknowledgement and moves the connection to the ready state.
         *
         * @param connectionParams result of the initialisation future, handed to the ready state
         */
        void connect(Object connectionParams) {
            ConnectionHandler.this.sendMessage(null, MessageType.CONNECTION_ACK, null);
            ConnectionHandler.this.state = new ReadyState(connectionParams);
        }

        @Override
        public MessageImpl createMessage(MessageType type, JsonObject message) {
            return new MessageImpl(ConnectionHandler.this.socket, type, message);
        }

        /**
         * Processes a message received while initialisation is pending: a second connection-init
         * closes the socket with 4429, pings are answered, pongs are ignored and anything else
         * closes the socket with 4401.
         */
        @Override
        public void handleMessage(MessageImpl msg) {
            switch (msg.type()) {
                case CONNECTION_INIT: {
                    ConnectionHandler.this.socket.close((short)4429, "Too many initialisation requests");
                    break;
                }
                case PING: {
                    ConnectionHandler.this.sendPong(msg);
                    break;
                }
                case PONG: {
                    // Nothing to do for a pong.
                    break;
                }
                default: {
                    ConnectionHandler.this.socket.close((short)4401, "Unauthorized");
                }
            }
        }

        /** Nothing is held by this state, so there is nothing to release. */
        @Override
        public void close() {
        }
    }

    /**
     * State of a freshly opened connection that has not sent its connection-init message yet.
     * A timer started at construction closes the socket with code 4408 unless it is cancelled
     * by a connection-init message or by the socket closing.
     */
    private class InitialState implements ConnectionState, Handler<Long> {
        /** Id of the connection-init timeout timer. */
        final long timerId;

        InitialState() {
            this.timerId = ConnectionHandler.this.context.setTimer(ConnectionHandler.this.connectionInitWaitTimeout, this);
        }

        /** Timer callback: no connection-init message arrived in time. */
        @Override
        public void handle(Long unused) {
            ConnectionHandler.this.socket.close((short)4408, "Connection initialisation timeout");
        }

        @Override
        public MessageImpl createMessage(MessageType type, JsonObject message) {
            return new MessageImpl(ConnectionHandler.this.socket, type, message);
        }

        /**
         * Processes a message received before initialisation: connection-init starts the
         * handshake, pings are answered, pongs are ignored and anything else closes the socket
         * with 4401.
         */
        @Override
        public void handleMessage(MessageImpl msg) {
            ServerWebSocket socket = msg.socket();
            switch (msg.type()) {
                case CONNECTION_INIT: {
                    this.connectionInit(msg);
                    break;
                }
                case PING: {
                    ConnectionHandler.this.sendPong(msg);
                    break;
                }
                case PONG: {
                    // Nothing to do for a pong.
                    break;
                }
                default: {
                    socket.close((short)4401, "Unauthorized");
                }
            }
        }

        /**
         * Handles the connection-init message: cancels the timeout timer, then either
         * acknowledges immediately (no init handler configured) or hands the decision to the
         * init handler through a promise observed by an {@code InitializingState}.
         *
         * @param msg the connection-init message
         */
        void connectionInit(MessageImpl msg) {
            ConnectionHandler.this.context.owner().cancelTimer(this.timerId);
            Handler<ConnectionInitEvent> cih = ConnectionHandler.this.connectionInitHandler;
            if (cih == null) {
                ConnectionHandler.this.sendMessage(null, MessageType.CONNECTION_ACK, null);
                ConnectionHandler.this.state = new ReadyState(null);
                return;
            }
            PromiseInternal<Object> connectionPromise = ConnectionHandler.this.context.promise();
            // Switch state before calling the handler: the handler may complete the promise
            // synchronously, and the resulting transition to ReadyState must not be overwritten.
            ConnectionHandler.this.state = new InitializingState(connectionPromise.future());
            cih.handle(new ConnectionInitEventImpl(msg, connectionPromise));
        }

        /** Cancels the timeout timer. */
        @Override
        public void close() {
            ConnectionHandler.this.context.owner().cancelTimer(this.timerId);
        }
    }

    private static interface ConnectionState {
        public MessageImpl createMessage(MessageType var1, JsonObject var2);

        public void handleMessage(MessageImpl var1);

        public void close();
    }
}

