/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.datamovement.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.MarkLogicInternalException;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.RowBatchFailureListener;
import com.marklogic.client.datamovement.RowBatchSuccessListener;
import com.marklogic.client.datamovement.RowBatcher;
import com.marklogic.client.datamovement.impl.BatchEventImpl;
import com.marklogic.client.datamovement.impl.BatcherImpl;
import com.marklogic.client.datamovement.impl.DataMovementManagerImpl;
import com.marklogic.client.expression.PlanBuilder;
import com.marklogic.client.impl.DatabaseClientImpl;
import com.marklogic.client.io.BaseHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.ContentHandle;
import com.marklogic.client.io.marker.StructureReadHandle;
import com.marklogic.client.row.RawPlanDefinition;
import com.marklogic.client.row.RawQueryDSLPlan;
import com.marklogic.client.row.RowManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RowBatcherImpl<T>
extends BatcherImpl
implements RowBatcher<T> {
    private static final int DEFAULT_BATCH_SIZE = 1000;
    private static final long MAX_UNSIGNED_LONG = -1L;
    private static Logger logger = LoggerFactory.getLogger(RowBatcherImpl.class);
    private static final String LOWER_BOUND = "ML_LOWER_BOUND";
    private static final String UPPER_BOUND = "ML_UPPER_BOUND";
    private long batchSize = 0L;
    private long batchCount = 0L;
    private BatchThreadPoolExecutor threadPool;
    private final AtomicLong batchNum = new AtomicLong(0L);
    private final AtomicLong failedBatches = new AtomicLong(0L);
    private final AtomicInteger runningThreads = new AtomicInteger(0);
    private RowBatchFailureListener[] failureListeners;
    private RowBatchSuccessListener[] successListeners;
    private RawPlanDefinition pagedPlan;
    private long rowCount = 0L;
    private HostInfo[] hostInfos;
    private boolean consistentSnapshot = false;
    private final AtomicLong serverTimestamp = new AtomicLong(-1L);
    private final ContentHandle<T> rowsHandle;
    private final RowManager defaultRowManager;

    RowBatcherImpl(DataMovementManagerImpl moveMgr, ContentHandle<T> rowsHandle) {
        super(moveMgr);
        this.validateRowsHandle(rowsHandle);
        this.rowsHandle = rowsHandle;
        this.defaultRowManager = this.getPrimaryClient().newRowManager();
        super.withBatchSize(1000);
        if (moveMgr.getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
            this.withForestConfig(moveMgr.getForestConfig());
        }
    }

    private void validateRowsHandle(ContentHandle<T> rowsHandle) {
        if (rowsHandle == null) {
            throw new IllegalArgumentException("Cannot create RowBatcher with null rows manager");
        }
        if (!(rowsHandle instanceof StructureReadHandle)) {
            throw new IllegalArgumentException("Rows handle must also be StructureReadHandle");
        }
        if (!(rowsHandle instanceof BaseHandle)) {
            throw new IllegalArgumentException("Rows handle must also be BaseHandle");
        }
        if (((BaseHandle)((Object)rowsHandle)).getFormat() == Format.UNKNOWN) {
            throw new IllegalArgumentException("Rows handle must specify a format");
        }
        Class<T> rowsClass = rowsHandle.getContentClass();
        if (rowsClass == null) {
            throw new IllegalArgumentException("Rows handle cannot have a null content class");
        }
        if (!DatabaseClientFactory.getHandleRegistry().isRegistered(rowsClass)) {
            throw new IllegalArgumentException("Rows handle must be registered with DatabaseClientFactory.HandleFactoryRegistry");
        }
    }

    @Override
    public RowManager getRowManager() {
        return this.defaultRowManager;
    }

    @Override
    public RowBatcher<T> withBatchView(PlanBuilder.ModifyPlan inputPlan) {
        if (inputPlan == null) {
            throw new IllegalArgumentException("Plan cannot be null");
        }
        this.analyzePlan(inputPlan.export(new StringHandle().withFormat(Format.JSON)));
        return this;
    }

    @Override
    public RowBatcher<T> withBatchView(RawPlanDefinition viewPlan) {
        if (viewPlan == null) {
            throw new IllegalArgumentException("Raw plan definition cannot be null");
        }
        this.analyzePlan(viewPlan.getHandle());
        return this;
    }

    @Override
    public RowBatcher<T> withBatchView(RawQueryDSLPlan viewPlan) {
        if (viewPlan == null) {
            throw new IllegalArgumentException("Raw query DSL plan cannot be null");
        }
        this.analyzePlan(viewPlan.getHandle());
        return this;
    }

    private void analyzePlan(AbstractWriteHandle userPlan) {
        this.requireNotStarted("Must specify batch view before starting job");
        DatabaseClientImpl client = (DatabaseClientImpl)this.getPrimaryClient();
        JsonNode viewInfo = client.getServices().postResource(null, "internal/viewinfo", null, null, userPlan, new JacksonHandle()).get();
        this.rowCount = viewInfo.get("rowCount").asLong(0L);
        this.pagedPlan = this.getRowManager().newRawPlanDefinition(new JacksonHandle(viewInfo.get("modifiedPlan")));
        JsonNode schemaNode = viewInfo.get("schemaName");
        logger.info("plan analysis schema name: {}, view name: {}, row estimate: {}", new Object[]{schemaNode != null ? schemaNode.asText(null) : null, viewInfo.get("viewName").asText(null), this.rowCount});
    }

    @Override
    public RowBatcher<T> withBatchSize(int batchSize) {
        this.requireNotStarted("Must set batch size before starting job");
        super.withBatchSize(batchSize);
        return this;
    }

    @Override
    public RowBatcher<T> withThreadCount(int threadCount) {
        this.requireNotStarted("Must set thread count before starting job");
        super.withThreadCount(threadCount);
        return this;
    }

    @Override
    public RowBatcher<T> onSuccess(RowBatchSuccessListener listener) {
        this.requireNotStarted("Must set success listener before starting job");
        if (listener == null) {
            this.successListeners = null;
        } else if (this.successListeners == null || this.successListeners.length == 0) {
            this.successListeners = new RowBatchSuccessListener[]{listener};
        } else {
            this.successListeners = Arrays.copyOf(this.successListeners, this.successListeners.length + 1);
            this.successListeners[this.successListeners.length - 1] = listener;
        }
        return this;
    }

    @Override
    public RowBatcher<T> onFailure(RowBatchFailureListener listener) {
        this.requireNotStarted("Must set failure listener before starting job");
        if (listener == null) {
            this.failureListeners = null;
        } else if (this.failureListeners == null || this.failureListeners.length == 0) {
            this.failureListeners = new RowBatchFailureListener[]{listener};
        } else {
            this.failureListeners = Arrays.copyOf(this.failureListeners, this.failureListeners.length + 1);
            this.failureListeners[this.failureListeners.length - 1] = listener;
        }
        return this;
    }

    @Override
    public RowBatcher<T> withJobId(String jobId) {
        this.requireNotStarted("Must set job id before starting job");
        super.setJobId(jobId);
        return this;
    }

    @Override
    public RowBatcher<T> withJobName(String jobName) {
        this.requireNotStarted("Must set job name before starting job");
        super.withJobName(jobName);
        return this;
    }

    @Override
    public RowBatcher<T> withConsistentSnapshot() {
        this.requireNotStarted("Must set consistent snapshot before starting job");
        if (!(this.rowsHandle instanceof BaseHandle)) {
            throw new IllegalStateException("Content handle for consistent snapshot must extend BaseHandle");
        }
        this.consistentSnapshot = true;
        return this;
    }

    @Override
    public RowBatchSuccessListener[] getSuccessListeners() {
        return this.successListeners;
    }

    @Override
    public RowBatchFailureListener[] getFailureListeners() {
        return this.failureListeners;
    }

    @Override
    public void setSuccessListeners(RowBatchSuccessListener ... listeners) {
        this.requireNotStarted("Must set success listeners before starting job");
        this.successListeners = listeners;
    }

    @Override
    public void setFailureListeners(RowBatchFailureListener ... listeners) {
        this.requireNotStarted("Must set failure listeners before starting job");
        this.failureListeners = listeners;
    }

    private void initRequestEvent(RowBatchEventImpl event) {
        event.withClient(this.getPrimaryClient());
        event.withJobTicket(this.getJobTicket());
    }

    private void notifySuccess(RowBatchSuccessListener.RowBatchResponseEvent<T> event) {
        if (this.successListeners == null || this.successListeners.length == 0) {
            return;
        }
        for (RowBatchSuccessListener successListener : this.successListeners) {
            try {
                successListener.processEvent(event);
            }
            catch (Throwable e) {
                logger.info("error in success listener: {}", (Object)e.toString());
            }
        }
    }

    private void notifyFailure(RowBatchFailureEventImpl event, Throwable throwable) {
        RowBatchFailureListener.BatchFailureDisposition priorDisposition = null;
        int priorMaxRetries = 0;
        block7: for (RowBatchFailureListener failureListener : this.failureListeners) {
            RowBatchFailureListener.BatchFailureDisposition nextDisposition;
            priorDisposition = event.getDisposition();
            priorMaxRetries = event.getMaxRetries();
            try {
                failureListener.processFailure(event, throwable);
            }
            catch (Throwable e) {
                logger.info("error in failure listener: {}", (Object)e.toString());
            }
            int nextMaxRetries = event.getMaxRetries();
            if (priorMaxRetries < nextMaxRetries) {
                event.withMaxRetries(priorMaxRetries);
            }
            if (priorDisposition == (nextDisposition = event.getDisposition())) continue;
            switch (priorDisposition) {
                case SKIP: {
                    continue block7;
                }
                case RETRY: {
                    if (nextDisposition != RowBatchFailureListener.BatchFailureDisposition.SKIP) continue block7;
                    event.withDisposition(priorDisposition);
                    continue block7;
                }
                case STOP: {
                    event.withDisposition(priorDisposition);
                    continue block7;
                }
                default: {
                    throw new MarkLogicInternalException("unknown failure disposition: " + priorDisposition.toString());
                }
            }
        }
    }

    @Override
    public boolean awaitCompletion() {
        try {
            return this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    @Override
    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        this.requireStarted("Must start job before awaiting completion");
        if (this.threadPool != null) {
            return this.threadPool.awaitTermination(timeout, unit);
        }
        return true;
    }

    @Override
    public long getRowEstimate() {
        if (this.pagedPlan == null) {
            throw new IllegalStateException("Must supply plan before getting the row estimate");
        }
        return this.rowCount;
    }

    @Override
    public long getBatchCount() {
        this.requireStarted("Must start job before getting batch count");
        return this.batchNum.get();
    }

    @Override
    public long getFailedBatches() {
        this.requireStarted("Must start job before getting failed batches");
        return this.failedBatches.get();
    }

    @Override
    public JobTicket getJobTicket() {
        this.requireStarted("Must start job before getting ticket");
        return super.getJobTicket();
    }

    @Override
    public Long getServerTimestamp() {
        long val = this.serverTimestamp.get();
        return val > -1L ? Long.valueOf(val) : null;
    }

    private void requireNotStarted(String msg) {
        if (this.isStarted()) {
            throw new IllegalStateException(msg);
        }
    }

    private void requireStarted(String msg) {
        if (!this.isStarted()) {
            throw new IllegalStateException(msg);
        }
    }

    @Override
    public void stop() {
        if (super.getStopped().get()) {
            return;
        }
        super.getStopped().set(true);
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
        super.setJobEndTime();
    }

    private void orderlyStop() {
        if (super.getStopped().get()) {
            return;
        }
        super.getStopped().set(true);
        if (this.threadPool != null) {
            this.threadPool.shutdown();
        }
        super.setJobEndTime();
    }

    @Override
    public synchronized void start(JobTicket ticket) {
        this.requireNotStarted("Job already started");
        if (this.pagedPlan == null) {
            throw new IllegalStateException("Plan must be supplied before starting the job");
        }
        if (this.successListeners == null || this.successListeners.length == 0) {
            throw new IllegalStateException("No listener for rows");
        }
        if (this.failureListeners == null || this.failureListeners.length == 0) {
            logger.warn("starting job with default failure listener");
            this.onFailure((batch, throwable) -> logger.warn("batch " + batch.getJobBatchNumber() + " failed with error: " + throwable.getMessage()));
        }
        if (super.getBatchSize() <= 0) {
            logger.warn("batchSize must be 1 or greater--setting batchSize to 1000");
            super.withBatchSize(1000);
        }
        this.batchCount = this.getRowEstimate() / (long)super.getBatchSize() + 1L;
        this.batchSize = Long.divideUnsigned(-1L, this.batchCount);
        if (logger.isDebugEnabled() && this.batchSize > 0L) {
            logger.debug("batch count: {}, calculated batch size: {}", (Object)this.batchCount, (Object)this.batchSize);
        } else {
            logger.info("batch count: {}", (Object)this.batchCount);
        }
        if (this.hostInfos != null && this.getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
            RowManager.RowSetPart datatypeStyle = this.getRowManager().getDatatypeStyle();
            RowManager.RowStructure structureStyle = this.getRowManager().getRowStructureStyle();
            for (HostInfo hostInfo : this.hostInfos) {
                hostInfo.rowMgr.setDatatypeStyle(datatypeStyle);
                hostInfo.rowMgr.setRowStructureStyle(structureStyle);
            }
        }
        this.threadPool = new BatchThreadPoolExecutor(super.getThreadCount());
        this.runningThreads.set(super.getThreadCount());
        super.setJobTicket(ticket);
        super.setJobStartTime();
        super.getStarted().set(true);
        for (int i = 0; i < super.getThreadCount(); ++i) {
            ContentHandle<T> threadHandle = this.rowsHandle.newHandle();
            RowBatchCallable<T> threadCallable = new RowBatchCallable<T>(this, threadHandle);
            if (i == 0 && this.consistentSnapshot) {
                this.readRows(threadCallable);
            }
            this.submit(threadCallable);
        }
    }

    private boolean readRows(RowBatchCallable<T> callable) {
        long currentBatch = this.batchNum.incrementAndGet();
        if (currentBatch > this.batchCount) {
            this.endThread();
            return false;
        }
        long lowerBound = (currentBatch - 1L) * this.batchSize;
        String lowerBoundStr = Long.toUnsignedString(lowerBound);
        String upperBoundStr = Long.toUnsignedString(currentBatch == this.batchCount ? -1L : lowerBound + (this.batchSize - 1L));
        logger.debug("current batch: {}, lower bound: {}, upper bound: {}", new Object[]{currentBatch, lowerBoundStr, upperBoundStr});
        PlanBuilder.Plan plan = this.pagedPlan.bindParam(LOWER_BOUND, lowerBoundStr).bindParam(UPPER_BOUND, upperBoundStr);
        ContentHandle threadHandle = ((RowBatchCallable)callable).getHandle();
        boolean isDirect = this.hostInfos != null && this.getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT;
        RowBatchFailureEventImpl requestEvent = null;
        int batchRetries = 0;
        while (this.shouldRequestBatch(requestEvent, batchRetries)) {
            RowManager requestRowMgr = isDirect ? this.hostInfos[(int)((currentBatch + (long)batchRetries) % (long)this.hostInfos.length)].rowMgr : this.getRowManager();
            Throwable throwable = null;
            Object rowsDoc = null;
            try {
                long snapshotTimestamp;
                BaseHandle baseThreadHandle = (BaseHandle)((Object)threadHandle);
                if (this.consistentSnapshot && baseThreadHandle.getPointInTimeQueryTimestamp() == -1L && (snapshotTimestamp = this.serverTimestamp.get()) > -1L) {
                    logger.info("Initializing thread snapshot timestamp=[{}]", (Object)snapshotTimestamp);
                    baseThreadHandle.setPointInTimeQueryTimestamp(snapshotTimestamp);
                }
                if (requestRowMgr.resultDoc(plan, (StructureReadHandle)((Object)threadHandle)) != null) {
                    rowsDoc = threadHandle.get();
                }
                if (this.consistentSnapshot && this.serverTimestamp.get() == -1L) {
                    snapshotTimestamp = baseThreadHandle.getServerTimestamp();
                    if (this.serverTimestamp.compareAndSet(-1L, snapshotTimestamp)) {
                        logger.info("Established snapshot timestamp=[{}]", (Object)snapshotTimestamp);
                        baseThreadHandle.setPointInTimeQueryTimestamp(snapshotTimestamp);
                    } else {
                        logger.info("Correcting thread snapshot timestamp=[{}]", (Object)snapshotTimestamp);
                        baseThreadHandle.setPointInTimeQueryTimestamp(this.serverTimestamp.get());
                    }
                }
            }
            catch (Throwable e) {
                throwable = e;
            }
            if (throwable != null) {
                logger.debug("failed for batch: {}, retry: {}", (Object)currentBatch, (Object)batchRetries);
                if (requestEvent == null) {
                    requestEvent = new RowBatchFailureEventImpl(currentBatch, lowerBoundStr, upperBoundStr);
                    this.initRequestEvent(requestEvent);
                }
                this.notifyFailure(requestEvent.withBatchRetries(batchRetries).withFailedJobBatches(this.getFailedBatches()), throwable);
            } else if (rowsDoc != null) {
                RowBatchResponseEventImpl responseEvent = new RowBatchResponseEventImpl(currentBatch, lowerBoundStr, upperBoundStr, rowsDoc);
                this.initRequestEvent(responseEvent);
                this.notifySuccess(responseEvent);
                if (requestEvent == null) break;
                requestEvent = null;
                break;
            }
            ++batchRetries;
        }
        if (requestEvent != null) {
            this.failedBatches.incrementAndGet();
        }
        if (requestEvent != null && requestEvent.getDisposition() == RowBatchFailureListener.BatchFailureDisposition.STOP) {
            logger.debug("stopped for failed batch: {}", (Object)currentBatch);
            this.orderlyStop();
        } else {
            logger.debug("finished batch: {}", (Object)currentBatch);
            if (this.batchNum.get() >= this.batchCount) {
                logger.debug("finished thread after batch: {}", (Object)currentBatch);
                this.endThread();
            } else {
                this.submit(callable);
            }
        }
        return requestEvent == null;
    }

    private boolean shouldRequestBatch(RowBatchFailureEventImpl requestEvent, int batchRetries) {
        if (batchRetries == 0) {
            return true;
        }
        if (requestEvent == null) {
            return false;
        }
        if (super.getStopped().get()) {
            return false;
        }
        return requestEvent.getDisposition() == RowBatchFailureListener.BatchFailureDisposition.RETRY && batchRetries < requestEvent.getMaxRetries();
    }

    private void endThread() {
        int stillRunning = this.runningThreads.decrementAndGet();
        if (stillRunning == 0) {
            this.orderlyStop();
        }
    }

    @Override
    public synchronized RowBatcher<T> withForestConfig(ForestConfiguration forestConfig) {
        super.withForestConfig(forestConfig);
        this.hostInfos = this.forestHosts(forestConfig, this.hostInfos);
        return this;
    }

    private void submit(Callable<Boolean> callable) {
        this.submit(new FutureTask<Boolean>(callable));
    }

    private void submit(FutureTask<Boolean> task) {
        this.threadPool.execute(task);
    }

    synchronized HostInfo[] forestHosts(ForestConfiguration forestConfig, HostInfo[] hostInfos) {
        Forest[] forests = this.forests(forestConfig);
        Set<String> hosts = this.hosts(forests);
        HashMap<String, HostInfo> existingHostInfos = new HashMap<String, HostInfo>();
        if (hostInfos != null) {
            for (HostInfo hostInfo : hostInfos) {
                existingHostInfos.put(hostInfo.hostName, hostInfo);
            }
        }
        logger.info("(withForestConfig) Using forests on {} hosts for \"{}\"", hosts, (Object)forests[0].getDatabaseName());
        HostInfo[] newHostInfos = new HostInfo[hosts.size()];
        int i = 0;
        for (String host : hosts) {
            HostInfo existingHost = (HostInfo)existingHostInfos.get(host);
            if (existingHost != null) {
                newHostInfos[i] = existingHost;
            } else {
                newHostInfos[i] = existingHost = new HostInfo();
                existingHost.hostName = host;
                existingHost.client = this.getMoveMgr().getHostClient(host);
                if (this.getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
                    logger.info("Adding DatabaseClient on port {} for host \"{}\" to the rotation", (Object)newHostInfos[i].client.getPort(), (Object)host);
                    existingHost.rowMgr = existingHost.client.newRowManager();
                }
            }
            ++i;
        }
        return newHostInfos;
    }

    private static class HostInfo {
        private String hostName;
        private DatabaseClient client;
        private RowManager rowMgr;

        private HostInfo() {
        }
    }

    private class BatchThreadPoolExecutor
    extends ThreadPoolExecutor {
        BatchThreadPoolExecutor(int threadCount) {
            super(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadCount), new ThreadPoolExecutor.CallerRunsPolicy());
        }
    }

    private static class RowBatchResponseEventImpl<T>
    extends RowBatchEventImpl
    implements RowBatchSuccessListener.RowBatchResponseEvent<T> {
        private T handle;

        private RowBatchResponseEventImpl(long batchnum, String lowerBound, String upperBound, T handle) {
            super(batchnum, lowerBound, upperBound);
            this.handle = handle;
        }

        @Override
        public T getRowsDoc() {
            return this.handle;
        }
    }

    private static class RowBatchFailureEventImpl
    extends RowBatchEventImpl
    implements RowBatchFailureListener.RowBatchFailureEvent {
        private static final int DEFAULT_MAX_RETRIES = 10;
        private RowBatchFailureListener.BatchFailureDisposition disposition = RowBatchFailureListener.BatchFailureDisposition.SKIP;
        private int maxRetries = 10;
        private int batchRetries = 0;
        private long failedJobBatches = 0L;

        private RowBatchFailureEventImpl(long batchnum, String lowerBound, String upperBound) {
            super(batchnum, lowerBound, upperBound);
        }

        @Override
        public int getBatchRetries() {
            return this.batchRetries;
        }

        private RowBatchFailureEventImpl withBatchRetries(int batchRetries) {
            this.batchRetries = batchRetries;
            return this;
        }

        @Override
        public long getFailedJobBatches() {
            return this.failedJobBatches;
        }

        private RowBatchFailureEventImpl withFailedJobBatches(long failedJobBatches) {
            this.failedJobBatches = failedJobBatches;
            return this;
        }

        @Override
        public RowBatchFailureListener.BatchFailureDisposition getDisposition() {
            return this.disposition;
        }

        @Override
        public RowBatchFailureListener.RowBatchFailureEvent withDisposition(RowBatchFailureListener.BatchFailureDisposition disposition) {
            this.disposition = disposition;
            return this;
        }

        @Override
        public int getMaxRetries() {
            return this.maxRetries;
        }

        @Override
        public RowBatchFailureEventImpl withMaxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }
    }

    private static class RowBatchEventImpl
    extends BatchEventImpl {
        private String lowerBound = "0";
        private String upperBound = "0";

        private RowBatchEventImpl(long batchnum, String lowerBound, String upperBound) {
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
            this.withJobBatchNumber(batchnum);
        }

        public String getLowerBound() {
            return this.lowerBound;
        }

        public String getUpperBound() {
            return this.upperBound;
        }
    }

    private static class RowBatchCallable<T>
    implements Callable<Boolean> {
        private RowBatcherImpl rowBatcher;
        private ContentHandle<T> handle;

        RowBatchCallable(RowBatcherImpl<T> rowBatcher, ContentHandle<T> handle) {
            this.rowBatcher = rowBatcher;
            this.handle = handle;
        }

        private ContentHandle<T> getHandle() {
            return this.handle;
        }

        @Override
        public Boolean call() {
            try {
                return this.rowBatcher.readRows(this);
            }
            catch (Throwable e) {
                logger.error("internal error", e);
                return false;
            }
        }
    }
}

