package com.marklogic.client.datamovement.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.Transaction;
import com.marklogic.client.datamovement.DataMovementException;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.WriteBatch;
import com.marklogic.client.datamovement.WriteBatchListener;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.WriteEvent;
import com.marklogic.client.datamovement.WriteFailureListener;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.document.XMLDocumentManager;
import com.marklogic.client.impl.ClientCookie;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.Utilities;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.ContentHandle;
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
import com.marklogic.client.io.marker.StructureReadHandle;
import com.marklogic.client.util.RequestLogger;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/marklogic/client/datamovement/impl/WriteBatcherImpl.class */
public class WriteBatcherImpl extends BatcherImpl implements WriteBatcher {
    private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
    private int transactionSize;
    private String temporalCollection;
    private ServerTransform transform;
    private ForestConfiguration forestConfig;
    private LinkedBlockingQueue<DocumentWriteOperation> queue;
    private List<WriteBatchListener> successListeners;
    private List<WriteFailureListener> failureListeners;
    private AtomicLong batchNumber;
    private AtomicLong batchCounter;
    private AtomicLong itemsSoFar;
    private HostInfo[] hostInfos;
    private boolean initialized;
    private CompletableThreadPoolExecutor threadPool;
    private final AtomicBoolean stopped;
    private final AtomicBoolean started;
    private boolean usingTransactions;
    private JobTicket jobTicket;
    private Calendar jobStartTime;
    private Calendar jobEndTime;
    private DocumentMetadataHandle defaultMetadata;

    /* loaded from: input_file:com/marklogic/client/datamovement/impl/WriteBatcherImpl$BatchWriter.class */
    public static class BatchWriter implements Runnable {
        private BatchWriteSet writeSet;

        public BatchWriter(BatchWriteSet batchWriteSet) {
            if (batchWriteSet.getWriteSet().size() == 0) {
                throw new IllegalStateException("Attempt to write an empty batch");
            }
            this.writeSet = batchWriteSet;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Runnable onBeforeWrite = this.writeSet.getOnBeforeWrite();
                if (onBeforeWrite != null) {
                    onBeforeWrite.run();
                }
                TransactionInfo transactionInfo = this.writeSet.getTransactionInfo();
                if (transactionInfo != null && !transactionInfo.alive.get()) {
                    throw new DataMovementException("Failed to write because transaction already underwent commit or rollback", null);
                }
                Transaction transaction = null;
                if (transactionInfo != null) {
                    transaction = transactionInfo.transaction;
                    transactionInfo.written.set(true);
                }
                WriteBatcherImpl.logger.trace("begin write batch {} to forest on host \"{}\"", Long.valueOf(this.writeSet.getBatchNumber()), this.writeSet.getClient().getHost());
                if (this.writeSet.getTemporalCollection() == null) {
                    this.writeSet.getClient().newDocumentManager().write(this.writeSet.getWriteSet(), this.writeSet.getTransform(), transaction);
                } else {
                    XMLDocumentManager newXMLDocumentManager = this.writeSet.getClient().newXMLDocumentManager();
                    newXMLDocumentManager.setContentFormat(Format.UNKNOWN);
                    newXMLDocumentManager.write(this.writeSet.getWriteSet(), this.writeSet.getTransform(), transaction, this.writeSet.getTemporalCollection());
                }
                closeAllHandles();
                Runnable onSuccess = this.writeSet.getOnSuccess();
                if (onSuccess != null) {
                    onSuccess.run();
                }
            } catch (Throwable th) {
                WriteBatcherImpl.logger.trace("failed batch sent to forest on host \"{}\"", this.writeSet.getClient().getHost());
                Consumer<Throwable> onFailure = this.writeSet.getOnFailure();
                if (onFailure != null) {
                    onFailure.accept(th);
                }
            }
        }

