/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.om.ratis;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBufferMetrics;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OzoneManagerDoubleBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);
    private Queue<Entry> currentBuffer;
    private Queue<Entry> readyBuffer;
    private final Semaphore unFlushedTransactions;
    private final Daemon daemon;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final FlushNotifier flushNotifier;
    private final OMMetadataManager omMetadataManager;
    private final Consumer<TermIndex> updateLastAppliedIndex;
    private final S3SecretManager s3SecretManager;
    private final boolean isTracingEnabled;
    private final OzoneManagerDoubleBufferMetrics metrics = OzoneManagerDoubleBufferMetrics.create();
    private final AtomicLong flushedTransactionCount = new AtomicLong();
    private final AtomicLong flushIterations = new AtomicLong();

    public static Builder newBuilder() {
        return new Builder();
    }

    static Semaphore newSemaphore(int permits) {
        return permits > 0 ? new Semaphore(permits) : null;
    }

    private OzoneManagerDoubleBuffer(Builder b) {
        this.currentBuffer = new ConcurrentLinkedQueue<Entry>();
        this.readyBuffer = new ConcurrentLinkedQueue<Entry>();
        this.omMetadataManager = b.omMetadataManager;
        this.s3SecretManager = b.s3SecretManager;
        this.updateLastAppliedIndex = b.updateLastAppliedIndex;
        this.flushNotifier = b.flushNotifier;
        this.unFlushedTransactions = OzoneManagerDoubleBuffer.newSemaphore(b.maxUnFlushedTransactionCount);
        this.isTracingEnabled = b.isTracingEnabled;
        this.daemon = new Daemon(this::flushTransactions);
        this.daemon.setName(String.valueOf(b.threadPrefix) + "OMDoubleBufferFlushThread");
    }

    public OzoneManagerDoubleBuffer start() {
        this.isRunning.set(true);
        this.daemon.start();
        return this;
    }

    public void acquireUnFlushedTransactions(int n) throws InterruptedException {
        this.unFlushedTransactions.acquire(n);
    }

    void releaseUnFlushedTransactions(int n) {
        this.unFlushedTransactions.release(n);
    }

    private void addToBatchWithTrace(OzoneManagerProtocolProtos.OMResponse omResponse, CheckedRunnable<IOException> runnable) throws IOException {
        if (!this.isTracingEnabled) {
            runnable.run();
            return;
        }
        String spanName = "DB-addToWriteBatch-" + omResponse.getCmdType();
        TracingUtil.executeAsChildSpan((String)spanName, (String)omResponse.getTraceID(), runnable);
    }

    private void flushBatchWithTrace(String parentName, int batchSize, CheckedRunnable<IOException> runnable) throws IOException {
        if (!this.isTracingEnabled) {
            runnable.run();
            return;
        }
        String spanName = "DB-commitWriteBatch-Size-" + batchSize;
        TracingUtil.executeAsChildSpan((String)spanName, (String)parentName, runnable);
    }

    private void addToBatchTransactionInfoWithTrace(String parentName, long transactionIndex, CheckedRunnable<IOException> runnable) throws IOException {
        if (!this.isTracingEnabled) {
            runnable.run();
            return;
        }
        String spanName = "DB-addWriteBatch-transactioninfo-" + transactionIndex;
        TracingUtil.executeAsChildSpan((String)spanName, (String)parentName, runnable);
    }

    @VisibleForTesting
    public void flushTransactions() {
        while (this.isRunning.get() && this.canFlush()) {
            this.flushCurrentBuffer();
        }
    }

    @VisibleForTesting
    void flushCurrentBuffer() {
        try {
            this.swapCurrentAndReadyBuffer();
            List<Queue<Entry>> bufferQueues = this.splitReadyBufferAtCreateSnapshot();
            for (Queue<Entry> buffer : bufferQueues) {
                this.flushBatch(buffer);
            }
            this.clearReadyBuffer();
            this.flushNotifier.notifyFlush();
        }
        catch (IOException ex) {
            this.terminate(ex, 1);
        }
        catch (Throwable t) {
            this.terminate(t, 2);
        }
    }

    private void flushBatch(Queue<Entry> buffer) throws IOException {
        HashMap<String, List<Long>> cleanupEpochs = new HashMap<String, List<Long>>();
        List flushedTransactions = buffer.stream().map(Entry::getTermIndex).sorted().collect(Collectors.toList());
        int flushedTransactionsSize = flushedTransactions.size();
        TermIndex lastTransaction = (TermIndex)flushedTransactions.get(flushedTransactionsSize - 1);
        Throwable throwable = null;
        Object var7_9 = null;
        try (BatchOperation batchOperation = this.omMetadataManager.getStore().initBatchOperation();){
            String lastTraceId = this.addToBatch(buffer, batchOperation);
            buffer.iterator().forEachRemaining(entry -> this.addCleanupEntry((Entry)entry, (Map<String, List<Long>>)cleanupEpochs));
            this.addToBatchTransactionInfoWithTrace(lastTraceId, lastTransaction.getIndex(), (CheckedRunnable<IOException>)((CheckedRunnable)() -> this.omMetadataManager.getTransactionInfoTable().putWithBatch(batchOperation, (Object)"#TRANSACTIONINFO", (Object)TransactionInfo.valueOf((TermIndex)lastTransaction))));
            long startTime = Time.monotonicNow();
            this.flushBatchWithTrace(lastTraceId, buffer.size(), (CheckedRunnable<IOException>)((CheckedRunnable)() -> this.omMetadataManager.getStore().commitBatchOperation(batchOperation)));
            this.metrics.updateFlushTime(Time.monotonicNow() - startTime);
        }
        catch (Throwable throwable2) {
            if (throwable == null) {
                throwable = throwable2;
            } else if (throwable != throwable2) {
                throwable.addSuppressed(throwable2);
            }
            throw throwable;
        }
        long accumulativeCount = this.flushedTransactionCount.addAndGet(flushedTransactionsSize);
        long flushedIterations = this.flushIterations.incrementAndGet();
        LOG.debug("Sync iteration: {}, size in this iteration: {}, accumulative count: {}", new Object[]{flushedIterations, flushedTransactionsSize, accumulativeCount});
        this.cleanupCache(cleanupEpochs);
        this.releaseUnFlushedTransactions(flushedTransactionsSize);
        this.updateLastAppliedIndex.accept(lastTransaction);
        this.metrics.updateFlush(flushedTransactionsSize);
    }

    private String addToBatch(Queue<Entry> buffer, BatchOperation batchOperation) {
        String lastTraceId = null;
        for (Entry entry : buffer) {
            OMClientResponse response = entry.getResponse();
            OzoneManagerProtocolProtos.OMResponse omResponse = response.getOMResponse();
            lastTraceId = omResponse.getTraceID();
            try {
                this.addToBatchWithTrace(omResponse, (CheckedRunnable<IOException>)((CheckedRunnable)() -> response.checkAndUpdateDB(this.omMetadataManager, batchOperation)));
            }
            catch (IOException ex) {
                this.terminate(ex, 1, omResponse);
            }
            catch (Throwable t) {
                this.terminate(t, 2, omResponse);
            }
        }
        return lastTraceId;
    }

    private synchronized List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
        ArrayList<Queue<Entry>> response = new ArrayList<Queue<Entry>>();
        OzoneManagerProtocolProtos.OMResponse previousOmResponse = null;
        for (Entry entry : this.readyBuffer) {
            OzoneManagerProtocolProtos.OMResponse omResponse = entry.getResponse().getOMResponse();
            if (response.isEmpty() || OzoneManagerDoubleBuffer.isStandaloneBatchCmdTypes(omResponse) || OzoneManagerDoubleBuffer.isStandaloneBatchCmdTypes(previousOmResponse)) {
                response.add(new LinkedList());
            }
            ((Queue)response.get(response.size() - 1)).add(entry);
            previousOmResponse = omResponse;
        }
        return response;
    }

    private static boolean isStandaloneBatchCmdTypes(OzoneManagerProtocolProtos.OMResponse response) {
        if (response == null) {
            return false;
        }
        OzoneManagerProtocolProtos.Type type = response.getCmdType();
        return type == OzoneManagerProtocolProtos.Type.SnapshotPurge || type == OzoneManagerProtocolProtos.Type.CreateSnapshot;
    }

    private void addCleanupEntry(Entry entry, Map<String, List<Long>> cleanupEpochs) {
        Class<?> responseClass = entry.getResponse().getClass();
        CleanupTableInfo cleanupTableInfo = responseClass.getAnnotation(CleanupTableInfo.class);
        if (cleanupTableInfo != null) {
            List<String> cleanupTables = cleanupTableInfo.cleanupAll() ? OMDBDefinition.get().getColumnFamilyNames() : Arrays.asList(cleanupTableInfo.cleanupTables());
            for (String table : cleanupTables) {
                cleanupEpochs.computeIfAbsent(table, list -> new ArrayList()).add(entry.getTermIndex().getIndex());
            }
        } else {
            throw new RuntimeException("CleanupTableInfo Annotation is missing for" + responseClass);
        }
    }

    private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
        cleanupEpochs.forEach((tableName, epochs) -> {
            Collections.sort(epochs);
            this.omMetadataManager.getTable(tableName).cleanupCache(epochs);
            if (tableName.equals(OMDBDefinition.S3_SECRET_TABLE_DEF.getName())) {
                this.s3SecretManager.clearS3Cache(epochs);
            }
        });
    }

    private synchronized void clearReadyBuffer() {
        this.readyBuffer.clear();
    }

    public void stop() {
        this.stopDaemon();
        this.metrics.unRegister();
    }

    @VisibleForTesting
    public void stopDaemon() {
        if (this.isRunning.compareAndSet(true, false)) {
            LOG.info("Stopping OMDoubleBuffer flush thread");
            this.daemon.interrupt();
            try {
                this.daemon.join();
            }
            catch (InterruptedException e) {
                LOG.debug("Interrupted while waiting for daemon to exit.", (Throwable)e);
            }
        } else {
            LOG.info("OMDoubleBuffer flush thread is not running.");
        }
    }

    private void terminate(Throwable t, int status) {
        this.terminate(t, status, null);
    }

    private void terminate(Throwable t, int status, OzoneManagerProtocolProtos.OMResponse omResponse) {
        StringBuilder message = new StringBuilder("During flush to DB encountered error in OMDoubleBuffer flush thread ").append(Thread.currentThread().getName());
        if (omResponse != null) {
            message.append(" when handling OMRequest: ").append(omResponse);
        }
        ExitUtils.terminate((int)status, (String)message.toString(), (Throwable)t, (Logger)LOG);
    }

    public synchronized void add(OMClientResponse response, TermIndex termIndex) {
        this.currentBuffer.add(new Entry(termIndex, response));
        this.notify();
    }

    private synchronized boolean canFlush() {
        try {
            while (this.currentBuffer.isEmpty()) {
                this.flushNotifier.notifyFlush();
                this.flushNotifier.notifyFlush();
                this.wait(1000L);
            }
            return true;
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            if (this.isRunning.get()) {
                String message = "OMDoubleBuffer flush thread " + Thread.currentThread().getName() + " encountered Interrupted " + "exception while running";
                ExitUtils.terminate((int)1, (String)message, (Throwable)ex, (Logger)LOG);
            }
            LOG.info("OMDoubleBuffer flush thread {} is interrupted and will exit.", (Object)Thread.currentThread().getName());
            return false;
        }
    }

    private synchronized void swapCurrentAndReadyBuffer() {
        Queue<Entry> temp = this.currentBuffer;
        this.currentBuffer = this.readyBuffer;
        this.readyBuffer = temp;
    }

    OzoneManagerDoubleBufferMetrics getMetrics() {
        return this.metrics;
    }

    long getFlushedTransactionCountForTesting() {
        return this.flushedTransactionCount.get();
    }

    long getFlushIterationsForTesting() {
        return this.flushIterations.get();
    }

    int getCurrentBufferSize() {
        return this.currentBuffer.size();
    }

    synchronized int getReadyBufferSize() {
        return this.readyBuffer.size();
    }

    @VisibleForTesting
    public void resume() {
        this.isRunning.set(true);
    }

    CompletableFuture<Integer> awaitFlushAsync() {
        return this.flushNotifier.await();
    }

    public void awaitFlush() throws InterruptedException {
        try {
            this.awaitFlushAsync().get();
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /* synthetic */ OzoneManagerDoubleBuffer(Builder builder, OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) {
        this(builder);
    }

    public static final class Builder {
        private OMMetadataManager omMetadataManager;
        private Consumer<TermIndex> updateLastAppliedIndex = termIndex -> {};
        private boolean isTracingEnabled = false;
        private int maxUnFlushedTransactionCount = 0;
        private FlushNotifier flushNotifier;
        private S3SecretManager s3SecretManager;
        private String threadPrefix = "";

        private Builder() {
        }

        public Builder setOmMetadataManager(OMMetadataManager omMetadataManager) {
            this.omMetadataManager = omMetadataManager;
            return this;
        }

        Builder setUpdateLastAppliedIndex(Consumer<TermIndex> updateLastAppliedIndex) {
            this.updateLastAppliedIndex = updateLastAppliedIndex;
            return this;
        }

        public Builder enableTracing(boolean enableTracing) {
            this.isTracingEnabled = enableTracing;
            return this;
        }

        public Builder setMaxUnFlushedTransactionCount(int maxUnFlushedTransactionCount) {
            this.maxUnFlushedTransactionCount = maxUnFlushedTransactionCount;
            return this;
        }

        Builder setFlushNotifier(FlushNotifier flushNotifier) {
            this.flushNotifier = flushNotifier;
            return this;
        }

        public Builder setThreadPrefix(String prefix) {
            this.threadPrefix = prefix;
            return this;
        }

        public Builder setS3SecretManager(S3SecretManager s3SecretManager) {
            this.s3SecretManager = s3SecretManager;
            return this;
        }

        public OzoneManagerDoubleBuffer build() {
            Preconditions.assertTrue(((long)this.maxUnFlushedTransactionCount > 0L ? 1 : 0) != 0, () -> "maxUnFlushedTransactionCount = " + this.maxUnFlushedTransactionCount);
            if (this.flushNotifier == null) {
                this.flushNotifier = new FlushNotifier();
            }
            return new OzoneManagerDoubleBuffer(this, null);
        }
    }

    private static class Entry {
        private final TermIndex termIndex;
        private final OMClientResponse response;

        Entry(TermIndex termIndex, OMClientResponse response) {
            this.termIndex = termIndex;
            this.response = response;
        }

        TermIndex getTermIndex() {
            return this.termIndex;
        }

        OMClientResponse getResponse() {
            return this.response;
        }
    }

    static class FlushNotifier {
        private final Map<Integer, Entry> flushFutures = new TreeMap<Integer, Entry>();
        private int awaitCount;
        private int flushCount;

        FlushNotifier() {
        }

        synchronized CompletableFuture<Integer> await() {
            ++this.awaitCount;
            int flush = this.flushCount + 2;
            LOG.debug("await flush {}", (Object)flush);
            Entry entry = this.flushFutures.computeIfAbsent(flush, key -> new Entry());
            Preconditions.assertTrue((this.flushFutures.size() <= 2 ? 1 : 0) != 0);
            return entry.await();
        }

        synchronized int notifyFlush() {
            int flush;
            Entry removed;
            int await = this.awaitCount;
            if ((removed = this.flushFutures.remove(flush = ++this.flushCount)) != null) {
                this.awaitCount -= removed.complete();
            }
            LOG.debug("notifyFlush {}, awaitCount: {} -> {}", new Object[]{flush, await, this.awaitCount});
            return await;
        }

        static class Entry {
            private final CompletableFuture<Integer> future = new CompletableFuture();
            private int count;

            Entry() {
            }

            private CompletableFuture<Integer> await() {
                ++this.count;
                return this.future;
            }

            private int complete() {
                Preconditions.assertTrue((boolean)this.future.complete(this.count));
                return this.future.join();
            }
        }
    }
}

