package io.micronaut.http.server.netty.binders;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.bind.binders.NonBlockingBodyArgumentBinder;
import io.micronaut.http.netty.stream.StreamedHttpRequest;
import io.micronaut.http.server.netty.HttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentProcessorResolver;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.EmptyByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Internal
/* loaded from: input_file:io/micronaut/http/server/netty/binders/InputStreamBodyBinder.class */
public class InputStreamBodyBinder implements NonBlockingBodyArgumentBinder<InputStream> {
    public static final Argument<InputStream> TYPE = Argument.of(InputStream.class);
    private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);
    private final HttpContentProcessorResolver processorResolver;
    private final ExecutorService executorService;

    public InputStreamBodyBinder(HttpContentProcessorResolver httpContentProcessorResolver, ExecutorService executorService) {
        this.processorResolver = httpContentProcessorResolver;
        this.executorService = executorService;
    }

    public Argument<InputStream> argumentType() {
        return TYPE;
    }

    public ArgumentBinder.BindingResult<InputStream> bind(final ArgumentConversionContext<InputStream> argumentConversionContext, HttpRequest<?> httpRequest) {
        if (httpRequest instanceof NettyHttpRequest) {
            final NettyHttpRequest nettyHttpRequest = (NettyHttpRequest) httpRequest;
            if (nettyHttpRequest.getNativeRequest() instanceof StreamedHttpRequest) {
                final PipedOutputStream pipedOutputStream = new PipedOutputStream();
                try {
                    PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream) { // from class: io.micronaut.http.server.netty.binders.InputStreamBodyBinder.1
                        private volatile HttpContentProcessor<ByteBufHolder> processor;

                        private void init() {
                            if (this.processor == null) {
                                this.processor = InputStreamBodyBinder.this.processorResolver.resolve(nettyHttpRequest, argumentConversionContext.getArgument());
                                Flux.from(this.processor).publishOn(Schedulers.fromExecutor(InputStreamBodyBinder.this.executorService)).subscribe(new CompletionAwareSubscriber<ByteBufHolder>() { // from class: io.micronaut.http.server.netty.binders.InputStreamBodyBinder.1.1
                                    protected void doOnSubscribe(Subscription subscription) {
                                        subscription.request(1L);
                                    }

                                    /* JADX INFO: Access modifiers changed from: protected */
                                    public synchronized void doOnNext(ByteBufHolder byteBufHolder) {
                                        if (InputStreamBodyBinder.LOG.isTraceEnabled()) {
                                            InputStreamBodyBinder.LOG.trace("Server received streaming message for argument [{}]: {}", argumentConversionContext.getArgument(), byteBufHolder);
                                        }
                                        ByteBuf content = byteBufHolder.content();
                                        try {
                                            if (!(content instanceof EmptyByteBuf)) {
                                                try {
                                                    byte[] bytes = ByteBufUtil.getBytes(content);
                                                    pipedOutputStream.write(bytes, 0, bytes.length);
                                                    content.release();
                                                } catch (IOException e) {
                                                    this.subscription.cancel();
                                                    content.release();
                                                    return;
                                                }
                                            }
                                            this.subscription.request(1L);
                                        } catch (Throwable th) {
                                            content.release();
                                            throw th;
                                        }
                                    }

                                    protected synchronized void doOnError(Throwable th) {
                                        if (InputStreamBodyBinder.LOG.isTraceEnabled()) {
                                            InputStreamBodyBinder.LOG.trace("Server received error for argument [" + argumentConversionContext.getArgument() + "]: " + th.getMessage(), th);
                                        }
                                        try {
                                            pipedOutputStream.close();
                                        } catch (IOException e) {
                                        } finally {
                                            this.subscription.cancel();
                                        }
                                    }

                                    protected synchronized void doOnComplete() {
                                        if (InputStreamBodyBinder.LOG.isTraceEnabled()) {
                                            InputStreamBodyBinder.LOG.trace("Done receiving messages for argument: {}", argumentConversionContext.getArgument());
                                        }
                                        try {
                                            pipedOutputStream.close();
                                        } catch (IOException e) {
                                        }
                                    }
                                });
                            }
                        }

                        @Override // java.io.PipedInputStream, java.io.InputStream
                        public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
                            init();
                            return super.read(bArr, i, i2);
                        }

                        @Override // java.io.PipedInputStream, java.io.InputStream
                        public synchronized int read() throws IOException {
                            init();
                            return super.read();
                        }
                    };
                    return () -> {
                        return Optional.of(pipedInputStream);
                    };
                } catch (IOException e) {
                    argumentConversionContext.reject(e);
                }
            }
        }
        return ArgumentBinder.BindingResult.EMPTY;
    }

    public /* bridge */ /* synthetic */ ArgumentBinder.BindingResult bind(ArgumentConversionContext argumentConversionContext, Object obj) {
        return bind((ArgumentConversionContext<InputStream>) argumentConversionContext, (HttpRequest<?>) obj);
    }
}
