/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.server.op;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.script.Bindings;
import org.apache.commons.lang.time.StopWatch;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
import org.apache.tinkerpop.gremlin.server.OpProcessor;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link OpProcessor} for processors that support the {@code eval} operation.
 *
 * <p>The class selects the operation to run for a request ({@link #select(Context)}), validates
 * {@code eval} requests ({@link #validateEvalMessage(RequestMessage)}), submits the script to a
 * {@link GremlinExecutor} ({@link #evalOpInternal(Context, Supplier, BindingSupplier)}) and streams
 * the iterated result back to the client in batches ({@link #handleIterator(Context, Iterator)}).
 */
public abstract class AbstractEvalOpProcessor implements OpProcessor {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEvalOpProcessor.class);

    /** Timer started when an {@code eval} request is submitted and stopped when its evaluation ends. */
    private static final Timer evalOpTimer = MetricManager.INSTANCE.getTimer(MetricRegistry.name(GremlinServer.class, "op", "eval"));

    /** Binding names that an {@code eval} request may not use (accessors of {@code T.id/key/label/value}). */
    private static final List<String> invalidBindingsKeys = Arrays.asList(T.id.getAccessor(), T.key.getAccessor(), T.label.getAccessor(), T.value.getAccessor());

    /** Comma separated form of {@link #invalidBindingsKeys}, used in the validation error message. */
    private static final String invalidBindingKeysJoined = String.join(",", invalidBindingsKeys);

    /**
     * When {@code true}, {@link #handleIterator(Context, Iterator)} calls {@code commitAll()} on the
     * context's graph manager once the result iterator has nothing left to return.
     */
    protected final boolean manageTransactions;

    /**
     * @param manageTransactions whether result iteration should commit all graphs when it finishes
     */
    protected AbstractEvalOpProcessor(boolean manageTransactions) {
        this.manageTransactions = manageTransactions;
    }

    /**
     * Supplies the operation that is executed for a valid {@code eval} request.
     *
     * @return the consumer selected by {@link #select(Context)} for the {@code eval} op
     */
    public abstract ThrowingConsumer<Context> getEvalOp();

    /**
     * Resolves an operation for any op code other than {@code eval} and {@code invalid}.
     *
     * @param var1 the incoming request
     * @return the operation to run, or an empty {@link Optional} when the op code is not supported, in
     *         which case {@link #select(Context)} rejects the request
     * @throws OpProcessorException if the request is recognized but cannot be accepted
     */
    public abstract Optional<ThrowingConsumer<Context>> selectOther(RequestMessage var1) throws OpProcessorException;

    /**
     * Chooses the operation to run for the request held by {@code ctx}.
     *
     * @param ctx the request context
     * @return the eval operation for {@code eval}, otherwise whatever {@link #selectOther(RequestMessage)} yields
     * @throws OpProcessorException with a {@code REQUEST_ERROR_MALFORMED_REQUEST} response when the op
     *         code is {@code invalid} or is not recognized, or when validation of an {@code eval}
     *         request fails
     */
    @Override
    public ThrowingConsumer<Context> select(Context ctx) throws OpProcessorException {
        RequestMessage message = ctx.getRequestMessage();
        logger.debug("Selecting processor for RequestMessage {}", message);

        ThrowingConsumer<Context> op;
        switch (message.getOp()) {
            case "eval":
                op = validateEvalMessage(message).orElse(getEvalOp());
                break;
            case "invalid":
                String msgInvalid = String.format("Message could not be parsed.  Check the format of the request. [%s]", message);
                throw new OpProcessorException(msgInvalid, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgInvalid).create());
            default:
                op = selectOther(message).orElseThrow(() -> {
                    String msgDefault = String.format("Message with op code [%s] is not recognized.", message.getOp());
                    return new OpProcessorException(msgDefault, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).result(msgDefault).create());
                });
                break;
        }
        return op;
    }

    /**
     * Checks that an {@code eval} request carries a {@code gremlin} argument and that its
     * {@code bindings}, when present, do not use any of the reserved binding names.
     *
     * @param message the request to validate
     * @return always an empty {@link Optional} in this implementation, which makes
     *         {@link #select(Context)} fall back to {@link #getEvalOp()}
     * @throws OpProcessorException with a {@code REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS} response when
     *         validation fails
     */
    protected Optional<ThrowingConsumer<Context>> validateEvalMessage(RequestMessage message) throws OpProcessorException {
        if (!message.optionalArgs("gremlin").isPresent()) {
            String msg = String.format("A message with an [%s] op code requires a [%s] argument.", "eval", "gremlin");
            throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
        }

        if (message.optionalArgs("bindings").isPresent()) {
            Map<?, ?> bindings = (Map<?, ?>) message.getArgs().get("bindings");
            if (bindings.keySet().stream().anyMatch(invalidBindingsKeys::contains)) {
                String msg = String.format("The [%s] message is using at least one of the invalid binding key of [%s]. It conflicts with standard static imports to Gremlin Server.", "eval", invalidBindingKeysJoined);
                throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).result(msg).create());
            }
        }

        return Optional.empty();
    }

    /**
     * Submits the request's {@code gremlin} script to the supplied {@link GremlinExecutor} and wires up
     * the response handling: the evaluation result is iterated through
     * {@link #handleIterator(Context, Iterator)} and failures are written to the channel as error
     * responses.
     *
     * @param context the request context
     * @param gremlinExecutorSupplier provides the executor that evaluates the script
     * @param bindingsSupplier provides the bindings passed to the script
     * @throws OpProcessorException if {@code bindingsSupplier} throws it
     */
    protected void evalOpInternal(Context context, Supplier<GremlinExecutor> gremlinExecutorSupplier, BindingSupplier<Bindings> bindingsSupplier) throws OpProcessorException {
        Timer.Context timerContext = evalOpTimer.time();
        ChannelHandlerContext ctx = context.getChannelHandlerContext();
        RequestMessage msg = context.getRequestMessage();

        CompletableFuture<?> evalFuture;
        boolean submitted = false;
        try {
            GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
            Map<String, Object> args = msg.getArgs();
            String script = (String) args.get("gremlin");
            String language = (String) args.get("language");
            Bindings bindings = bindingsSupplier.get();
            evalFuture = gremlinExecutor.eval(script, language, bindings, null, o -> writeIteratedResult(context, o));
            submitted = true;
        } finally {
            // Once the script is submitted the completion handler below stops the timer. If anything
            // failed before that point nothing else would, so stop it here.
            if (!submitted) {
                timerContext.stop();
            }
        }

        evalFuture.handle((v, t) -> {
            timerContext.stop();
            if (t != null) {
                reportEvaluationFailure(ctx, msg, t);
            }
            return null;
        });
    }

    /**
     * Iterates an evaluation result to the client and converts iteration failures into error responses.
     */
    private void writeIteratedResult(Context context, Object result) {
        ChannelHandlerContext ctx = context.getChannelHandlerContext();
        RequestMessage msg = context.getRequestMessage();
        Iterator itty = IteratorUtils.asIterator(result);
        logger.debug("Preparing to iterate results from - {} - in thread [{}]", msg, Thread.currentThread().getName());
        try {
            handleIterator(context, itty);
        } catch (TimeoutException ex) {
            String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg, ex.getMessage());
            logger.warn(errorMessage);
            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
        } catch (Exception ex) {
            logger.warn(String.format("Exception processing a script on request [%s].", msg), ex);
            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
        }
    }

    /**
     * Logs a failed evaluation and writes the matching error response to the channel.
     */
    private static void reportEvaluationFailure(ChannelHandlerContext ctx, RequestMessage msg, Throwable t) {
        if (t instanceof TimeoutException) {
            String errorMessage = String.format("Response evaluation exceeded the configured threshold for request [%s] - %s", msg, t.getMessage());
            logger.warn(errorMessage);
            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(t.getMessage()).create());
        } else {
            logger.warn(String.format("Exception processing a script on request [%s].", msg), t);
            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION).statusMessage(t.getMessage()).create());
        }
    }

    /**
     * Streams the contents of {@code itty} to the client in batches.
     *
     * <p>An empty iterator produces a single {@code NO_CONTENT} response. Otherwise results are
     * collected into batches of the request's {@code batchSize} argument (falling back to
     * {@code settings.resultIterationBatchSize}); every batch but the last is written as
     * {@code PARTIAL_CONTENT} and the last one as {@code SUCCESS}. Batches are only written while the
     * channel is writable; otherwise the method sleeps briefly and retries, so the final batch is
     * held until it can be written or the timeout below is hit.
     *
     * @param context the request context
     * @param itty the results to send
     * @throws TimeoutException if the elapsed time exceeds {@code settings.serializedResponseTimeout}
     *         before the final batch has been written
     * @throws InterruptedException if the current thread is interrupted while iterating or waiting
     * @throws IllegalArgumentException if the effective batch size is not greater than zero
     */
    protected void handleIterator(Context context, Iterator itty) throws TimeoutException, InterruptedException {
        ChannelHandlerContext ctx = context.getChannelHandlerContext();
        RequestMessage msg = context.getRequestMessage();
        Settings settings = context.getSettings();

        if (!itty.hasNext()) {
            if (this.manageTransactions) {
                context.getGraphManager().commitAll();
            }
            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.NO_CONTENT).create());
            // NO_CONTENT is the terminal response; nothing below may run and produce a second one.
            return;
        }

        int batchSize = (Integer) msg.optionalArgs("batchSize").orElse(settings.resultIterationBatchSize);
        if (batchSize <= 0) {
            // A zero batch size would never consume the iterator yet keep writing empty batches.
            throw new IllegalArgumentException(String.format("The batch size must be greater than zero but was [%s]", batchSize));
        }

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        boolean warnedAboutPause = false;
        // Tracks exhaustion explicitly: the loop must keep running after the last element has been
        // pulled until the batch holding it has actually been written.
        boolean exhausted = false;
        List<Object> aggregate = new ArrayList<>(batchSize);
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            // The batch may already be full if the previous pass could not write it.
            if (!exhausted && aggregate.size() < batchSize) {
                aggregate.add(itty.next());
                exhausted = !itty.hasNext();
                if (exhausted && this.manageTransactions) {
                    context.getGraphManager().commitAll();
                }
            }

            if (ctx.channel().isWritable()) {
                if (exhausted || aggregate.size() == batchSize) {
                    ResponseStatusCode code = exhausted ? ResponseStatusCode.SUCCESS : ResponseStatusCode.PARTIAL_CONTENT;
                    ctx.writeAndFlush(ResponseMessage.build(msg).code(code).result(aggregate).create());
                    if (exhausted) {
                        // terminal response sent - no timeout may be raised after it
                        break;
                    }
                    aggregate = new ArrayList<>(batchSize);
                }
            } else {
                if (!warnedAboutPause) {
                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
                    warnedAboutPause = true;
                }
                TimeUnit.MILLISECONDS.sleep(10L);
            }

            stopWatch.split();
            if (stopWatch.getSplitTime() > settings.serializedResponseTimeout) {
                String timeoutMsg = String.format("Serialization of the entire response exceeded the serializeResponseTimeout setting %s", warnedAboutPause ? "[Gremlin Server paused writes to client as messages were not being consumed quickly enough]" : "");
                throw new TimeoutException(timeoutMsg.trim());
            }
            stopWatch.unsplit();
        }
        stopWatch.stop();
    }

    /**
     * A supplier whose {@link #get()} may fail with an {@link OpProcessorException}.
     *
     * @param <T> the type of value supplied
     */
    @FunctionalInterface
    public static interface BindingSupplier<T> {
        /**
         * @return the supplied value
         * @throws OpProcessorException if the value cannot be produced for the request
         */
        public T get() throws OpProcessorException;
    }
}

