/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.common.stream;

import io.opentelemetry.testing.internal.armeria.common.HttpData;
import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.common.stream.AbortedStreamException;
import io.opentelemetry.testing.internal.armeria.common.stream.ByteStreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.CancelledSubscriptionException;
import io.opentelemetry.testing.internal.armeria.common.stream.NoopSubscriber;
import io.opentelemetry.testing.internal.armeria.common.stream.SubscriptionOption;
import io.opentelemetry.testing.internal.armeria.common.util.EventLoopCheckingFuture;
import io.opentelemetry.testing.internal.armeria.common.util.Exceptions;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.InternalStreamMessageUtil;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.NoopSubscription;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.base.Preconditions;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableSet;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.math.LongMath;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.primitives.Ints;
import io.opentelemetry.testing.internal.armeria.server.ServiceRequestContext;
import io.opentelemetry.testing.internal.io.netty.buffer.ByteBuf;
import io.opentelemetry.testing.internal.io.netty.buffer.ByteBufAllocator;
import io.opentelemetry.testing.internal.io.netty.buffer.ByteBufUtil;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ByteStreamMessage} that publishes the content of the file at a {@link Path} as
 * {@link HttpData} chunks read through an {@link AsynchronousFileChannel}.
 *
 * <p>Only a single subscriber is accepted; later subscribers are rejected with an
 * {@link IllegalStateException}. The portion of the file to publish may be narrowed with
 * {@link #range(long, long)} before subscription.
 */
final class PathStreamMessage implements ByteStreamMessage {
    private static final Logger logger = LoggerFactory.getLogger(PathStreamMessage.class);
    private static final AtomicIntegerFieldUpdater<PathStreamMessage> subscribedUpdater = AtomicIntegerFieldUpdater.newUpdater(PathStreamMessage.class, "subscribed");
    private static final Set<StandardOpenOption> READ_OPERATION = ImmutableSet.of(StandardOpenOption.READ);

    /** Completed normally when the stream ends, exceptionally on failure, cancellation or abort. */
    private final CompletableFuture<Void> completionFuture = new EventLoopCheckingFuture<Void>();
    private final Path path;
    /** Executor handed to the file channel; when {@code null} a fallback is resolved at subscription. */
    @Nullable
    private final ExecutorService blockingTaskExecutor;
    private final ByteBufAllocator alloc;
    /** Upper bound of the size of a single read, and therefore of a single published chunk. */
    private final int bufferSize;
    /** File position of the first byte to publish. */
    private long offset;
    /** Maximum number of bytes to publish; {@link Long#MAX_VALUE} means "until end of file". */
    private long length = Long.MAX_VALUE;
    /** {@code 0} until the first call to {@code subscribe}, {@code 1} afterwards. */
    private volatile int subscribed;
    /** Set once a subscription backed by an open, non-empty file has been created. */
    @Nullable
    private volatile PathSubscription pathSubscription;

    /**
     * Creates a new instance.
     *
     * @param path the file to read; must not be {@code null}
     * @param blockingTaskExecutor the executor to associate with the file channel, or {@code null}
     * @param alloc the allocator used to create read buffers
     * @param bufferSize the maximum number of bytes read per chunk
     */
    PathStreamMessage(Path path, @Nullable ExecutorService blockingTaskExecutor, ByteBufAllocator alloc, int bufferSize) {
        this.path = Objects.requireNonNull(path, "path");
        this.blockingTaskExecutor = blockingTaskExecutor;
        this.alloc = alloc;
        this.bufferSize = bufferSize;
    }

    /**
     * Restricts the published bytes to {@code length} bytes starting at {@code offset}.
     *
     * @param offset the position of the first byte to publish; must be {@code >= 0}
     * @param length the maximum number of bytes to publish; must be {@code > 0}
     * @return this instance
     * @throws IllegalArgumentException if {@code offset} or {@code length} is out of range
     * @throws IllegalStateException if this stream has already been subscribed to
     */
    @Override
    public ByteStreamMessage range(long offset, long length) {
        Preconditions.checkArgument(offset >= 0L, "offset: %s (expected: >= 0)", offset);
        Preconditions.checkArgument(length > 0L, "length: %s (expected: > 0)", length);
        Preconditions.checkState(this.subscribed == 0, "cannot specify range(%s, %s) after this %s is subscribed", offset, length, PathStreamMessage.class);
        this.offset = offset;
        this.length = length;
        return this;
    }

    /** Returns {@code true} until the completion future is done. */
    @Override
    public boolean isOpen() {
        return !this.completionFuture.isDone();
    }

    /**
     * Returns {@code true} if this stream is no longer open and no bytes were read from the file.
     */
    @Override
    public boolean isEmpty() {
        if (this.isOpen()) {
            return false;
        }
        PathSubscription pathSubscription = this.pathSubscription;
        // The read position starts at the range offset, not at 0, so "nothing was read" has to be
        // detected by comparing against the start position rather than against zero.
        return pathSubscription == null || pathSubscription.position == pathSubscription.startPosition;
    }

    /** Returns the currently outstanding demand, or {@code 0} if there is no subscription yet. */
    @Override
    public long demand() {
        PathSubscription pathSubscription = this.pathSubscription;
        if (pathSubscription != null) {
            return pathSubscription.requested;
        }
        return 0L;
    }

    /** Returns the future that is completed when this stream terminates. */
    @Override
    public CompletableFuture<Void> whenComplete() {
        return this.completionFuture;
    }

    /**
     * Subscribes {@code subscriber} to this stream. The actual subscription work is performed on
     * {@code executor}. Any subscriber after the first one receives a no-op subscription followed
     * by an {@link IllegalStateException}.
     */
    @Override
    public void subscribe(Subscriber<? super HttpData> subscriber, EventExecutor executor, SubscriptionOption ... options) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(options, "options");
        if (!subscribedUpdater.compareAndSet(this, 0, 1)) {
            subscriber.onSubscribe(NoopSubscription.get());
            subscriber.onError(new IllegalStateException("Only single subscriber is allowed!"));
            return;
        }
        if (executor.inEventLoop()) {
            this.subscribe0(subscriber, executor, options);
        } else {
            executor.execute(() -> this.subscribe0(subscriber, executor, options));
        }
    }

    /**
     * Opens the file and hands a {@link PathSubscription} to {@code subscriber}.
     *
     * <p>If this stream has already failed (e.g. it was aborted before subscription), an empty
     * file is found, or the file cannot be opened, the subscriber receives a no-op subscription
     * followed by the matching terminal signal and no {@link PathSubscription} is created.
     */
    private void subscribe0(Subscriber<? super HttpData> subscriber, EventExecutor executor, SubscriptionOption ... options) {
        if (this.completionFuture.isCompletedExceptionally()) {
            // Aborted before the subscription took effect: report the failure instead of opening
            // the file and streaming data from a stream that is already terminated.
            subscriber.onSubscribe(NoopSubscription.get());
            this.propagateFailure(subscriber);
            return;
        }

        // Prefer the explicitly configured executor, then the one of the current service request
        // context. A null executor makes the channel use the JDK's default thread pool.
        ExecutorService blockingTaskExecutor = this.blockingTaskExecutor;
        if (blockingTaskExecutor == null) {
            ServiceRequestContext serviceRequestContext = ServiceRequestContext.currentOrNull();
            if (serviceRequestContext != null) {
                blockingTaskExecutor = serviceRequestContext.blockingTaskExecutor();
            }
        }

        AsynchronousFileChannel fileChannel = null;
        boolean success = false;
        try {
            fileChannel = AsynchronousFileChannel.open(this.path, READ_OPERATION, blockingTaskExecutor, new FileAttribute[0]);
            if (fileChannel.size() == 0L) {
                subscriber.onSubscribe(NoopSubscription.get());
                if (this.completionFuture.isCompletedExceptionally()) {
                    this.propagateFailure(subscriber);
                } else {
                    subscriber.onComplete();
                    this.completionFuture.complete(null);
                }
                // 'success' is still false, so the finally block closes the channel.
                return;
            }
            success = true;
        }
        catch (IOException e) {
            subscriber.onSubscribe(NoopSubscription.get());
            subscriber.onError(e);
            this.completionFuture.completeExceptionally(e);
            return;
        }
        finally {
            if (!success && fileChannel != null) {
                try {
                    fileChannel.close();
                }
                catch (IOException e) {
                    logger.warn("Unexpected exception while closing {}.", fileChannel, e);
                }
            }
        }

        // Never allocate a buffer larger than the number of bytes that may be published.
        int bufferSize = Math.min(Ints.saturatedCast(this.length), this.bufferSize);
        PathSubscription pathSubscription = new PathSubscription(fileChannel, subscriber, executor, this.offset, this.length, bufferSize, InternalStreamMessageUtil.containsNotifyCancellation(options), InternalStreamMessageUtil.containsWithPooledObjects(options));
        this.pathSubscription = pathSubscription;
        subscriber.onSubscribe(pathSubscription);
    }

    /** Signals the (peeled) cause of the failed completion future to {@code subscriber}. */
    private void propagateFailure(Subscriber<? super HttpData> subscriber) {
        this.completionFuture.handle((unused, cause) -> {
            subscriber.onError(Exceptions.peel(cause));
            return null;
        });
    }

    /** Aborts this stream with an {@link AbortedStreamException}. */
    @Override
    public void abort() {
        this.abort(AbortedStreamException.get());
    }

    /**
     * Aborts this stream with {@code cause}: closes the file channel of the active subscription
     * (if any), signals {@code cause} to its subscriber and fails the completion future.
     */
    @Override
    public void abort(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        PathSubscription pathSubscription = this.pathSubscription;
        if (pathSubscription != null) {
            pathSubscription.maybeCloseFileChannel();
            pathSubscription.close(cause);
        }
        this.completionFuture.completeExceptionally(cause);
    }

    /**
     * The {@link Subscription} given to the subscriber. It also acts as the
     * {@link CompletionHandler} of the asynchronous reads it issues; read results are processed
     * on {@link #executor}.
     */
    private final class PathSubscription
    implements CompletionHandler<Integer, ByteBuf>,
    Subscription {
        private final AsynchronousFileChannel fileChannel;
        /** Replaced with a no-op subscriber once the subscription is cancelled. */
        private Subscriber<? super HttpData> downstream;
        private final EventExecutor executor;
        private final int bufferSize;
        /** File position at which reading started, i.e. the range offset. */
        private final long startPosition;
        /** Exclusive end position, {@code offset + length} saturated at {@link Long#MAX_VALUE}. */
        private final long end;
        private final boolean notifyCancellation;
        private final boolean withPooledObjects;
        /** {@code true} from issuing a read until its result has been handed downstream. */
        private boolean reading;
        /** {@code true} once the subscription was completed, failed or cancelled. */
        private boolean closed;
        private volatile long requested;
        /** File position of the next byte to read. */
        private volatile long position;

        private PathSubscription(AsynchronousFileChannel fileChannel, Subscriber<? super HttpData> downstream, EventExecutor executor, long offset, long length, int bufferSize, boolean notifyCancellation, boolean withPooledObjects) {
            this.fileChannel = fileChannel;
            this.downstream = downstream;
            this.executor = executor;
            this.bufferSize = bufferSize;
            this.startPosition = offset;
            this.end = LongMath.saturatedAdd(offset, length);
            this.notifyCancellation = notifyCancellation;
            this.withPooledObjects = withPooledObjects;
            this.position = offset;
        }

        /**
         * Adds {@code n} to the outstanding demand. A non-positive {@code n} is reported to the
         * subscriber as an {@link IllegalArgumentException} and cancels the subscription.
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                this.downstream.onError(new IllegalArgumentException("Rule \u00a73.9 violated: non-positive subscription requests are forbidden."));
                this.cancel();
            } else {
                this.request0(n);
            }
        }

        private void request0(long n) {
            long requested = this.requested;
            if (requested == Long.MAX_VALUE) {
                return;
            }
            this.requested = n == Long.MAX_VALUE ? Long.MAX_VALUE : LongMath.saturatedAdd(requested, n);
            if (requested > 0L) {
                // There already was outstanding demand; the next read is chained from
                // completed(Integer, ByteBuf), so no read is started here.
                return;
            }
            this.read();
        }

        /**
         * Issues one asynchronous read if none is in flight, the subscription is not closed and
         * there is outstanding demand. Each read consumes one unit of demand.
         */
        private void read() {
            if (!this.reading && !this.closed && this.requested > 0L) {
                --this.requested;
                this.reading = true;
                long position = this.position;
                // Do not read past the end of the requested range.
                int bufferSize = Math.min(this.bufferSize, Ints.saturatedCast(this.end - position));
                ByteBuf buffer = PathStreamMessage.this.alloc.buffer(bufferSize);
                // The buffer travels as the attachment so that the handler can wrap or release it.
                this.fileChannel.read(buffer.nioBuffer(0, bufferSize), position, buffer, this);
            }
        }

        /** Cancels this subscription on {@link #executor}. */
        @Override
        public void cancel() {
            if (this.executor.inEventLoop()) {
                this.cancel0();
            } else {
                this.executor.execute(this::cancel0);
            }
        }

        private void cancel0() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (!this.reading) {
                this.maybeCloseFileChannel();
            }
            // Otherwise the channel is closed by completed(Integer, ByteBuf) or
            // failed(Throwable, ByteBuf) once the read in flight has finished.
            CancelledSubscriptionException cause = CancelledSubscriptionException.get();
            if (this.notifyCancellation) {
                this.downstream.onError(cause);
            }
            PathStreamMessage.this.completionFuture.completeExceptionally(cause);
            this.downstream = NoopSubscriber.get();
        }

        /**
         * Handles the result of a read on {@link #executor}: publishes the bytes read and either
         * chains the next read or completes the stream when the end of the range or the end of
         * the file (a negative {@code result}) is reached.
         */
        @Override
        public void completed(Integer result, ByteBuf byteBuf) {
            this.executor.execute(() -> {
                if (this.closed) {
                    byteBuf.release();
                    this.maybeCloseFileChannel();
                } else if (result >= 0) {
                    HttpData data;
                    this.position += (long)result.intValue();
                    if (this.withPooledObjects) {
                        // Hand the pooled buffer itself to the subscriber.
                        byteBuf.writerIndex(result);
                        data = HttpData.wrap(byteBuf);
                    } else {
                        // Copy the bytes out so that the pooled buffer can be released right away.
                        data = HttpData.wrap(ByteBufUtil.getBytes(byteBuf, 0, result));
                        byteBuf.release();
                    }
                    this.downstream.onNext(data);
                    this.reading = false;
                    if (this.closed) {
                        // The subscriber cancelled (or the stream was aborted) from within
                        // onNext(). cancel0() skipped closing the channel because a read was
                        // still marked as in flight and read() would now be a no-op, so the
                        // channel has to be closed here or it would leak.
                        this.maybeCloseFileChannel();
                        return;
                    }
                    long position = this.position;
                    assert (position <= this.end);
                    if (position < this.end) {
                        this.read();
                    } else {
                        this.maybeCloseFileChannel();
                        this.close0(null);
                    }
                } else {
                    // End of file.
                    byteBuf.release();
                    this.maybeCloseFileChannel();
                    this.close0(null);
                }
            });
        }

        /** Handles a failed read on {@link #executor} by failing the stream with {@code ex}. */
        @Override
        public void failed(Throwable ex, ByteBuf byteBuf) {
            this.executor.execute(() -> {
                byteBuf.release();
                this.maybeCloseFileChannel();
                this.close0(ex);
            });
        }

        /** Closes the file channel if it is still open, logging instead of throwing on failure. */
        private void maybeCloseFileChannel() {
            if (this.fileChannel.isOpen()) {
                try {
                    this.fileChannel.close();
                }
                catch (IOException cause) {
                    logger.warn("Unexpected exception while closing {}.", this.fileChannel, cause);
                }
            }
        }

        /** Terminates this subscription on {@link #executor}; see {@link #close0(Throwable)}. */
        private void close(@Nullable Throwable cause) {
            if (this.executor.inEventLoop()) {
                this.close0(cause);
            } else {
                this.executor.execute(() -> this.close0(cause));
            }
        }

        /**
         * Terminates this subscription unless it is already closed: signals {@code onComplete}
         * when {@code cause} is {@code null}, {@code onError} otherwise, and completes the
         * completion future accordingly.
         */
        private void close0(@Nullable Throwable cause) {
            if (!this.closed) {
                this.closed = true;
                if (cause == null) {
                    this.downstream.onComplete();
                    PathStreamMessage.this.completionFuture.complete(null);
                } else {
                    this.downstream.onError(cause);
                    PathStreamMessage.this.completionFuture.completeExceptionally(cause);
                }
            }
        }
    }
}

