/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.transaction.log;

import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.neo4j.adversaries.Adversary;
import org.neo4j.adversaries.ClassGuardedAdversary;
import org.neo4j.adversaries.CountingAdversary;
import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction;
import org.neo4j.io.fs.DelegatingStoreChannel;
import org.neo4j.io.fs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.FileSystemLifecycleAdapter;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.layout.DatabaseLayout;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.kernel.impl.api.TestCommand;
import org.neo4j.kernel.impl.api.TestCommandReaderFactory;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.SimpleLogVersionRepository;
import org.neo4j.kernel.impl.transaction.SimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppender;
import org.neo4j.kernel.impl.transaction.log.InMemoryClosableChannel;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChecksumChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.log.files.LogFile;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder;
import org.neo4j.kernel.impl.transaction.log.files.TransactionLogFiles;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogForceEvents;
import org.neo4j.kernel.impl.transaction.tracing.LogForceWaitEvent;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.DatabasePanicEventGenerator;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLog;
import org.neo4j.memory.EmptyMemoryTracker;
import org.neo4j.memory.MemoryTracker;
import org.neo4j.monitoring.DatabaseHealth;
import org.neo4j.monitoring.Health;
import org.neo4j.monitoring.PanicEventGenerator;
import org.neo4j.storageengine.api.CommandReaderFactory;
import org.neo4j.storageengine.api.LogVersionRepository;
import org.neo4j.storageengine.api.StoreId;
import org.neo4j.storageengine.api.TransactionIdStore;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.Race;
import org.neo4j.test.ThreadTestUtils;
import org.neo4j.test.extension.EphemeralNeo4jLayoutExtension;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.LifeExtension;

@EphemeralNeo4jLayoutExtension
@ExtendWith(value={LifeExtension.class})
public class BatchingTransactionAppenderConcurrencyTest {
    /** One minute expressed in milliseconds; passed as the timeout to {@code ThreadTestUtils.awaitThreadState}. */
    private static final long MILLISECONDS_TO_WAIT = TimeUnit.MINUTES.toMillis(1L);
    /** Accepts a stack trace when at least one of its frames is in a method named {@code forceAfterAppend}. */
    private static final Predicate<StackTraceElement[]> IN_CORRECT_FORCE_AFTER_APPEND_METHOD = stackTrace -> Stream.of(stackTrace).anyMatch(e -> e.getMethodName().equals("forceAfterAppend"));
    /** Shared thread pool for background tasks; assigned in {@code setUpExecutor} and shut down in {@code tearDownExecutor}. */
    private static ExecutorService executor;
    /** Lifecycle container to which each test adds its components before calling {@code start()}. */
    @Inject
    private LifeSupport life;
    /** Layout handed to {@code LogFilesBuilder} by the tests that build real log files. */
    @Inject
    private DatabaseLayout databaseLayout;
    /** No-op append event passed to {@code forceAfterAppend} by the mock-based tests. */
    private final LogAppendEvent logAppendEvent = LogAppendEvent.NULL;
    /** Mocked log files; stubbed in {@code setUp} to return {@link #logFile}. */
    private final LogFiles logFiles = (LogFiles)Mockito.mock(TransactionLogFiles.class);
    /** Mocked log file; stubbed in {@code setUp} to hand out a {@code CommandQueueChannel} as its writer. */
    private final LogFile logFile = (LogFile)Mockito.mock(LogFile.class);
    /** Rotation strategy passed to every appender created here. */
    private final LogRotation logRotation = LogRotation.NO_ROTATION;
    /** Metadata cache passed to every appender created here. */
    private final TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache();
    /** Transaction id store shared by the appenders and the real log files built in the tests. */
    private final TransactionIdStore transactionIdStore = new SimpleTransactionIdStore();
    /** Log version repository used when building real log files. */
    private final SimpleLogVersionRepository logVersionRepository = new SimpleLogVersionRepository();
    /** Mocked health passed to appenders created by {@code createTransactionAppender}. */
    private final Health databaseHealth = (Health)Mockito.mock(DatabaseHealth.class);
    /** Released by {@code CommandQueueChannel.flush()}; tests acquire it to learn that a flush has been entered. */
    private final Semaphore forceSemaphore = new Semaphore(0);
    /**
     * Bounded (capacity 2) record of the channel operations performed by {@code CommandQueueChannel}.
     * Because {@code put} blocks on a full queue, tests that pre-load an entry can hold a flush in progress
     * until they start draining the queue.
     */
    private final BlockingQueue<ChannelCommand> channelCommandQueue = new LinkedBlockingQueue<ChannelCommand>(2);

