/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.graphql.server.webmvc;

import graphql.ErrorClassification;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import graphql.GraphqlErrorBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.springframework.graphql.execution.ErrorType;
import org.springframework.graphql.execution.SubscriptionPublisherException;
import org.springframework.graphql.server.WebGraphQlHandler;
import org.springframework.graphql.server.WebGraphQlResponse;
import org.springframework.graphql.server.webmvc.AbstractGraphQlHttpHandler;
import org.springframework.lang.Nullable;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import org.springframework.web.servlet.function.ServerRequest;
import org.springframework.web.servlet.function.ServerResponse;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GraphQlSseHandler
extends AbstractGraphQlHttpHandler {
    @Nullable
    private final Duration timeout;

    public GraphQlSseHandler(WebGraphQlHandler graphQlHandler) {
        this(graphQlHandler, (Duration)null);
    }

    public GraphQlSseHandler(WebGraphQlHandler graphQlHandler, @Nullable Duration timeout) {
        super(graphQlHandler, null);
        this.timeout = timeout;
    }

    @Override
    protected ServerResponse prepareResponse(ServerRequest request, Mono<WebGraphQlResponse> responseMono) {
        Flux resultFlux = responseMono.flatMapMany(response -> {
            if (response.getData() instanceof Publisher) {
                Publisher publisher = (Publisher)response.getData();
                return Flux.from((Publisher)publisher).map(ExecutionResult::toSpecification);
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("A subscription DataFetcher must return a Publisher: " + response.getData()));
            }
            return Flux.just((Object)ExecutionResult.newExecutionResult().addError(GraphQLError.newError().errorType((ErrorClassification)graphql.ErrorType.OperationNotSupported).message("SSE handler supports only subscriptions", new Object[0]).build()).build().toSpecification());
        });
        return this.timeout != null ? ServerResponse.sse(SseSubscriber.connect((Flux<Map<String, Object>>)resultFlux), (Duration)this.timeout) : ServerResponse.sse(SseSubscriber.connect((Flux<Map<String, Object>>)resultFlux));
    }

    private static final class SseSubscriber
    extends BaseSubscriber<Map<String, Object>> {
        private final ServerResponse.SseBuilder sseBuilder;

        private SseSubscriber(ServerResponse.SseBuilder sseBuilder) {
            this.sseBuilder = sseBuilder;
            this.sseBuilder.onTimeout(() -> this.cancelWithError((Throwable)new AsyncRequestTimeoutException()));
        }

        protected void hookOnNext(Map<String, Object> value) {
            this.sendNext(value);
        }

        private void sendNext(Map<String, Object> value) {
            try {
                this.sseBuilder.event("next");
                this.sseBuilder.data(value);
            }
            catch (IOException exception) {
                this.cancelWithError(exception);
            }
        }

        private void cancelWithError(Throwable ex) {
            this.cancel();
            this.sseBuilder.error(ex);
        }

        protected void hookOnError(Throwable ex) {
            this.sendNext(SseSubscriber.exceptionToResultMap(ex));
            this.sendComplete();
        }

        private static Map<String, Object> exceptionToResultMap(Throwable ex) {
            Map map;
            if (ex instanceof SubscriptionPublisherException) {
                SubscriptionPublisherException spe = (SubscriptionPublisherException)((Object)ex);
                map = spe.toMap();
            } else {
                map = GraphqlErrorBuilder.newError().message("Subscription error", new Object[0]).errorType((ErrorClassification)ErrorType.INTERNAL_ERROR).build().toSpecification();
            }
            return map;
        }

        private void sendComplete() {
            try {
                this.sseBuilder.event("complete").data((Object)"");
            }
            catch (IOException exc) {
                throw new RuntimeException(exc);
            }
            this.sseBuilder.complete();
        }

        protected void hookOnComplete() {
            this.sendComplete();
        }

        static Consumer<ServerResponse.SseBuilder> connect(Flux<Map<String, Object>> resultFlux) {
            return sseBuilder -> {
                SseSubscriber subscriber = new SseSubscriber((ServerResponse.SseBuilder)sseBuilder);
                resultFlux.subscribe((CoreSubscriber)subscriber);
            };
        }
    }
}

