/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.web.rx.integration;

import org.noear.solon.core.handle.Action;
import org.noear.solon.core.handle.ActionReturnHandler;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.util.ClassUtil;
import org.noear.solon.core.util.MimeType;
import org.noear.solon.web.rx.integration.ActionRxSubscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

/**
 * Return-value handler for actions whose return type is a Reactive Streams {@link Publisher}.
 *
 * <p>A non-null result is subscribed to with an {@link ActionRxSubscriber}. Before subscribing,
 * subclasses get a chance to adapt the publisher through
 * {@link #postPublisher(Context, Action, Object, boolean)} and to decide whether the request is
 * a streaming one through {@link #isStreaming(Context)}.
 */
public class ActionReturnRxHandler implements ActionReturnHandler {
    /**
     * Outcome of probing for {@link Flux}, evaluated once per handler instance. It is always
     * checked before any {@code instanceof Flux} test so that the {@code Flux}-specific branch
     * is skipped when the probe fails.
     * NOTE(review): presumably {@code false} when Reactor is absent from the classpath; this
     * depends on {@code ClassUtil.hasClass} - confirm.
     */
    private final boolean reactorAvailable = ClassUtil.hasClass(() -> Flux.class);

    /**
     * Tells whether this handler applies to the given return type.
     *
     * @param ctx        the current request context (not consulted here)
     * @param returnType the return type to test
     * @return {@code true} when {@code returnType} is {@link Publisher} or a subtype of it
     */
    public boolean matched(Context ctx, Class<?> returnType) {
        return Publisher.class.isAssignableFrom(returnType);
    }

    /**
     * Subscribes an {@link ActionRxSubscriber} to the publisher returned by the action.
     *
     * <p>A {@code null} result is ignored and nothing is subscribed.
     *
     * @param ctx    the current request context
     * @param action the action that produced {@code result}
     * @param result the value returned by the action; expected to be a {@link Publisher}
     * @throws IllegalStateException if {@code result} is non-null and
     *                               {@code ctx.asyncSupported()} reports {@code false}
     * @throws Throwable             anything raised while preparing or subscribing to the publisher
     */
    public void returnHandle(Context ctx, Action action, Object result) throws Throwable {
        if (result == null) {
            // Nothing to subscribe to.
            return;
        }

        if (!ctx.asyncSupported()) {
            throw new IllegalStateException("This boot plugin does not support asynchronous mode");
        }

        // The streaming flag is computed once and shared by the publisher adaptation and the
        // subscriber, so both act on the same decision.
        boolean streaming = isStreaming(ctx);
        Publisher source = postPublisher(ctx, action, result, streaming);
        Subscriber sink = (Subscriber) new ActionRxSubscriber(ctx, action, streaming);
        source.subscribe(sink);
    }

    /**
     * Decides whether the current request should be handled as a streaming one.
     *
     * @param ctx the current request context
     * @return the verdict of {@code MimeType.isStreaming} for the value of {@code ctx.acceptNew()}
     */
    protected boolean isStreaming(Context ctx) {
        String accept = (String) ctx.acceptNew();
        return MimeType.isStreaming(accept);
    }

    /**
     * Adapts the action result into the publisher that will actually be subscribed to.
     *
     * <p>For a non-streaming request whose result is a {@link Flux} (and the {@code Flux} probe
     * succeeded), the publisher returned by {@code Flux.collectList()} is used instead of the
     * {@code Flux} itself. In every other case the result is returned as-is.
     *
     * @param ctx         the current request context (unused by this implementation)
     * @param action      the action that produced {@code result} (unused by this implementation)
     * @param result      the value returned by the action
     * @param isStreaming whether the request is treated as streaming
     * @return the publisher to subscribe to
     * @throws ClassCastException if the result is returned as-is but is not a {@link Publisher}
     * @throws Throwable          declared so that overriding implementations may propagate failures
     */
    protected Publisher postPublisher(Context ctx, Action action, Object result, boolean isStreaming) throws Throwable {
        // Keep the probe flag ahead of the instanceof test: when the probe failed the
        // short-circuit prevents the Flux type check from being evaluated at all.
        if (isStreaming || !reactorAvailable || !(result instanceof Flux)) {
            return (Publisher) result;
        }

        return ((Flux) result).collectList();
    }
}

