package io.github.kavahub.file.reader;

import io.github.kavahub.file.ChannelHelper;
import io.github.kavahub.file.query.Query;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/kavahub/file/reader/FileByteReaderQuery.class */
/**
 * A {@link Query} of {@code byte[]} items whose data comes from a file read through an
 * {@link AsynchronousFileChannel}.
 *
 * <p>Every call to {@link #subscribe(Consumer, Consumer)} opens the file again, hands the channel
 * to a {@code ReadFile} and forwards each byte array that {@code ReadFile} reports to the data
 * consumer.
 */
public class FileByteReaderQuery extends Query<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(FileByteReaderQuery.class);

    /** File opened (read-only) on every subscription. */
    private final Path file;

    /**
     * Forwarded unchanged to {@code ReadFile.of}; presumably the read-buffer size in bytes
     * (NOTE(review): confirm against {@code ReadFile}).
     */
    private final int bufferSize;

    /**
     * Creates a query over the given file. Nothing is opened or validated here; problems with the
     * arguments surface when {@link #subscribe(Consumer, Consumer)} is called.
     *
     * @param path file to read
     * @param i    value forwarded to {@code ReadFile.of} as the buffer size
     */
    public FileByteReaderQuery(Path path, int i) {
        this.file = path;
        this.bufferSize = i;
    }

    /**
     * Opens the file and starts reading it, pushing data to {@code consumer} and failures to
     * {@code consumer2}.
     *
     * <p>The returned future is always completed <em>normally</em> (with {@code null}), also when
     * an error occurs; errors are reported only through {@code consumer2}. Once the future is
     * completed, by this class or by the caller, the reader is asked to cancel and the channel is
     * closed.
     *
     * @param consumer  receives every byte array reported by the reader
     * @param consumer2 receives any failure: a read error, a data-handling error reported by the
     *                  reader, or an exception thrown while setting the read up
     * @return a future that is completed when reading finishes or fails
     */
    @Override // io.github.kavahub.file.query.Query
    public CompletableFuture<Void> subscribe(Consumer<? super byte[]> consumer, Consumer<? super Throwable> consumer2) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        // Declared outside the try so the catch block can release it when set-up fails.
        AsynchronousFileChannel channel = null;
        try {
            // Parameterized logging calls toString() lazily, so no explicit level guard is needed.
            log.debug("Begin to read file: {}", this.file);
            channel = AsynchronousFileChannel.open(this.file, StandardOpenOption.READ);
            // Effectively-final alias so the lambdas below can capture the channel.
            final AsynchronousFileChannel open = channel;
            final ReadFile whenRead = ReadFile.of(open, this.bufferSize).whenReadError(th -> {
                log.error("Failed while file reading", th);
                completableFuture.complete(null);
                consumer2.accept(th);
            }).whenHandleDataError(th2 -> {
                completableFuture.complete(null);
                consumer2.accept(th2);
            }).whenFinish(num -> {
                // The message has no placeholder, so the callback argument is not passed along.
                log.debug("File read complete");
                // complete() is a no-op on an already completed future; no isDone() check needed.
                completableFuture.complete(null);
            }).whenCancel(num2 -> {
                log.debug("Cancel file reading");
            }).whenRead((bArr, num3) -> {
                log.debug("[{} bytes] has been readed", num3);
                consumer.accept(bArr);
            });
            whenRead.read();
            // Runs for every kind of completion (a Void result is always null), including the
            // case where the future was already completed before this callback was registered.
            completableFuture.whenComplete((r5, th3) -> {
                whenRead.cancel();
                log.debug("The signal to cancel file reading has been sent");
                ChannelHelper.close(open);
            });
        } catch (Exception e) {
            // The clean-up callback above is registered last, so reaching this point means it was
            // never installed: release the channel here or it would stay open.
            if (channel != null) {
                ChannelHelper.close(channel);
            }
            completableFuture.complete(null);
            consumer2.accept(e);
        }
        return completableFuture;
    }
}
