/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HashTable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class SyncTable
extends Configured
implements Tool {
    /** Logger used by the driver and by the nested {@link SyncMapper}. */
    private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

    // Configuration keys: the driver writes these into the job configuration in
    // createSubmittableJob and SyncMapper.setup reads them back inside each task.
    static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
    static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
    static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
    static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
    static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
    static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
    static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
    static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";

    // Driver state populated by doCommandLine from the command-line arguments.
    /** Directory holding the hash output for the source table (first positional argument). */
    Path sourceHashDir;
    /** Name of the table to sync from (second positional argument). */
    String sourceTableName;
    /** Name of the table to sync to (third positional argument). */
    String targetTableName;
    /** Value of {@code --sourcezkcluster=}; {@code null} when the option was not given. */
    String sourceZkCluster;
    /** Value of {@code --targetzkcluster=}; {@code null} when the option was not given. */
    String targetZkCluster;
    /** Value of {@code --dryrun=}; defaults to {@code false}. */
    boolean dryRun;
    /** Value of {@code --doDeletes=}; defaults to {@code true}. */
    boolean doDeletes = true;
    /** Value of {@code --doPuts=}; defaults to {@code true}. */
    boolean doPuts = true;
    /** Counters of the finished job; assigned by run() only after the job completed successfully. */
    Counters counters;
    /** Number of required positional arguments: sourcehashdir, sourcetable, targettable. */
    private static final int NUM_ARGS = 3;

    /**
     * Creates the tool with the given configuration.
     *
     * @param conf configuration handed to {@link Configured}; later retrieved via {@code getConf()}
     */
    public SyncTable(Configuration conf) {
        super(conf);
    }

    /**
     * Initialises job credentials for a peer cluster, but only when that cluster's
     * {@code hbase.security.authentication} property is set to {@code kerberos}.
     *
     * @param zookeeper ZK cluster key of the peer cluster
     * @param job job whose configuration is used as the base for the peer configuration and
     *            which is passed to {@link TableMapReduceUtil#initCredentialsForCluster}
     * @throws IOException if building the peer configuration or initialising credentials fails
     */
    private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
        Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
        // Constant-first comparison: an unset property yields null and must not cause an NPE.
        if ("kerberos".equals(peerConf.get("hbase.security.authentication"))) {
            TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
        }
    }

    /**
     * Builds the map-only job that compares the target table against the source hashes.
     *
     * <p>Expects {@link #doCommandLine} to have populated the driver fields already. The method
     * validates the hash directory (manifest, partition count, number of {@code part-r-*}
     * entries under {@code hashes}), copies the driver settings into the job configuration and
     * wires up the table input and, unless this is a dry run, the table output.
     *
     * @param args command-line arguments; not consulted by this method
     * @return the configured, not yet submitted job
     * @throws IOException if the source hash dir does not exist or any file-system/HBase call fails
     * @throws RuntimeException if the hash data is inconsistent with its manifest
     */
    public Job createSubmittableJob(String[] args) throws IOException {
        Configuration conf = getConf();
        FileSystem fs = sourceHashDir.getFileSystem(conf);
        if (!fs.exists(sourceHashDir)) {
            throw new IOException("Source hash dir not found: " + sourceHashDir);
        }

        String defaultJobName = "syncTable_" + sourceTableName + "-" + targetTableName;
        Job job = Job.getInstance(conf, conf.get("mapreduce.job.name", defaultJobName));
        Configuration jobConf = job.getConfiguration();
        // Constant-first comparison: an unset property yields null and must not cause an NPE.
        if ("kerberos".equals(jobConf.get("hadoop.security.authentication"))) {
            TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{sourceHashDir}, conf);
        }

        HashTable.TableHash tableHash = HashTable.TableHash.read(conf, sourceHashDir);
        LOG.info("Read source hash manifest: " + tableHash);
        LOG.info("Read " + tableHash.partitions.size() + " partition keys");
        if (!tableHash.tableName.equals(sourceTableName)) {
            // Only a warning: the job proceeds with the table name given on the command line.
            LOG.warn("Table name mismatch - manifest indicates hash was taken from: " + tableHash.tableName + " but job is reading from: " + sourceTableName);
        }
        if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
            throw new RuntimeException("Hash data appears corrupt. The number of of hash files created should be 1 more than the number of partition keys.  However, the manifest file  says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys found in the partitions file is " + tableHash.partitions.size());
        }

        // Cross-check the manifest against what is actually present on disk.
        Path dataDir = new Path(sourceHashDir, "hashes");
        int dataSubdirCount = 0;
        for (FileStatus file : fs.listStatus(dataDir)) {
            if (file.getPath().getName().startsWith("part-r-")) {
                ++dataSubdirCount;
            }
        }
        if (dataSubdirCount != tableHash.numHashFiles) {
            throw new RuntimeException("Hash data appears corrupt. The number of of hash files created should be 1 more than the number of partition keys.  However, the number of data dirs found is " + dataSubdirCount + " but the number of partition keys found in the partitions file is " + tableHash.partitions.size());
        }

        job.setJarByClass(HashTable.class);
        jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
        jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
        jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
        if (sourceZkCluster != null) {
            jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
            initCredentialsForHBase(sourceZkCluster, job);
        }
        if (targetZkCluster != null) {
            jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
            initCredentialsForHBase(targetZkCluster, job);
        }
        jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
        jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
        jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);

        // The mapper scans the TARGET table; source rows are only fetched for mismatching ranges.
        TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class, null, null, job);
        job.setNumReduceTasks(0);
        if (dryRun) {
            job.setOutputFormatClass(NullOutputFormat.class);
        } else {
            TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster, null, null);
        }

        // NOTE(review): this repeats credential initialisation for the source cluster without the
        // Kerberos check used in initCredentialsForHBase - kept as in the original; confirm intent.
        if (sourceZkCluster != null) {
            Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
            TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
        }
        return job;
    }

    /**
     * Writes the command-line help to standard error, optionally preceded by an error line.
     *
     * @param errorMsg message to print as {@code ERROR: ...} first; skipped when null or empty
     */
    private static void printUsage(String errorMsg) {
        if (errorMsg != null && !errorMsg.isEmpty()) {
            System.err.println("ERROR: " + errorMsg);
            System.err.println();
        }
        // One entry per output line; an empty string produces a blank line.
        String[] usageLines = {
            "Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>",
            "",
            "Options:",
            " sourcezkcluster  ZK cluster key of the source table",
            "                  (defaults to cluster in classpath's config)",
            " targetzkcluster  ZK cluster key of the target table",
            "                  (defaults to cluster in classpath's config)",
            " dryrun           if true, output counters but no writes",
            "                  (defaults to false)",
            " doDeletes        if false, does not perform deletes",
            "                  (defaults to true)",
            " doPuts           if false, does not perform puts ",
            "                  (defaults to true)",
            "",
            "Args:",
            " sourcehashdir    path to HashTable output dir for source table",
            "                  (see org.apache.hadoop.hbase.mapreduce.HashTable)",
            " sourcetable      Name of the source table to sync from",
            " targettable      Name of the target table to sync to",
            "",
            "Examples:",
            " For a dry run SyncTable of tableA from a remote source cluster",
            " to a local target cluster:",
            " $ hbase org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://nn:9000/hashes/tableA tableA tableA",
        };
        for (String line : usageLines) {
            System.err.println(line);
        }
    }

    /**
     * Parses the tool arguments into the driver fields.
     *
     * <p>The last {@code NUM_ARGS} arguments are always taken as
     * {@code <sourcehashdir> <sourcetable> <targettable>}; everything before them must be one of
     * the recognised {@code --name=value} options. {@code -h} or anything starting with
     * {@code --h} in the option positions prints the usage and aborts.
     *
     * @param args arguments remaining after generic Hadoop options were stripped
     * @return {@code true} if parsing succeeded; {@code false} if usage was printed instead
     */
    private boolean doCommandLine(String[] args) {
        if (args.length < NUM_ARGS) {
            printUsage(null);
            return false;
        }
        try {
            int optionCount = args.length - NUM_ARGS;
            this.sourceHashDir = new Path(args[optionCount]);
            this.sourceTableName = args[optionCount + 1];
            this.targetTableName = args[optionCount + 2];

            for (int i = 0; i < optionCount; ++i) {
                String cmd = args[i];
                if (cmd.equals("-h") || cmd.startsWith("--h")) {
                    printUsage(null);
                    return false;
                }

                final String sourceZkClusterKey = "--sourcezkcluster=";
                if (cmd.startsWith(sourceZkClusterKey)) {
                    this.sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
                    continue;
                }

                final String targetZkClusterKey = "--targetzkcluster=";
                if (cmd.startsWith(targetZkClusterKey)) {
                    this.targetZkCluster = cmd.substring(targetZkClusterKey.length());
                    continue;
                }

                final String dryRunKey = "--dryrun=";
                if (cmd.startsWith(dryRunKey)) {
                    this.dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
                    continue;
                }

                final String doDeletesKey = "--doDeletes=";
                if (cmd.startsWith(doDeletesKey)) {
                    this.doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
                    continue;
                }

                final String doPutsKey = "--doPuts=";
                if (cmd.startsWith(doPutsKey)) {
                    this.doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
                    continue;
                }

                printUsage("Invalid argument '" + cmd + "'");
                return false;
            }
        } catch (Exception e) {
            // Boundary catch: any parsing problem becomes a usage message, with the
            // full stack trace sent to the log rather than dumped on stderr.
            LOG.error("Failed to parse command line arguments", e);
            printUsage("Can't start because " + e.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Command-line entry point: runs the tool through {@link ToolRunner} and exits the JVM with
     * the tool's return code.
     *
     * @param args raw command-line arguments
     * @throws Exception whatever {@link ToolRunner#run} propagates
     */
    public static void main(String[] args) throws Exception {
        Tool tool = new SyncTable(HBaseConfiguration.create());
        System.exit(ToolRunner.run(tool, args));
    }

    /**
     * Parses the arguments, submits the sync job and waits for it to finish.
     *
     * @param args command-line arguments, possibly including generic Hadoop options
     * @return {@code 0} when the job succeeded; {@code 1} when argument parsing failed or the
     *         job did not complete successfully
     * @throws Exception if job creation, submission or monitoring fails
     */
    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (!doCommandLine(otherArgs)) {
            return 1;
        }
        Job job = createSubmittableJob(otherArgs);
        if (!job.waitForCompletion(true)) {
            // A failed job is an error condition, not routine information.
            LOG.error("Map-reduce job failed!");
            return 1;
        }
        // Counters are only published after a successful run.
        this.counters = job.getCounters();
        return 0;
    }

    public static class SyncMapper
    extends TableMapper<ImmutableBytesWritable, Mutation> {
        // Per-task state; everything up to targetHasher is initialised in setup().
        /** Hash output directory, read from the job configuration. */
        Path sourceHashDir;
        /** Connection to the cluster holding the source table; closed in cleanup(). */
        Connection sourceConnection;
        /** Connection to the cluster holding the target table; closed in cleanup(). */
        Connection targetConnection;
        /** Source table handle, scanned only for ranges whose hashes differ. */
        Table sourceTable;
        /** Target table handle, scanned when re-reading ranges in syncRange / finishRemainingHashRanges. */
        Table targetTable;
        /** When true, counters are updated but no Put/Delete is built or written. */
        boolean dryRun;
        /** When false, cells missing from the source are counted but not deleted from the target. */
        boolean doDeletes = true;
        /** When false, missing or differing cells are counted but not written to the target. */
        boolean doPuts = true;
        /** Manifest and partition information read from sourceHashDir. */
        HashTable.TableHash sourceTableHash;
        /** Reader over the source (key, hash) pairs, positioned via the split's start row. */
        HashTable.TableHash.Reader sourceHashReader;
        /** Source hash of the batch currently being accumulated by targetHasher. */
        ImmutableBytesWritable currentSourceHash;
        /** Start key of the next source batch, or null when the reader is exhausted. */
        ImmutableBytesWritable nextSourceKey;
        /** Accumulates the hash of the target rows for the current batch. */
        HashTable.ResultHasher targetHasher;
        /** First failure seen in map()/cleanup(); when set, cleanup() skips the remaining hash ranges and rethrows it. */
        Throwable mapperException;
        /** Shared scanner over no results, used as the missing side of a one-sided row comparison. */
        private static final CellScanner EMPTY_CELL_SCANNER = new CellScanner(Collections.emptyIterator());

        /**
         * Prepares the task: opens connections and tables for both clusters, loads the run flags
         * and the source hash manifest, and positions the hash reader using the start row of
         * this task's input split.
         *
         * @param context task context supplying the configuration and the input split
         * @throws IOException if a connection, table, manifest or hash reader cannot be opened
         */
        protected void setup(Mapper.Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));

            sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
            targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, "hbase.mapred.output.");
            sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
            targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);

            dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
            doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
            doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);

            sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
            LOG.info("Read source hash manifest: " + sourceTableHash);
            LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

            byte[] splitStartRow = ((TableSplit) context.getInputSplit()).getStartRow();
            sourceHashReader = sourceTableHash.newReader(conf, new ImmutableBytesWritable(splitStartRow));
            // Prime nextSourceKey so map() knows where the first batch begins.
            findNextKeyHashPair();
            targetHasher = new HashTable.ResultHasher();
        }

        /**
         * Opens a connection using a cluster configuration derived from {@code conf}.
         *
         * @param conf task configuration
         * @param zkClusterConfKey key under which the ZK cluster key is stored in {@code conf}
         * @param configPrefix override prefix handed to {@code createClusterConf}; may be null
         * @return a new connection; the caller is responsible for closing it
         * @throws IOException if the cluster configuration or the connection cannot be created
         */
        private static Connection openConnection(Configuration conf, String zkClusterConfKey, String configPrefix) throws IOException {
            Configuration clusterConf =
                HBaseConfiguration.createClusterConf(conf, conf.get(zkClusterConfKey), configPrefix);
            return ConnectionFactory.createConnection(clusterConf);
        }

        /**
         * Looks up the table whose name is stored under {@code tableNameConfKey} in {@code conf}.
         *
         * @param connection connection to obtain the table from
         * @param conf task configuration
         * @param tableNameConfKey configuration key holding the table name
         * @return the table handle; the caller is responsible for closing it
         * @throws IOException if the table cannot be obtained
         */
        private static Table openTable(Connection connection, Configuration conf, String tableNameConfKey) throws IOException {
            String configuredName = conf.get(tableNameConfKey);
            TableName tableName = TableName.valueOf(configuredName);
            return connection.getTable(tableName);
        }

        /**
         * Advances the source hash reader and records the key it lands on in
         * {@code nextSourceKey}, or {@code null} when the reader has no further entry.
         *
         * @throws IOException if the reader fails to advance
         */
        private void findNextKeyHashPair() throws IOException {
            if (sourceHashReader.next()) {
                nextSourceKey = sourceHashReader.getCurrentKey();
            } else {
                nextSourceKey = null;
            }
        }

        /**
         * Feeds one target row into the current batch hash, first closing every batch whose
         * end key is at or before this row's key.
         *
         * <p>Any failure is remembered in {@code mapperException} before being rethrown, so
         * that {@code cleanup} can tell that the mapper already failed.
         *
         * @param key row key of the target row
         * @param value the target row
         * @param context task context
         * @throws IOException if reading hashes or syncing a range fails
         * @throws InterruptedException if the task is interrupted while writing output
         */
        protected void map(ImmutableBytesWritable key, Result value, Mapper.Context context) throws IOException, InterruptedException {
            try {
                // Cross every source batch boundary that lies at or before this row.
                while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
                    moveToNextBatch(context);
                }
                // Rows seen before any batch has been started are not hashed.
                if (targetHasher.isBatchStarted()) {
                    targetHasher.hashResult(value);
                }
            } catch (Throwable failure) {
                mapperException = failure;
                Throwables.propagateIfInstanceOf(failure, IOException.class);
                Throwables.propagateIfInstanceOf(failure, InterruptedException.class);
                Throwables.propagate(failure);
            }
        }

        /**
         * Closes the batch in progress (if any), then starts a new one at {@code nextSourceKey}
         * and advances the source hash reader.
         *
         * @param context task context used for counters and output
         * @throws IOException if comparing the finished batch or advancing the reader fails
         * @throws InterruptedException if writing output is interrupted
         */
        private void moveToNextBatch(Mapper.Context context) throws IOException, InterruptedException {
            boolean batchInProgress = targetHasher.isBatchStarted();
            if (batchInProgress) {
                finishBatchAndCompareHashes(context);
            }
            targetHasher.startBatch(nextSourceKey);
            // Capture the hash before the reader moves on to the following entry.
            currentSourceHash = sourceHashReader.getCurrentHash();
            findNextKeyHashPair();
        }

        /**
         * Finishes the current target batch, updates the batch counters and compares the
         * resulting hash with the source hash. On a mismatch the batch's key range is
         * re-scanned cell by cell via {@link #syncRange}.
         *
         * @param context task context used for counters and output
         * @throws IOException if syncing the mismatching range fails
         * @throws InterruptedException if writing output is interrupted
         */
        private void finishBatchAndCompareHashes(Mapper.Context context) throws IOException, InterruptedException {
            targetHasher.finishBatch();
            context.getCounter(Counter.BATCHES).increment(1L);
            if (targetHasher.getBatchSize() == 0L) {
                context.getCounter(Counter.EMPTY_BATCHES).increment(1L);
            }

            ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
            if (targetHash.equals(currentSourceHash)) {
                context.getCounter(Counter.HASHES_MATCHED).increment(1L);
                return;
            }

            context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1L);
            // The range ends at the next source batch, or at the manifest's stop row for the last one.
            ImmutableBytesWritable stopRow = nextSourceKey == null
                ? new ImmutableBytesWritable(sourceTableHash.stopRow)
                : nextSourceKey;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey()) + " to " + toHex(stopRow) + " sourceHash: " + toHex(currentSourceHash) + " targetHash: " + toHex(targetHash));
            }
            syncRange(context, targetHasher.getBatchStartKey(), stopRow);
        }

        /**
         * Hex-encodes the bytes the writable refers to, honouring its offset and length.
         *
         * @param bytes writable to render
         * @return hex representation of the referenced byte range
         */
        private static String toHex(ImmutableBytesWritable bytes) {
            byte[] backing = bytes.get();
            int offset = bytes.getOffset();
            int length = bytes.getLength();
            return Bytes.toHex(backing, offset, length);
        }

        /**
         * Re-scans one key range on both tables and reconciles it row by row.
         *
         * <p>Both scanners are walked in parallel in row-key order. A row found on one side
         * only is counted as missing on the other; rows found on both sides are compared cell by
         * cell. Finally one of {@code RANGESMATCHED} / {@code RANGESNOTMATCHED} is incremented.
         *
         * @param context task context used for counters and output
         * @param startRow first row of the range
         * @param stopRow stop row of the range, as passed to {@code Scan.setStopRow}
         * @throws IOException if scanning or building mutations fails
         * @throws InterruptedException if writing output is interrupted
         */
        private void syncRange(Mapper.Context context, ImmutableBytesWritable startRow, ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
            Scan scan = sourceTableHash.initScan();
            scan.setStartRow(startRow.copyBytes());
            scan.setStopRow(stopRow.copyBytes());

            boolean rangeMatched = true;
            // try-with-resources: both scanners are released even if the comparison throws.
            try (ResultScanner sourceScanner = sourceTable.getScanner(scan);
                 ResultScanner targetScanner = targetTable.getScanner(new Scan(scan))) {
                CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
                CellScanner targetCells = new CellScanner(targetScanner.iterator());

                byte[] nextSourceRow = sourceCells.nextRow();
                byte[] nextTargetRow = targetCells.nextRow();
                while (nextSourceRow != null || nextTargetRow != null) {
                    boolean rowMatched;
                    int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
                    if (rowComparison < 0) {
                        // Row exists only in the source.
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
                        }
                        context.getCounter(Counter.TARGETMISSINGROWS).increment(1L);
                        rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
                        nextSourceRow = sourceCells.nextRow();
                    } else if (rowComparison > 0) {
                        // Row exists only in the target.
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
                        }
                        context.getCounter(Counter.SOURCEMISSINGROWS).increment(1L);
                        rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
                        nextTargetRow = targetCells.nextRow();
                    } else {
                        // Same row on both sides: compare its cells.
                        rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
                        nextSourceRow = sourceCells.nextRow();
                        nextTargetRow = targetCells.nextRow();
                    }
                    if (!rowMatched) {
                        rangeMatched = false;
                    }
                }
            }
            context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED).increment(1L);
        }

        /**
         * Compares the cells of one row on both sides, updates the cell/row counters and, unless
         * this is a dry run, emits the mutations that bring the target row in line with the
         * source row.
         *
         * <p>Cells are walked in (family, qualifier, timestamp) order as defined by
         * {@link #compareCellKeysWithinRow}. Cells missing from the target or holding a different
         * value are added to a {@link Put} when {@code doPuts} is set; cells missing from the
         * source are added to a {@link Delete} when {@code doDeletes} is set. When the manifest's
         * {@code scanBatch} is positive, a mutation is written as soon as its size reaches
         * {@code scanBatch}.
         *
         * @param context task context used for counters and output
         * @param rowKey key of the row being compared
         * @param sourceCells scanner positioned on the row in the source (or the empty scanner)
         * @param targetCells scanner positioned on the row in the target (or the empty scanner)
         * @return {@code true} if every cell of the row matched
         * @throws IOException if building or writing a mutation fails
         * @throws InterruptedException if writing output is interrupted
         */
        private boolean syncRowCells(Mapper.Context context, byte[] rowKey, CellScanner sourceCells, CellScanner targetCells) throws IOException, InterruptedException {
            Put pendingPut = null;
            Delete pendingDelete = null;
            long matchingCells = 0L;
            boolean matchingRow = true;

            Cell sourceCell = sourceCells.nextCellInRow();
            Cell targetCell = targetCells.nextCellInRow();
            while (sourceCell != null || targetCell != null) {
                int keyOrder = compareCellKeysWithinRow(sourceCell, targetCell);
                if (keyOrder < 0) {
                    // Cell exists only in the source.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Target missing cell: " + sourceCell);
                    }
                    context.getCounter(Counter.TARGETMISSINGCELLS).increment(1L);
                    matchingRow = false;
                    if (!dryRun && doPuts) {
                        if (pendingPut == null) {
                            pendingPut = new Put(rowKey);
                        }
                        pendingPut.add(sourceCell);
                    }
                    sourceCell = sourceCells.nextCellInRow();
                } else if (keyOrder > 0) {
                    // Cell exists only in the target.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Source missing cell: " + targetCell);
                    }
                    context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1L);
                    matchingRow = false;
                    if (!dryRun && doDeletes) {
                        if (pendingDelete == null) {
                            pendingDelete = new Delete(rowKey);
                        }
                        pendingDelete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
                    }
                    targetCell = targetCells.nextCellInRow();
                } else {
                    // Same cell key on both sides: compare the values.
                    if (CellUtil.matchingValue(sourceCell, targetCell)) {
                        ++matchingCells;
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Different values: ");
                            LOG.debug("  source cell: " + sourceCell + " value: " + Bytes.toHex(sourceCell.getValueArray(), sourceCell.getValueOffset(), sourceCell.getValueLength()));
                            LOG.debug("  target cell: " + targetCell + " value: " + Bytes.toHex(targetCell.getValueArray(), targetCell.getValueOffset(), targetCell.getValueLength()));
                        }
                        context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1L);
                        matchingRow = false;
                        if (!dryRun && doPuts) {
                            if (pendingPut == null) {
                                pendingPut = new Put(rowKey);
                            }
                            pendingPut.add(sourceCell);
                        }
                    }
                    sourceCell = sourceCells.nextCellInRow();
                    targetCell = targetCells.nextCellInRow();
                }

                // Flush a mutation once its size reaches the scan batch size.
                if (!dryRun && sourceTableHash.scanBatch > 0) {
                    if (pendingPut != null && pendingPut.size() >= sourceTableHash.scanBatch) {
                        context.write(new ImmutableBytesWritable(rowKey), pendingPut);
                        pendingPut = null;
                    }
                    if (pendingDelete != null && pendingDelete.size() >= sourceTableHash.scanBatch) {
                        context.write(new ImmutableBytesWritable(rowKey), pendingDelete);
                        pendingDelete = null;
                    }
                }
            }

            // Write whatever is still pending for this row.
            if (!dryRun) {
                if (pendingPut != null) {
                    context.write(new ImmutableBytesWritable(rowKey), pendingPut);
                }
                if (pendingDelete != null) {
                    context.write(new ImmutableBytesWritable(rowKey), pendingDelete);
                }
            }

            if (matchingCells > 0L) {
                context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
            }
            Counter rowOutcome = matchingRow ? Counter.MATCHINGROWS : Counter.ROWSWITHDIFFS;
            context.getCounter(rowOutcome).increment(1L);
            return matchingRow;
        }

        /**
         * Orders two row keys, treating {@code null} as greater than any real key so that an
         * exhausted side always sorts last.
         *
         * @param r1 first row key, or null when that side has no more rows
         * @param r2 second row key, or null when that side has no more rows
         * @return negative, zero or positive as {@code r1} sorts before, equal to or after {@code r2}
         */
        private static int compareRowKeys(byte[] r1, byte[] r2) {
            if (r1 == null) {
                return 1;
            } else if (r2 == null) {
                return -1;
            } else {
                return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
            }
        }

        /**
         * Orders two cells of the same row by family, then qualifier, then timestamp, using the
         * default {@link CellComparator}. A {@code null} cell sorts after any real cell.
         *
         * @param c1 first cell, or null when that side has no more cells in the row
         * @param c2 second cell, or null when that side has no more cells in the row
         * @return negative, zero or positive as {@code c1} sorts before, equal to or after {@code c2}
         */
        private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
            if (c1 == null) {
                return 1;
            }
            if (c2 == null) {
                return -1;
            }
            CellComparator comparator = CellComparator.getInstance();
            int order = comparator.compareFamilies(c1, c2);
            if (order == 0) {
                order = comparator.compareQualifiers(c1, c2);
            }
            if (order == 0) {
                order = comparator.compareTimestamps(c1, c2);
            }
            return order;
        }

        /**
         * Finishes the hash ranges that remain after the last mapped row (unless the mapper
         * already failed), closes both tables and both connections, and finally rethrows the
         * first failure recorded, if any.
         *
         * <p>Each resource is closed independently so that a failing {@code close()} cannot
         * prevent the remaining resources from being released.
         *
         * @param context task context
         * @throws IOException if the recorded failure is an {@code IOException}
         * @throws InterruptedException if the recorded failure is an {@code InterruptedException}
         */
        protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
            if (mapperException == null) {
                try {
                    finishRemainingHashRanges(context);
                } catch (Throwable t) {
                    mapperException = t;
                }
            }

            closeAndRecordFailure(sourceTable, "source table");
            closeAndRecordFailure(targetTable, "target table");
            closeAndRecordFailure(sourceConnection, "source connection");
            closeAndRecordFailure(targetConnection, "target connection");

            if (mapperException != null) {
                Throwables.propagateIfInstanceOf(mapperException, IOException.class);
                Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
                Throwables.propagate(mapperException);
            }
        }

        /**
         * Closes one resource. A close failure becomes {@code mapperException} when nothing has
         * failed before; otherwise it is logged and suppressed in favour of the earlier failure.
         *
         * @param resource resource to close; ignored when null
         * @param description human-readable name used in the log message
         */
        private void closeAndRecordFailure(AutoCloseable resource, String description) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Throwable t) {
                if (mapperException == null) {
                    mapperException = t;
                } else {
                    LOG.error("Suppressing exception from closing {}", description, t);
                }
            }
        }

        /**
         * Processes the source hash batches that were not closed by {@code map}, i.e. those
         * starting after the last target row seen by this task.
         *
         * <p>Batches starting before the split's end row (or all remaining batches when the split
         * ends at the end of the table) are started and compared one after another. If the last
         * open batch reaches beyond the split's end row, the missing part of its key range is
         * scanned directly from the target table and hashed before the final comparison.
         *
         * @param context task context used for counters and output
         * @throws IOException if reading hashes, scanning the target or syncing a range fails
         * @throws InterruptedException if writing output is interrupted
         */
        private void finishRemainingHashRanges(Mapper.Context context) throws IOException, InterruptedException {
            TableSplit split = (TableSplit) context.getInputSplit();
            byte[] splitEndRow = split.getEndRow();
            boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

            while (nextSourceKey != null && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
                moveToNextBatch(context);
            }

            if (!targetHasher.isBatchStarted()) {
                return;
            }

            boolean batchRunsPastSplit = nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0;
            boolean lastBatchShortOfStopRow = nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow);
            if (batchRunsPastSplit || lastBatchShortOfStopRow) {
                // The open batch covers rows outside this split; read them from the target directly.
                Scan scan = sourceTableHash.initScan();
                scan.setStartRow(splitEndRow);
                if (nextSourceKey == null) {
                    scan.setStopRow(sourceTableHash.stopRow);
                } else {
                    scan.setStopRow(nextSourceKey.copyBytes());
                }
                // The resource must be initialised in the header: resource variables are final.
                try (ResultScanner targetScanner = targetTable.getScanner(scan)) {
                    for (Result row : targetScanner) {
                        targetHasher.hashResult(row);
                    }
                }
            }
            finishBatchAndCompareHashes(context);
        }

        /**
         * Walks an iterator of {@link Result}s as a sequence of rows, each a sequence of cells.
         *
         * <p>Consecutive results that carry the same row key are treated as one row. Usage
         * pattern in this file: call {@link #nextRow()} to move to a row, then call
         * {@link #nextCellInRow()} until it returns {@code null}.
         */
        private static class CellScanner {
            /** Underlying results, consumed strictly forward. */
            private final Iterator<Result> results;
            /** Key of the row currently being served, or null before the first row / after the last. */
            private byte[] currentRow;
            /** Result the next cell will be taken from; null once the current row is exhausted. */
            private Result currentRowResult;
            /** Index into currentRowResult.rawCells() of the next cell to return. */
            private int nextCellInRow;
            /** Look-ahead: first result of the following row, fetched while finishing the current one. */
            private Result nextRowResult;

            public CellScanner(Iterator<Result> results) {
                this.results = results;
            }

            /**
             * Advances to the next row.
             *
             * @return the key of the new current row, or {@code null} when no rows remain
             */
            public byte[] nextRow() {
                if (this.nextRowResult == null) {
                    // No look-ahead available: pull results until one belongs to a different row.
                    while (this.results.hasNext()) {
                        this.nextRowResult = this.results.next();
                        // Indexes cell 0 unconditionally, so every result must hold at least one cell.
                        Cell nextCell = this.nextRowResult.rawCells()[0];
                        if (this.currentRow == null || !Bytes.equals((byte[])this.currentRow, (int)0, (int)this.currentRow.length, (byte[])nextCell.getRowArray(), (int)nextCell.getRowOffset(), (int)nextCell.getRowLength())) break;
                        // Still the current row (its remaining cells were not consumed): skip it.
                        this.nextRowResult = null;
                    }
                    if (this.nextRowResult == null) {
                        // Iterator exhausted.
                        this.currentRowResult = null;
                        this.currentRow = null;
                        return null;
                    }
                }
                // Promote the look-ahead result to the current row.
                this.currentRowResult = this.nextRowResult;
                this.nextCellInRow = 0;
                this.currentRow = this.currentRowResult.getRow();
                this.nextRowResult = null;
                return this.currentRow;
            }

            /**
             * Returns the next cell of the current row.
             *
             * @return the next cell, or {@code null} when the current row has no more cells
             */
            public Cell nextCellInRow() {
                if (this.currentRowResult == null) {
                    return null;
                }
                Cell nextCell = this.currentRowResult.rawCells()[this.nextCellInRow];
                ++this.nextCellInRow;
                if (this.nextCellInRow == this.currentRowResult.size()) {
                    // Current result is used up: peek at the following result to see whether
                    // it continues this row or starts the next one.
                    if (this.results.hasNext()) {
                        Result result = this.results.next();
                        Cell cell = result.rawCells()[0];
                        if (Bytes.equals((byte[])this.currentRow, (int)0, (int)this.currentRow.length, (byte[])cell.getRowArray(), (int)cell.getRowOffset(), (int)cell.getRowLength())) {
                            // Same row continues in the following result.
                            this.currentRowResult = result;
                            this.nextCellInRow = 0;
                        } else {
                            // Different row: keep it as look-ahead for nextRow().
                            this.nextRowResult = result;
                            this.currentRowResult = null;
                        }
                    } else {
                        this.currentRowResult = null;
                    }
                }
                return nextCell;
            }
        }

        /** Job counters maintained by {@link SyncMapper}. */
        public static enum Counter {
            /** Batches finished and compared (one increment per finished batch). */
            BATCHES,
            /** Batches whose target hash equalled the source hash. */
            HASHES_MATCHED,
            /** Batches whose target hash differed from the source hash. */
            HASHES_NOT_MATCHED,
            /** Rows found in the target scan but not in the source scan. */
            SOURCEMISSINGROWS,
            /** Cells found in the target row but not in the source row. */
            SOURCEMISSINGCELLS,
            /** Rows found in the source scan but not in the target scan. */
            TARGETMISSINGROWS,
            /** Cells found in the source row but not in the target row. */
            TARGETMISSINGCELLS,
            /** Compared rows in which at least one cell was missing or different. */
            ROWSWITHDIFFS,
            /** Cells with the same family, qualifier and timestamp but different values. */
            DIFFERENTCELLVALUES,
            /** Compared rows in which every cell matched. */
            MATCHINGROWS,
            /** Cells with the same key and the same value on both sides. */
            MATCHINGCELLS,
            /** Finished batches whose target batch size was zero. */
            EMPTY_BATCHES,
            /** Re-scanned ranges in which every row matched. */
            RANGESMATCHED,
            /** Re-scanned ranges in which at least one row did not match. */
            RANGESNOTMATCHED;

        }
    }
}