    /**
     * Creates the cached thread pool shared by all tests in this class.
     */
    @BeforeAll
    static void setUpExecutor() {
        ExecutorService sharedPool = Executors.newCachedThreadPool();
        executor = sharedPool;
    }

    /**
     * Shuts down the shared thread pool and clears the static reference to it.
     */
    @AfterAll
    static void tearDownExecutor() {
        ExecutorService sharedPool = executor;
        sharedPool.shutdown();
        executor = null;
    }

    /**
     * Stubs the mocked log files so that the log file's writer is a fresh
     * {@link CommandQueueChannel}, which reports its flush steps to the test.
     */
    @BeforeEach
    void setUp() {
        // The mocked LogFiles hands out the mocked LogFile.
        Mockito.when((Object)this.logFiles.getLogFile())
                .thenReturn((Object)this.logFile);
        // The mocked LogFile hands out the recording channel as its writer.
        Mockito.when((Object)this.logFile.getWriter())
                .thenReturn((Object)new CommandQueueChannel());
    }

    /**
     * A single {@code forceAfterAppend} must produce exactly two channel commands, in order:
     * {@code emptyBufferIntoChannelAndClearIt} followed by {@code force}.
     */
    @Test
    void shouldForceLogChannel() throws Throwable {
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)this.createTransactionAppender());
        this.life.start();

        appender.forceAfterAppend((LogForceEvents)this.logAppendEvent);

        ChannelCommand[] expectedOrder = {
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force
        };
        for (ChannelCommand expected : expectedOrder) {
            Assertions.assertThat(this.channelCommandQueue.take()).isEqualTo((Object)expected);
        }
        org.junit.jupiter.api.Assertions.assertTrue(this.channelCommandQueue.isEmpty());
    }

    /**
     * Starts one force on the executor, waits until its flush has been entered, then forks a
     * second force and waits for that thread to reach {@code TIMED_WAITING}. Draining the
     * command queue afterwards must show the pre-loaded {@code dummy} entry followed by two
     * complete (empty-buffer, force) pairs.
     */
    @Test
    void shouldWaitForOngoingForceToCompleteBeforeForcingAgain() throws Throwable {
        // Pre-load the bounded queue so that a later put from the channel can block until we drain it.
        this.channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)this.createTransactionAppender());
        this.life.start();
        Runnable forceTask = this.createForceAfterAppendRunnable(appender);

        // First force: the semaphore is released by CommandQueueChannel.flush().
        Future<?> firstForce = executor.submit(forceTask);
        this.forceSemaphore.acquire();

        // Second force: must end up waiting while the first one is still in progress.
        Thread secondForce = ThreadTestUtils.fork(forceTask);
        ThreadTestUtils.awaitThreadState(secondForce, MILLISECONDS_TO_WAIT, Thread.State.TIMED_WAITING, new Thread.State[0]);

        ChannelCommand[] expectedOrder = {
            ChannelCommand.dummy,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force
        };
        for (ChannelCommand expected : expectedOrder) {
            Assertions.assertThat(this.channelCommandQueue.take()).isEqualTo((Object)expected);
        }

        firstForce.get();
        secondForce.join();
        org.junit.jupiter.api.Assertions.assertTrue(this.channelCommandQueue.isEmpty());
    }

    /**
     * Starts one force on the executor, waits until its flush has been entered, then forks ten
     * more force requests and waits for each of them to be {@code TIMED_WAITING} inside
     * {@code forceAfterAppend}. Draining the command queue afterwards must show the pre-loaded
     * {@code dummy} entry followed by only two (empty-buffer, force) pairs, after which the
     * queue must be empty.
     */
    @Test
    void shouldBatchUpMultipleWaitingForceRequests() throws Throwable {
        // Pre-load the bounded queue so that a later put from the channel can block until we drain it.
        this.channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)this.createTransactionAppender());
        this.life.start();
        Runnable forceTask = this.createForceAfterAppendRunnable(appender);

        // First force: the semaphore is released by CommandQueueChannel.flush().
        Future<?> firstForce = executor.submit(forceTask);
        this.forceSemaphore.acquire();

        // Pile up the waiting force requests.
        Thread[] waiters = new Thread[10];
        for (int index = 0; index < waiters.length; ++index) {
            waiters[index] = ThreadTestUtils.fork(forceTask);
        }
        for (Thread waiter : waiters) {
            ThreadTestUtils.awaitThreadState(waiter, MILLISECONDS_TO_WAIT, IN_CORRECT_FORCE_AFTER_APPEND_METHOD, Thread.State.TIMED_WAITING, new Thread.State[0]);
        }

        ChannelCommand[] expectedOrder = {
            ChannelCommand.dummy,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force,
            ChannelCommand.emptyBufferIntoChannelAndClearIt,
            ChannelCommand.force
        };
        for (ChannelCommand expected : expectedOrder) {
            Assertions.assertThat(this.channelCommandQueue.take()).isEqualTo((Object)expected);
        }

        firstForce.get();
        for (Thread waiter : waiters) {
            waiter.join();
        }
        org.junit.jupiter.api.Assertions.assertTrue(this.channelCommandQueue.isEmpty(), "Command queue: " + this.channelCommandQueue);
    }

    /**
     * Races ten appenders against a file system wrapped in an adversary that is guarded to
     * frames of {@code BatchingTransactionAppender.force}. Every appender is held at
     * {@code beginLogForceWait} until all ten have arrived, and every {@code append} call is
     * expected to fail with an {@link IOException}.
     */
    @Test
    void shouldHaveAllConcurrentAppendersSeePanic() throws Throwable {
        // File system with failure injection.
        Adversary countingAdversary = new CountingAdversary(1, true);
        ClassGuardedAdversary adversary = new ClassGuardedAdversary(countingAdversary, BatchingTransactionAppenderConcurrencyTest.failMethod(BatchingTransactionAppender.class, "force"));
        EphemeralFileSystemAbstraction efs = new EphemeralFileSystemAbstraction();
        AdversarialFileSystemAbstraction fs = new AdversarialFileSystemAbstraction((Adversary)adversary, (FileSystemAbstraction)efs);
        this.life.add((Lifecycle)new FileSystemLifecycleAdapter((FileSystemAbstraction)fs));

        // Real health and log files (not the mocked fields) so that a panic actually propagates.
        DatabaseHealth realHealth = new DatabaseHealth((PanicEventGenerator)Mockito.mock(DatabasePanicEventGenerator.class), (Log)NullLog.getInstance());
        LogFiles realLogFiles = LogFilesBuilder.builder((DatabaseLayout)this.databaseLayout, (FileSystemAbstraction)fs)
                .withLogVersionRepository((LogVersionRepository)this.logVersionRepository)
                .withTransactionIdStore(this.transactionIdStore)
                .withLogEntryReader((LogEntryReader)new VersionAwareLogEntryReader((CommandReaderFactory)new TestCommandReaderFactory()))
                .withStoreId(StoreId.UNKNOWN)
                .build();
        this.life.add((Lifecycle)realLogFiles);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)new BatchingTransactionAppender(realLogFiles, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, (Health)realHealth));
        this.life.start();

        // Hold every appender at beginLogForceWait until all of them have arrived there.
        int numberOfAppenders = 10;
        final CountDownLatch trap = new CountDownLatch(numberOfAppenders);
        LogAppendEvent.Empty beforeForceTrappingEvent = new LogAppendEvent.Empty(){

            public LogForceWaitEvent beginLogForceWait() {
                trap.countDown();
                DoubleLatch.awaitLatch(trap);
                return super.beginLogForceWait();
            }
        };

        Race race = new Race();
        for (int contestant = 0; contestant < numberOfAppenders; ++contestant) {
            race.addContestant(() -> {
                try {
                    appender.append(this.tx(), (LogAppendEvent)beforeForceTrappingEvent);
                    org.junit.jupiter.api.Assertions.fail("No transaction should be considered appended");
                }
                catch (IOException expected) {
                    // Expected: this test deliberately provokes IOExceptions in every appender.
                }
            });
        }
        race.go();
    }

    /**
     * Appends one transaction successfully, then makes the file system throw
     * {@link OutOfMemoryError} on write and appends a second transaction on the executor.
     * Once the resulting panic has started (and is being held open by
     * {@link SlowPanickingDatabaseHealth}), a third append is attempted and must fail with an
     * {@link IOException} mentioning the critical error. Finally the log is read back and must
     * contain exactly one commit entry.
     */
    @Test
    void databasePanicShouldHandleOutOfMemoryErrors() throws IOException, InterruptedException {
        CountDownLatch panicStarted = new CountDownLatch(1);
        final CountDownLatch releasePanic = new CountDownLatch(1);
        OutOfMemoryAwareFileSystem fs = new OutOfMemoryAwareFileSystem();
        this.life.add((Lifecycle)new FileSystemLifecycleAdapter((FileSystemAbstraction)fs));
        SlowPanickingDatabaseHealth slowHealth = new SlowPanickingDatabaseHealth(panicStarted, releasePanic);
        LogFiles realLogFiles = LogFilesBuilder.builder((DatabaseLayout)this.databaseLayout, (FileSystemAbstraction)fs)
                .withLogVersionRepository((LogVersionRepository)this.logVersionRepository)
                .withTransactionIdStore(this.transactionIdStore)
                .withLogEntryReader((LogEntryReader)new VersionAwareLogEntryReader((CommandReaderFactory)new TestCommandReaderFactory()))
                .withStoreId(StoreId.UNKNOWN)
                .build();
        this.life.add((Lifecycle)realLogFiles);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)new BatchingTransactionAppender(realLogFiles, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, (Health)slowHealth));
        this.life.start();

        // One healthy transaction first.
        appender.append(this.tx(), LogAppendEvent.NULL);

        // This transaction hits the simulated OutOfMemoryError; wait until panic() has been entered.
        fs.shouldOOM = true;
        Future<Long> failingTransaction = executor.submit(() -> appender.append(this.tx(), LogAppendEvent.NULL));
        panicStarted.await();
        fs.shouldOOM = false;

        // Lets the held panic proceed once this append reaches beginLogForceWait.
        LogAppendEvent panicReleasingEvent = new LogAppendEvent.Empty(){

            public LogForceWaitEvent beginLogForceWait() {
                releasePanic.countDown();
                return super.beginLogForceWait();
            }
        };
        try {
            appender.append(this.tx(), panicReleasingEvent);
            org.junit.jupiter.api.Assertions.fail("Should have failed since database should have panicked");
        }
        catch (IOException panicked) {
            org.junit.jupiter.api.Assertions.assertTrue(panicked.getMessage().contains("The database has encountered a critical error"));
        }

        try {
            failingTransaction.get();
            org.junit.jupiter.api.Assertions.fail("Should have failed with OutOfMemoryError error");
        }
        catch (ExecutionException wrapped) {
            org.junit.jupiter.api.Assertions.assertTrue(wrapped.getCause() instanceof OutOfMemoryError);
        }

        // Read the single log file back and count the commit entries.
        VersionAwareLogEntryReader logEntryReader = new VersionAwareLogEntryReader((CommandReaderFactory)new TestCommandReaderFactory());
        Assertions.assertThat((long)realLogFiles.getLowestLogVersion()).isEqualTo(realLogFiles.getHighestLogVersion());
        long version = realLogFiles.getHighestLogVersion();
        try (PhysicalLogVersionedStoreChannel channel = realLogFiles.openForVersion(version);
             ReadAheadLogChannel readAheadLogChannel = new ReadAheadLogChannel((LogVersionedStoreChannel)channel, (MemoryTracker)EmptyMemoryTracker.INSTANCE);
             LogEntryCursor cursor = new LogEntryCursor((LogEntryReader)logEntryReader, (ReadableClosablePositionAwareChecksumChannel)readAheadLogChannel)) {
            long committedTransactions = 0L;
            while (cursor.next()) {
                LogEntry entry = cursor.get();
                if (entry instanceof LogEntryCommit) {
                    committedTransactions++;
                }
            }
            Assertions.assertThat(committedTransactions).isEqualTo(1L);
        }
    }

    /**
     * Builds a minimal transaction to append: a single {@link TestCommand} with an empty
     * additional-header byte array and zeroed numeric header values.
     *
     * @return a new {@link TransactionToApply} wrapping that representation
     */
    protected TransactionToApply tx() {
        PhysicalTransactionRepresentation representation =
                new PhysicalTransactionRepresentation(Collections.singletonList(new TestCommand()));
        byte[] emptyHeader = new byte[0];
        representation.setHeader(emptyHeader, 0L, 0L, 0L, 0);
        return new TransactionToApply((TransactionRepresentation)representation, PageCursorTracer.NULL);
    }

    /**
     * Wraps a call to {@code appender.forceAfterAppend} in a {@link Runnable}.
     *
     * @param appender the appender to force
     * @return a task that forces the appender and rethrows any {@link IOException} wrapped in a
     *         {@link RuntimeException}
     */
    private Runnable createForceAfterAppendRunnable(BatchingTransactionAppender appender) {
        return () -> {
            try {
                appender.forceAfterAppend((LogForceEvents)this.logAppendEvent);
            }
            catch (IOException forceFailure) {
                // Runnable cannot declare checked exceptions, so surface it unchecked.
                throw new RuntimeException(forceFailure);
            }
        };
    }

    /**
     * Creates a predicate that accepts a stack frame only when it belongs to the given class
     * and the given method name.
     *
     * @param klass the class whose name the frame's class name must equal
     * @param methodName the method name the frame's method name must equal
     * @return predicate matching frames of {@code klass.methodName}
     */
    private static Predicate<StackWalker.StackFrame> failMethod(Class<?> klass, String methodName) {
        return frame -> {
            if (!frame.getClassName().equals(klass.getName())) {
                return false;
            }
            return frame.getMethodName().equals(methodName);
        };
    }

    /**
     * Creates an appender wired to this test's mocked log files and health, together with the
     * shared rotation, metadata cache and transaction id store fields.
     *
     * @return a new, not yet started, {@link BatchingTransactionAppender}
     */
    private BatchingTransactionAppender createTransactionAppender() {
        BatchingTransactionAppender appender = new BatchingTransactionAppender(
                this.logFiles,
                this.logRotation,
                this.transactionMetadataCache,
                this.transactionIdStore,
                this.databaseHealth);
        return appender;
    }

    /**
     * Body of a race contestant: appends one transaction and expects the append to fail with an
     * {@link IOException}. The method name is the one the decompiler emitted for the original
     * lambda.
     *
     * @param appender the appender to append to
     * @param beforeForceTrappingEvent the append event passed to {@code append}
     */
    private /* synthetic */ void lambda$shouldHaveAllConcurrentAppendersSeePanic$2(BatchingTransactionAppender appender, LogAppendEvent beforeForceTrappingEvent) {
        try {
            appender.append(this.tx(), beforeForceTrappingEvent);
            org.junit.jupiter.api.Assertions.fail("No transaction should be considered appended");
        }
        catch (IOException expected) {
            // Expected outcome: the append is supposed to fail, so there is nothing to do here.
        }
    }

    /**
     * In-memory channel that records its flush steps on the enclosing test's
     * {@code channelCommandQueue}: {@link #prepareForFlush()} enqueues
     * {@code emptyBufferIntoChannelAndClearIt} and {@link #flush()} enqueues {@code force}.
     * Both use the blocking {@code put}, so a full queue stalls the calling thread until the
     * test drains it.
     */
    class CommandQueueChannel
    extends InMemoryClosableChannel
    implements Flushable {
        CommandQueueChannel() {
        }

        /**
         * Records that the buffer is being emptied into the channel.
         *
         * @return this channel, which is also the {@link Flushable} to flush
         * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
         *         interrupted while waiting for queue capacity; the interrupt status is restored
         */
        public Flushable prepareForFlush() {
            try {
                BatchingTransactionAppenderConcurrencyTest.this.channelCommandQueue.put(ChannelCommand.emptyBufferIntoChannelAndClearIt);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return this;
        }

        /**
         * Signals the test that a flush has been entered, then records the force.
         *
         * @throws IOException wrapping the {@link InterruptedException} if the thread is
         *         interrupted while waiting for queue capacity; the interrupt status is restored
         */
        @Override
        public void flush() throws IOException {
            try {
                // Release before the potentially blocking put so the test can observe the flush in progress.
                BatchingTransactionAppenderConcurrencyTest.this.forceSemaphore.release();
                BatchingTransactionAppenderConcurrencyTest.this.channelCommandQueue.put(ChannelCommand.force);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    /**
     * Entries placed on the test's command queue to record, in order, what happened to the channel.
     */
    private static enum ChannelCommand {
        // Enqueued by CommandQueueChannel.prepareForFlush().
        emptyBufferIntoChannelAndClearIt,
        // Enqueued by CommandQueueChannel.flush().
        force,
        // Pre-loaded by tests to take up one slot of the bounded queue.
        dummy;

    }

    /**
     * {@link DatabaseHealth} whose {@link #panic(Throwable)} announces that a panic has started
     * and then waits for the test's permission before delegating to the real implementation.
     */
    private static class SlowPanickingDatabaseHealth
    extends DatabaseHealth {
        /** Counted down as soon as {@code panic} is entered. */
        private final CountDownLatch panicLatch;
        /** Awaited inside {@code panic} before the real panic is performed. */
        private final CountDownLatch adversaryLatch;

        /**
         * @param panicLatch latch counted down when {@code panic} is entered
         * @param adversaryLatch latch that must reach zero before {@code panic} proceeds
         */
        SlowPanickingDatabaseHealth(CountDownLatch panicLatch, CountDownLatch adversaryLatch) {
            super((PanicEventGenerator)Mockito.mock(DatabasePanicEventGenerator.class), (Log)NullLog.getInstance());
            this.panicLatch = panicLatch;
            this.adversaryLatch = adversaryLatch;
        }

        /**
         * Signals {@code panicLatch}, waits on {@code adversaryLatch}, then performs the real panic.
         *
         * @param cause the cause passed on to {@code super.panic}
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *         interrupted; the interrupt status is restored and the real panic is not performed
         */
        public void panic(Throwable cause) {
            this.panicLatch.countDown();
            try {
                this.adversaryLatch.await();
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            super.panic(cause);
        }
    }

    /**
     * Ephemeral file system whose write channels throw {@link OutOfMemoryError} from
     * {@code writeAll} for as long as {@code shouldOOM} is set.
     */
    private static class OutOfMemoryAwareFileSystem
    extends EphemeralFileSystemAbstraction {
        /** Toggled by the test; volatile because it is read by the writing thread. */
        private volatile boolean shouldOOM;

        private OutOfMemoryAwareFileSystem() {
        }

        /**
         * Opens the file for writing and wraps the resulting channel so that {@code writeAll}
         * can be made to fail.
         *
         * @param fileName the file to open
         * @return a delegating channel that checks {@code shouldOOM} on every {@code writeAll}
         * @throws IOException if the underlying file system fails to open the channel
         */
        public synchronized StoreChannel write(File fileName) throws IOException {
            StoreChannel realChannel = super.write(fileName);
            return new DelegatingStoreChannel(realChannel){

                public void writeAll(ByteBuffer src) throws IOException {
                    if (!OutOfMemoryAwareFileSystem.this.shouldOOM) {
                        super.writeAll(src);
                        return;
                    }
                    throw new OutOfMemoryError("Temporary buffer allocation failed");
                }
            };
        }
    }
}

