/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.gridfs;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

class DataBufferPublisherAdapter {
    /**
     * Package-private no-argument constructor. The class declares no instance fields in this file, so there is
     * nothing to initialize.
     */
    DataBufferPublisherAdapter() {
        // nothing to initialize
    }

    /**
     * Creates a {@link Flux} of {@link DataBuffer}s read from the given {@link AsyncInputStream}.
     * <p>
     * The stream is wrapped in a {@link DelegatingAsyncInputStream} that carries the buffer factory and buffer size
     * along with it, and is handed to {@link Flux#usingWhen} as the managed resource. The same cleanup action,
     * {@link AsyncInputStream#close()}, is supplied for the complete, error and cancel callbacks.
     *
     * @param inputStream the stream to read from.
     * @param dataBufferFactory factory used to allocate the emitted {@link DataBuffer}s.
     * @param bufferSize capacity, in bytes, of the transport buffer allocated for each read.
     * @return the {@link Flux} emitting the stream content.
     */
    static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {
        // Keep the resource fully typed: a raw Publisher here would force unchecked conversions in usingWhen.
        Mono<DelegatingAsyncInputStream> resource = Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize));
        return Flux.usingWhen(resource, DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close);
    }

    /**
     * Builds the {@link Flux} that reads from the given stream whenever the subscriber signals demand.
     * <p>
     * A single {@link AsyncInputStreamHandler} is created per call; it is closed when the sink is disposed or
     * cancelled, and receives every request amount reported by the sink.
     *
     * @param inputStream the wrapped stream, also providing the buffer factory and buffer size to use.
     * @return a {@link Flux} emitting the chunks read from {@code inputStream}.
     */
    private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream inputStream) {
        DataBufferFactory factory = inputStream.dataBufferFactory;
        int chunkSize = inputStream.bufferSize;
        AsyncInputStreamHandler handler = new AsyncInputStreamHandler(inputStream, factory, chunkSize);

        return Flux.create(emitter -> {
            // Close hooks are registered before the request hook, same order as before.
            // NOTE(review): onRequest may invoke its callback right away if demand is already pending - confirm
            // before reordering these registrations.
            emitter.onDispose(handler::close);
            emitter.onCancel(handler::close);
            emitter.onRequest(requested -> handler.request(emitter, requested));
        });
    }

    static class AsyncInputStreamHandler {
        private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "demand");
        private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "state");
        private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "read");
        private static final int STATE_OPEN = 0;
        private static final int STATE_CLOSED = 1;
        private static final int READ_NONE = 0;
        private static final int READ_IN_PROGRESS = 1;
        final AsyncInputStream inputStream;
        final DataBufferFactory dataBufferFactory;
        final int bufferSize;
        volatile long demand;
        volatile int state = 0;
        volatile int read = 0;

        void request(FluxSink<DataBuffer> sink, long n) {
            Operators.addCap(DEMAND, (Object)this, (long)n);
            this.drainLoop(sink);
        }

        void drainLoop(FluxSink<DataBuffer> sink) {
            while (this.onShouldRead()) {
                this.emitNext(sink);
            }
        }

        boolean onShouldRead() {
            return !this.isClosed() && this.getDemand() > 0L && this.onWantRead();
        }

        boolean onWantRead() {
            return READ.compareAndSet(this, 0, 1);
        }

        void onReadDone() {
            READ.compareAndSet(this, 1, 0);
        }

        long getDemand() {
            return DEMAND.get(this);
        }

        void decrementDemand() {
            DEMAND.decrementAndGet(this);
        }

        void close() {
            STATE.compareAndSet(this, 0, 1);
        }

        boolean isClosed() {
            return STATE.get(this) == 1;
        }

        private void emitNext(FluxSink<DataBuffer> sink) {
            ByteBuffer transport = ByteBuffer.allocate(this.bufferSize);
            BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, this.dataBufferFactory, transport);
            try {
                this.inputStream.read(transport).subscribe((Subscriber)bufferCoreSubscriber);
            }
            catch (Throwable e) {
                sink.error(e);
            }
        }

        public AsyncInputStreamHandler(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {
            this.inputStream = inputStream;
            this.dataBufferFactory = dataBufferFactory;
            this.bufferSize = bufferSize;
        }

        private class BufferCoreSubscriber
        implements CoreSubscriber<Integer> {
            private final FluxSink<DataBuffer> sink;
            private final DataBufferFactory factory;
            private final ByteBuffer transport;
            private final Thread subscribeThread = Thread.currentThread();
            private volatile Subscription subscription;

            BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {
                this.sink = sink;
                this.factory = factory;
                this.transport = transport;
            }

            public Context currentContext() {
                return this.sink.currentContext();
            }

            public void onSubscribe(Subscription s) {
                this.subscription = s;
                s.request(1L);
            }

            public void onNext(Integer bytes) {
                if (AsyncInputStreamHandler.this.isClosed()) {
                    AsyncInputStreamHandler.this.onReadDone();
                    return;
                }
                if (bytes > 0) {
                    DataBuffer buffer = this.readNextChunk();
                    this.sink.next((Object)buffer);
                    AsyncInputStreamHandler.this.decrementDemand();
                }
                try {
                    if (bytes == -1) {
                        this.sink.complete();
                        return;
                    }
                }
                finally {
                    AsyncInputStreamHandler.this.onReadDone();
                }
                this.subscription.request(1L);
            }

            private DataBuffer readNextChunk() {
                this.transport.flip();
                DataBuffer dataBuffer = this.factory.allocateBuffer(this.transport.remaining());
                dataBuffer.write(new ByteBuffer[]{this.transport});
                this.transport.clear();
                return dataBuffer;
            }

            public void onError(Throwable t) {
                if (AsyncInputStreamHandler.this.isClosed()) {
                    Operators.onErrorDropped((Throwable)t, (Context)this.sink.currentContext());
                    return;
                }
                AsyncInputStreamHandler.this.onReadDone();
                this.sink.error(t);
            }

            public void onComplete() {
                if (this.subscribeThread != Thread.currentThread()) {
                    AsyncInputStreamHandler.this.drainLoop(this.sink);
                }
            }
        }
    }

    private static class DelegatingAsyncInputStream
    implements AsyncInputStream {
        private final AsyncInputStream inputStream;
        private final DataBufferFactory dataBufferFactory;
        private int bufferSize;

        DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {
            this.inputStream = inputStream;
            this.dataBufferFactory = dataBufferFactory;
            this.bufferSize = bufferSize;
        }

        public Publisher<Integer> read(ByteBuffer dst) {
            return this.inputStream.read(dst);
        }

        public Publisher<Long> skip(long bytesToSkip) {
            return this.inputStream.skip(bytesToSkip);
        }

        public Publisher<Success> close() {
            return this.inputStream.close();
        }
    }
}