        private void closeAllHandles() throws Throwable {
            Throwable th = null;
            for (DocumentWriteOperation documentWriteOperation : this.writeSet.getWriteSet()) {
                try {
                    if (documentWriteOperation.getContent() instanceof Closeable) {
                        ((Closeable) documentWriteOperation.getContent()).close();
                    }
                    if (documentWriteOperation.getMetadata() instanceof Closeable) {
                        ((Closeable) documentWriteOperation.getMetadata()).close();
                    }
                } catch (Throwable th2) {
                    WriteBatcherImpl.logger.error("error calling close()", th2);
                    th = th2;
                }
            }
            if (th != null) {
                throw th;
            }
        }
    }

    /* loaded from: input_file:com/marklogic/client/datamovement/impl/WriteBatcherImpl$CompletableRejectedExecutionHandler.class */
    public static class CompletableRejectedExecutionHandler extends ThreadPoolExecutor.CallerRunsPolicy {
        CompletableThreadPoolExecutor threadPool = null;

        public void setThreadPool(CompletableThreadPoolExecutor completableThreadPoolExecutor) {
            this.threadPool = completableThreadPoolExecutor;
        }

        @Override // java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy, java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            super.rejectedExecution(runnable, threadPoolExecutor);
            this.threadPool.taskComplete(runnable);
        }
    }

    /* loaded from: input_file:com/marklogic/client/datamovement/impl/WriteBatcherImpl$CompletableThreadPoolExecutor.class */
    public static class CompletableThreadPoolExecutor extends ThreadPoolExecutor {
        Set<Runnable> queuedAndExecutingTasks;
        Map<Thread, ConcurrentLinkedQueue<Runnable>> activeSnapshots;

        public CompletableThreadPoolExecutor(int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue) {
            super(i, i2, j, timeUnit, blockingQueue, new CompletableRejectedExecutionHandler());
            this.queuedAndExecutingTasks = ConcurrentHashMap.newKeySet();
            this.activeSnapshots = new ConcurrentHashMap();
            ((CompletableRejectedExecutionHandler) getRejectedExecutionHandler()).setThreadPool(this);
        }

        @Override // java.util.concurrent.ThreadPoolExecutor, java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            this.queuedAndExecutingTasks.add(runnable);
            super.execute(runnable);
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void afterExecute(Runnable runnable, Throwable th) {
            taskComplete(runnable);
            super.afterExecute(runnable, th);
        }

        public ConcurrentLinkedQueue<Runnable> snapshotQueuedAndExecutingTasks() {
            ConcurrentLinkedQueue<Runnable> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
            this.activeSnapshots.put(Thread.currentThread(), concurrentLinkedQueue);
            concurrentLinkedQueue.addAll(this.queuedAndExecutingTasks);
            Iterator<Runnable> it = concurrentLinkedQueue.iterator();
            while (it.hasNext()) {
                Runnable next = it.next();
                if (!this.queuedAndExecutingTasks.contains(next)) {
                    concurrentLinkedQueue.remove(next);
                }
            }
            return concurrentLinkedQueue;
        }

        public void removeSnapshot() {
            this.activeSnapshots.remove(Thread.currentThread());
        }

        public void taskComplete(Runnable runnable) {
            boolean z = false;
            this.queuedAndExecutingTasks.remove(runnable);
            Iterator<ConcurrentLinkedQueue<Runnable>> it = this.activeSnapshots.values().iterator();
            while (it.hasNext()) {
                if (it.next().remove(runnable)) {
                    z = true;
                }
            }
            if (z) {
                synchronized (runnable) {
                    runnable.notifyAll();
                }
            }
        }

        public void replaceTask(Runnable runnable, Runnable runnable2) {
            boolean z = false;
            if (this.queuedAndExecutingTasks.remove(runnable)) {
                this.queuedAndExecutingTasks.add(runnable2);
            }
            for (ConcurrentLinkedQueue<Runnable> concurrentLinkedQueue : this.activeSnapshots.values()) {
                if (concurrentLinkedQueue.remove(runnable)) {
                    concurrentLinkedQueue.add(runnable2);
                    z = true;
                }
            }
            if (z) {
                synchronized (runnable) {
                    runnable.notifyAll();
                }
            }
        }

        public boolean awaitCompletion(long j, TimeUnit timeUnit) throws InterruptedException {
            if (timeUnit == null) {
                throw new IllegalArgumentException("unit cannot be null");
            }
            ConcurrentLinkedQueue<Runnable> snapshotQueuedAndExecutingTasks = snapshotQueuedAndExecutingTasks();
            try {
                long convert = timeUnit.convert(j, TimeUnit.MILLISECONDS);
                while (true) {
                    Runnable peek = snapshotQueuedAndExecutingTasks.peek();
                    if (peek == null) {
                        removeSnapshot();
                        return true;
                    }
                    synchronized (peek) {
                        do {
                            if (snapshotQueuedAndExecutingTasks.contains(peek) && this.queuedAndExecutingTasks.contains(peek)) {
                                long currentTimeMillis = System.currentTimeMillis();
                                peek.wait(convert);
                                convert -= System.currentTimeMillis() - currentTimeMillis;
                            }
                        } while (convert > 0);
                        WriteBatcherImpl.logger.debug("[awaitCompletion] timeout");
                        return false;
                    }
                }
            } finally {
                removeSnapshot();
            }
        }

        @Override // java.util.concurrent.ThreadPoolExecutor, java.util.concurrent.ExecutorService
        public List<Runnable> shutdownNow() {
            List<Runnable> shutdownNow = super.shutdownNow();
            Iterator<Runnable> it = shutdownNow.iterator();
            while (it.hasNext()) {
                taskComplete(it.next());
            }
            return shutdownNow;
        }
    }

    /* loaded from: input_file:com/marklogic/client/datamovement/impl/WriteBatcherImpl$HostInfo.class */
    public static class HostInfo {
        public String hostName;
        public DatabaseClient client;
        public AtomicLong transactionCounter = new AtomicLong(0);
        public ConcurrentLinkedDeque<TransactionInfo> transactionInfos = new ConcurrentLinkedDeque<>();
        public ConcurrentLinkedQueue<TransactionInfo> unfinishedTransactions = new ConcurrentLinkedQueue<>();

        /* JADX INFO: Access modifiers changed from: private */
        public TransactionInfo getTransactionInfoAndDrainPermits() {
            TransactionInfo poll = this.transactionInfos.poll();
            if (poll != null && poll.transactionPermits.getAndSet(0) > 0) {
                return poll;
            }
            return null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public TransactionInfo getTransactionInfo() {
            TransactionInfo poll = this.transactionInfos.poll();
            if (poll == null) {
                return null;
            }
            int decrementAndGet = poll.transactionPermits.decrementAndGet();
            if (decrementAndGet < 0) {
                this.unfinishedTransactions.add(poll);
                return getTransactionInfo();
            }
            if (decrementAndGet > 0) {
                this.transactionInfos.addFirst(poll);
            } else {
                this.unfinishedTransactions.add(poll);
            }
            return poll;
        }

        public void addTransactionInfo(TransactionInfo transactionInfo) {
            this.transactionInfos.add(transactionInfo);
        }

        public void releaseTransactionInfo(TransactionInfo transactionInfo) {
            transactionInfo.transactionPermits.set(0);
            this.transactionInfos.remove(transactionInfo);
            this.unfinishedTransactions.remove(transactionInfo);
        }
    }

    /* loaded from: input_file:com/marklogic/client/datamovement/impl/WriteBatcherImpl$TransactionInfo.class */
    public static class TransactionInfo {
        private Transaction transaction;
        public AtomicBoolean alive = new AtomicBoolean(false);
        public AtomicBoolean written = new AtomicBoolean(false);
        public AtomicReference<Throwable> throwable = new AtomicReference<>();
        public AtomicLong inProcess = new AtomicLong(0);
        public AtomicLong batchesFinished = new AtomicLong(0);
        public AtomicBoolean queuedForCleanup = new AtomicBoolean(false);
        public ConcurrentLinkedQueue<BatchWriteSet> batches = new ConcurrentLinkedQueue<>();
        private AtomicInteger transactionPermits = new AtomicInteger(0);
    }

    public WriteBatcherImpl(DataMovementManager dataMovementManager, ForestConfiguration forestConfiguration) {
        super(dataMovementManager);
        this.queue = new LinkedBlockingQueue<>();
        this.successListeners = new ArrayList();
        this.failureListeners = new ArrayList();
        this.batchNumber = new AtomicLong(0L);
        this.batchCounter = new AtomicLong(0L);
        this.itemsSoFar = new AtomicLong(0L);
        this.initialized = false;
        this.threadPool = null;
        this.stopped = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.usingTransactions = false;
        withForestConfig(forestConfiguration);
    }

    public void initialize() {
        if (this.initialized) {
            return;
        }
        synchronized (this) {
            if (this.initialized) {
                return;
            }
            if (getBatchSize() <= 0) {
                withBatchSize(1);
                logger.warn("batchSize should be 1 or greater--setting batchSize to 1");
            }
            if (this.transactionSize > 1) {
                this.usingTransactions = true;
            }
            if (getThreadCount() <= 0) {
                withThreadCount(this.hostInfos.length);
                logger.warn("threadCount should be 1 or greater--setting threadCount to number of hosts ({})", Integer.valueOf(this.hostInfos.length));
            }
            this.threadPool = new CompletableThreadPoolExecutor(getThreadCount(), getThreadCount(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(getThreadCount() * 3));
            this.threadPool.allowCoreThreadTimeOut(true);
            this.initialized = true;
            logger.info("threadCount={}", Integer.valueOf(getThreadCount()));
            logger.info("batchSize={}", Integer.valueOf(getBatchSize()));
            if (this.usingTransactions) {
                logger.info("transactionSize={}", Integer.valueOf(this.transactionSize));
            }
            this.jobStartTime = Calendar.getInstance();
            this.started.set(true);
        }
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher add(String str, AbstractWriteHandle abstractWriteHandle) {
        add(str, null, abstractWriteHandle);
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher addAs(String str, Object obj) {
        return addAs(str, null, obj);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher add(DocumentWriteOperation documentWriteOperation) {
        DocumentWriteOperation poll;
        if (documentWriteOperation.getUri() == null) {
            throw new IllegalArgumentException("uri must not be null");
        }
        if (documentWriteOperation.getContent() == null) {
            throw new IllegalArgumentException("contentHandle must not be null");
        }
        initialize();
        requireNotStopped();
        this.queue.add(documentWriteOperation);
        logger.trace("add uri={}", documentWriteOperation.getUri());
        if (this.batchCounter.incrementAndGet() % ((long) getBatchSize()) == 0) {
            BatchWriteSet newBatchWriteSet = newBatchWriteSet(false);
            int i = 0;
            if (this.defaultMetadata != null) {
                newBatchWriteSet.getWriteSet().add(new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.METADATA_DEFAULT, null, this.defaultMetadata, null));
                i = 1;
            }
            for (int i2 = 0; i2 < getBatchSize() && (poll = this.queue.poll()) != null; i2++) {
                newBatchWriteSet.getWriteSet().add(poll);
            }
            if (newBatchWriteSet.getWriteSet().size() > i) {
                this.threadPool.submit(new BatchWriter(newBatchWriteSet));
            }
        }
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher add(String str, DocumentMetadataWriteHandle documentMetadataWriteHandle, AbstractWriteHandle abstractWriteHandle) {
        add(new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.DOCUMENT_WRITE, str, documentMetadataWriteHandle, abstractWriteHandle));
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher add(WriteEvent... writeEventArr) {
        for (WriteEvent writeEvent : writeEventArr) {
            add(writeEvent.getTargetUri(), writeEvent.getMetadata(), writeEvent.getContent());
        }
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher addAs(String str, DocumentMetadataWriteHandle documentMetadataWriteHandle, Object obj) {
        AbstractWriteHandle abstractWriteHandle;
        if (obj == null) {
            throw new IllegalArgumentException("content must not be null");
        }
        Class<?> cls = obj.getClass();
        if (AbstractWriteHandle.class.isAssignableFrom(cls)) {
            abstractWriteHandle = (AbstractWriteHandle) obj;
        } else {
            ContentHandle makeHandle = DatabaseClientFactory.getHandleRegistry().makeHandle(cls);
            Utilities.setHandleContent(makeHandle, obj);
            abstractWriteHandle = makeHandle;
        }
        return add(str, documentMetadataWriteHandle, abstractWriteHandle);
    }

    private void requireInitialized() {
        if (!this.initialized) {
            throw new IllegalStateException("This operation must be called after starting this job");
        }
    }

    private void requireNotInitialized() {
        if (this.initialized) {
            throw new IllegalStateException("Configuration cannot be changed after starting this job or calling add or addAs");
        }
    }

    private void requireNotStopped() {
        if (isStopped()) {
            throw new IllegalStateException("This instance has been stopped");
        }
    }

    private BatchWriteSet newBatchWriteSet(boolean z) {
        return newBatchWriteSet(z, this.batchNumber.incrementAndGet());
    }

    private BatchWriteSet newBatchWriteSet(boolean z, long j) {
        HostInfo hostInfo = this.hostInfos[(int) (j % this.hostInfos.length)];
        DatabaseClient databaseClient = hostInfo.client;
        BatchWriteSet batchWriteSet = new BatchWriteSet(this, databaseClient.newDocumentManager().newWriteSet(), databaseClient, getTransform(), getTemporalCollection());
        batchWriteSet.setBatchNumber(j);
        if (this.usingTransactions) {
            batchWriteSet.onBeforeWrite(() -> {
                if (hostInfo.transactionCounter.getAndIncrement() % ((long) getTransactionSize()) == 0) {
                    batchWriteSet.setTransactionInfo(transactionOpener(hostInfo, databaseClient, this.transactionSize));
                    return;
                }
                TransactionInfo transactionInfo = hostInfo.getTransactionInfo();
                if (transactionInfo == null) {
                    batchWriteSet.setTransactionInfo(transactionOpener(hostInfo, databaseClient, this.transactionSize));
                } else {
                    batchWriteSet.setTransactionInfo(transactionInfo);
                    transactionInfo.inProcess.incrementAndGet();
                }
            });
        }
        batchWriteSet.onSuccess(() -> {
            boolean z2 = false;
            if (this.usingTransactions) {
                TransactionInfo transactionInfo = batchWriteSet.getTransactionInfo();
                boolean z3 = transactionInfo.batchesFinished.incrementAndGet() == ((long) getTransactionSize());
                if (!z && !z3) {
                    transactionInfo.batches.add(batchWriteSet);
                } else if (transactionInfo.alive.get()) {
                    if (transactionInfo.inProcess.get() <= 1) {
                        hostInfo.transactionCounter.set(0L);
                        transactionInfo.transaction.commit();
                        z2 = true;
                        sendSuccessToListeners(transactionInfo.batches);
                    } else {
                        transactionInfo.batches.add(batchWriteSet);
                        hostInfo.unfinishedTransactions.add(transactionInfo);
                    }
                }
                transactionInfo.inProcess.decrementAndGet();
            } else {
                z2 = true;
            }
            if (z2) {
                sendSuccessToListeners(batchWriteSet);
            }
        });
        batchWriteSet.onFailure(th -> {
            hostInfo.transactionCounter.set(0L);
            if (this.usingTransactions) {
                TransactionInfo transactionInfo = batchWriteSet.getTransactionInfo();
                transactionInfo.throwable.set(th);
                if (transactionInfo.inProcess.get() <= 1) {
                    try {
                        logger.warn("Rolling back transaction because of throwable: {}", th.toString());
                        transactionInfo.transaction.rollback();
                    } catch (Throwable th) {
                        th.addSuppressed(th);
                        logger.warn("Failure to rollback transaction: {}", th.toString());
                    }
                    sendThrowableToListeners(th, (String) null, transactionInfo.batches);
                } else {
                    hostInfo.unfinishedTransactions.add(transactionInfo);
                }
                transactionInfo.inProcess.decrementAndGet();
            }
            sendThrowableToListeners(th, "Error writing batch: {}", batchWriteSet);
        });
        return batchWriteSet;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher onBatchSuccess(WriteBatchListener writeBatchListener) {
        if (writeBatchListener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        this.successListeners.add(writeBatchListener);
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher onBatchFailure(WriteFailureListener writeFailureListener) {
        if (writeFailureListener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        this.failureListeners.add(writeFailureListener);
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void retryWithFailureListeners(WriteBatch writeBatch) {
        retry(writeBatch, true);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void retry(WriteBatch writeBatch) {
        retry(writeBatch, false);
    }

    private void retry(WriteBatch writeBatch, boolean z) {
        if (isStopped()) {
            logger.warn("Job is now stopped, aborting the retry");
            return;
        }
        if (writeBatch == null) {
            throw new IllegalArgumentException("batch must not be null");
        }
        BatchWriteSet newBatchWriteSet = newBatchWriteSet(true, writeBatch.getJobBatchNumber());
        if (!z) {
            newBatchWriteSet.onFailure(th -> {
                if (!(th instanceof RuntimeException)) {
                    throw new DataMovementException("Failed to retry batch", th);
                }
                throw ((RuntimeException) th);
            });
        }
        for (WriteEvent writeEvent : writeBatch.getItems()) {
            newBatchWriteSet.getWriteSet().add(writeEvent.getTargetUri(), writeEvent.getMetadata(), writeEvent.getContent());
        }
        new BatchWriter(newBatchWriteSet).run();
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatchListener[] getBatchSuccessListeners() {
        return (WriteBatchListener[]) this.successListeners.toArray(new WriteBatchListener[this.successListeners.size()]);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteFailureListener[] getBatchFailureListeners() {
        return (WriteFailureListener[]) this.failureListeners.toArray(new WriteFailureListener[this.failureListeners.size()]);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void setBatchSuccessListeners(WriteBatchListener... writeBatchListenerArr) {
        requireNotInitialized();
        this.successListeners.clear();
        if (writeBatchListenerArr != null) {
            for (WriteBatchListener writeBatchListener : writeBatchListenerArr) {
                this.successListeners.add(writeBatchListener);
            }
        }
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void setBatchFailureListeners(WriteFailureListener... writeFailureListenerArr) {
        requireNotInitialized();
        this.failureListeners.clear();
        if (writeFailureListenerArr != null) {
            for (WriteFailureListener writeFailureListener : writeFailureListenerArr) {
                this.failureListeners.add(writeFailureListener);
            }
        }
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void flushAsync() {
        flush(false);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void flushAndWait() {
        flush(true);
    }

    private void flush(boolean z) {
        requireInitialized();
        requireNotStopped();
        ArrayList arrayList = new ArrayList();
        this.batchCounter.set(0L);
        this.queue.drainTo(arrayList);
        logger.info("flushing {} queued docs", Integer.valueOf(arrayList.size()));
        Iterator it = arrayList.iterator();
        int i = 0;
        while (it.hasNext()) {
            if (isStopped()) {
                logger.warn("Job is now stopped, preventing the flush of {} queued docs", Integer.valueOf(arrayList.size() - i));
                if (z) {
                    awaitCompletion();
                    return;
                }
                return;
            }
            BatchWriteSet newBatchWriteSet = newBatchWriteSet(true);
            if (this.defaultMetadata != null) {
                newBatchWriteSet.getWriteSet().add(new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.METADATA_DEFAULT, null, this.defaultMetadata, null));
            }
            for (int i2 = 0; i2 < getBatchSize() && it.hasNext(); i2++) {
                newBatchWriteSet.getWriteSet().add((DocumentWriteOperation) it.next());
            }
            this.threadPool.submit(new BatchWriter(newBatchWriteSet));
            i++;
        }
        if (z) {
            awaitCompletion();
        }
        if (this.usingTransactions) {
            Runnable runnable = () -> {
                cleanupUnfinishedTransactions();
                for (HostInfo hostInfo : this.hostInfos) {
                    while (true) {
                        TransactionInfo transactionInfoAndDrainPermits = hostInfo.getTransactionInfoAndDrainPermits();
                        if (transactionInfoAndDrainPermits != null) {
                            completeTransaction(transactionInfoAndDrainPermits);
                        }
                    }
                }
            };
            if (z) {
                runnable.run();
            } else {
                this.threadPool.submit(runnable);
            }
        }
    }

    public boolean completeTransaction(TransactionInfo transactionInfo) {
        boolean z = false;
        try {
            if (transactionInfo.alive.get() && transactionInfo.inProcess.get() <= 0 && transactionInfo.written.get()) {
                if (transactionInfo.throwable.get() != null) {
                    transactionInfo.transaction.rollback();
                    sendThrowableToListeners(transactionInfo.throwable.get(), "Failure during transaction: {}", transactionInfo.batches);
                } else {
                    transactionInfo.transaction.commit();
                    sendSuccessToListeners(transactionInfo.batches);
                }
                z = true;
            }
        } catch (Throwable th) {
            transactionInfo.throwable.set(th);
            sendThrowableToListeners(th, "Failure to complete transaction: {}", transactionInfo.batches);
        }
        return z;
    }

    private void sendSuccessToListeners(Collection<BatchWriteSet> collection) {
        Iterator<BatchWriteSet> it = collection.iterator();
        while (it.hasNext()) {
            sendSuccessToListeners(it.next());
        }
    }

    private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
        batchWriteSet.setItemsSoFar(this.itemsSoFar.addAndGet(batchWriteSet.getWriteSet().size()));
        WriteBatch batchOfWriteEvents = batchWriteSet.getBatchOfWriteEvents();
        Iterator<WriteBatchListener> it = this.successListeners.iterator();
        while (it.hasNext()) {
            try {
                it.next().processEvent(batchOfWriteEvents);
            } catch (Throwable th) {
                logger.error("Exception thrown by an onBatchSuccess listener", th);
            }
        }
    }

    private void sendThrowableToListeners(Throwable th, String str, Collection<BatchWriteSet> collection) {
        Iterator<BatchWriteSet> it = collection.iterator();
        while (it.hasNext()) {
            sendThrowableToListeners(th, (String) null, it.next());
        }
        if (str != null) {
            logger.warn(str, th.toString());
        }
    }

    private void sendThrowableToListeners(Throwable th, String str, BatchWriteSet batchWriteSet) {
        batchWriteSet.setItemsSoFar(this.itemsSoFar.get());
        WriteBatch batchOfWriteEvents = batchWriteSet.getBatchOfWriteEvents();
        Iterator<WriteFailureListener> it = this.failureListeners.iterator();
        while (it.hasNext()) {
            try {
                it.next().processFailure(batchOfWriteEvents, th);
            } catch (Throwable th2) {
                logger.error("Exception thrown by an onBatchFailure listener", th2);
            }
        }
        if (str != null) {
            logger.warn(str, th.toString());
        }
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl
    public void start(JobTicket jobTicket) {
        this.jobTicket = jobTicket;
        initialize();
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl
    public void stop() {
        this.jobEndTime = Calendar.getInstance();
        this.stopped.set(true);
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
        closeAllListeners();
    }

    private void closeAllListeners() {
        for (WriteBatchListener writeBatchListener : getBatchSuccessListeners()) {
            if (writeBatchListener instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) writeBatchListener).close();
                } catch (Exception e) {
                    logger.error("onBatchSuccess listener cannot be closed", e);
                }
            }
        }
        for (WriteFailureListener writeFailureListener : getBatchFailureListeners()) {
            if (writeFailureListener instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) writeFailureListener).close();
                } catch (Exception e2) {
                    logger.error("onBatchFailure listener cannot be closed", e2);
                }
            }
        }
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public boolean isStopped() {
        return this.stopped.get();
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public boolean isStarted() {
        return this.started.get();
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public JobTicket getJobTicket() {
        requireInitialized();
        return this.jobTicket;
    }

    @Override // com.marklogic.client.datamovement.Batcher
    public Calendar getJobStartTime() {
        if (isStarted()) {
            return this.jobStartTime;
        }
        return null;
    }

    @Override // com.marklogic.client.datamovement.Batcher
    public Calendar getJobEndTime() {
        if (isStopped()) {
            return this.jobEndTime;
        }
        return null;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public boolean awaitCompletion(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.threadPool.awaitCompletion(j, timeUnit);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public boolean awaitCompletion() {
        try {
            return awaitCompletion(RequestLogger.ALL_CONTENT, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            logger.debug("awaitCompletion caught InterruptedException");
            return false;
        }
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public WriteBatcher withJobName(String str) {
        requireNotInitialized();
        super.withJobName(str);
        return this;
    }

    @Override // com.marklogic.client.datamovement.Batcher
    public WriteBatcher withJobId(String str) {
        requireNotInitialized();
        setJobId(str);
        return this;
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public WriteBatcher withBatchSize(int i) {
        requireNotInitialized();
        super.withBatchSize(i);
        return this;
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public WriteBatcher withThreadCount(int i) {
        requireNotInitialized();
        super.withThreadCount(i);
        return this;
    }

    public WriteBatcher withTransactionSize(int i) {
        requireNotInitialized();
        this.transactionSize = i;
        return this;
    }

    public int getTransactionSize() {
        return this.transactionSize;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher withTemporalCollection(String str) {
        requireNotInitialized();
        this.temporalCollection = str;
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public String getTemporalCollection() {
        return this.temporalCollection;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher withTransform(ServerTransform serverTransform) {
        requireNotInitialized();
        this.transform = serverTransform;
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public ServerTransform getTransform() {
        return this.transform;
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConfiguration) {
        super.withForestConfig(forestConfiguration);
        Forest[] forests = forests(forestConfiguration);
        Set<String> hosts = hosts(forests);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        if (this.hostInfos != null) {
            for (HostInfo hostInfo : this.hostInfos) {
                hashMap.put(hostInfo.hostName, hostInfo);
                hashMap2.put(hostInfo.hostName, hostInfo);
            }
        }
        logger.info("(withForestConfig) Using forests on {} hosts for \"{}\"", hosts, forests[0].getDatabaseName());
        HostInfo[] hostInfoArr = new HostInfo[hosts.size()];
        int i = 0;
        for (String str : hosts) {
            if (hashMap.get(str) != null) {
                hostInfoArr[i] = (HostInfo) hashMap.get(str);
                hashMap2.remove(str);
            } else {
                hostInfoArr[i] = new HostInfo();
                hostInfoArr[i].hostName = str;
                hostInfoArr[i].client = getMoveMgr().getHostClient(str);
                if (getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
                    logger.info("Adding DatabaseClient on port {} for host \"{}\" to the rotation", Integer.valueOf(hostInfoArr[i].client.getPort()), str);
                }
            }
            i++;
        }
        this.forestConfig = forestConfiguration;
        this.hostInfos = hostInfoArr;
        if (hashMap2.size() > 0) {
            DataMovementManagerImpl moveMgr = getMoveMgr();
            if (hashMap2.containsKey(moveMgr.getPrimaryClient().getHost())) {
                moveMgr.setPrimaryClient(hostInfoArr[new Random().nextInt(hostInfoArr.length)].client);
            }
            ArrayList<Runnable> arrayList = new ArrayList();
            if (this.threadPool != null) {
                this.threadPool.getQueue().drainTo(arrayList);
            }
            for (Runnable runnable : arrayList) {
                if (runnable instanceof BatchWriter) {
                    BatchWriter batchWriter = (BatchWriter) runnable;
                    if (hashMap2.containsKey(batchWriter.writeSet.getClient().getHost())) {
                        BatchWriteSet newBatchWriteSet = newBatchWriteSet(true, batchWriter.writeSet.getBatchNumber());
                        newBatchWriteSet.onFailure(th -> {
                            if (!(th instanceof RuntimeException)) {
                                throw new DataMovementException("Failed to retry batch after failover", th);
                            }
                            throw ((RuntimeException) th);
                        });
                        for (WriteEvent writeEvent : batchWriter.writeSet.getBatchOfWriteEvents().getItems()) {
                            newBatchWriteSet.getWriteSet().add(writeEvent.getTargetUri(), writeEvent.getMetadata(), writeEvent.getContent());
                        }
                        this.threadPool.replaceTask(batchWriter, (Runnable) this.threadPool.submit(new BatchWriter(newBatchWriteSet)));
                    }
                }
                this.threadPool.replaceTask(runnable, (Runnable) this.threadPool.submit(runnable));
            }
            Iterator it = hashMap2.values().iterator();
            while (it.hasNext()) {
                cleanupUnfinishedTransactions((HostInfo) it.next());
            }
        }
        return this;
    }

    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public ForestConfiguration getForestConfig() {
        return this.forestConfig;
    }

    private void cleanupUnfinishedTransactions() {
        for (HostInfo hostInfo : this.hostInfos) {
            cleanupUnfinishedTransactions(hostInfo);
        }
    }

    private void cleanupUnfinishedTransactions(HostInfo hostInfo) {
        Iterator<TransactionInfo> it = hostInfo.unfinishedTransactions.iterator();
        while (it.hasNext()) {
            TransactionInfo next = it.next();
            if (!next.alive.get()) {
                it.remove();
            } else if (!next.queuedForCleanup.get() && next.inProcess.get() <= 0) {
                if (next.written.get()) {
                    next.queuedForCleanup.set(true);
                    this.threadPool.submit(() -> {
                        if (completeTransaction(next)) {
                            hostInfo.unfinishedTransactions.remove(next);
                        } else {
                            next.queuedForCleanup.set(false);
                        }
                    });
                } else {
                    it.remove();
                }
            }
        }
    }

    public TransactionInfo transactionOpener(final HostInfo hostInfo, DatabaseClient databaseClient, int i) {
        final TransactionInfo transactionInfo = new TransactionInfo();
        transactionInfo.transactionPermits.set(i - 1);
        final Transaction openTransaction = databaseClient.openTransaction();
        logger.trace("opened transaction {}", openTransaction.getTransactionId());
        transactionInfo.transaction = new Transaction() { // from class: com.marklogic.client.datamovement.impl.WriteBatcherImpl.1
            @Override // com.marklogic.client.Transaction
            public void commit() {
                hostInfo.releaseTransactionInfo(transactionInfo);
                if (transactionInfo.alive.getAndSet(false)) {
                    openTransaction.commit();
                    WriteBatcherImpl.logger.trace("committed transaction {}", openTransaction.getTransactionId());
                }
            }

            @Override // com.marklogic.client.Transaction
            public List<ClientCookie> getCookies() {
                return openTransaction.getCookies();
            }

            @Override // com.marklogic.client.Transaction
            public String getHostId() {
                return openTransaction.getHostId();
            }

            @Override // com.marklogic.client.Transaction
            public String getTransactionId() {
                return openTransaction.getTransactionId();
            }

            @Override // com.marklogic.client.Transaction
            public <T extends StructureReadHandle> T readStatus(T t) {
                return (T) openTransaction.readStatus(t);
            }

            @Override // com.marklogic.client.Transaction
            public void rollback() {
                hostInfo.releaseTransactionInfo(transactionInfo);
                if (transactionInfo.alive.getAndSet(false)) {
                    openTransaction.rollback();
                    WriteBatcherImpl.logger.trace("rolled back transaction {}", openTransaction.getTransactionId());
                }
            }
        };
        transactionInfo.alive.set(true);
        transactionInfo.inProcess.incrementAndGet();
        hostInfo.addTransactionInfo(transactionInfo);
        cleanupUnfinishedTransactions();
        return transactionInfo;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public WriteBatcher withDefaultMetadata(DocumentMetadataHandle documentMetadataHandle) {
        this.defaultMetadata = documentMetadataHandle;
        return this;
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public void addAll(Stream<? extends DocumentWriteOperation> stream) {
        stream.forEach(this::add);
    }

    @Override // com.marklogic.client.datamovement.WriteBatcher
    public DocumentMetadataHandle getDocumentMetadata() {
        return this.defaultMetadata;
    }
}
