/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.mapreduce;

import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.primitives.UnsignedBytes;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.naming.NamingException;
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.LocatedTablet;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KuduTableInputFormat
extends InputFormat<NullWritable, RowResult>
implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
    static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
    static final String SCAN_CACHE_BLOCKS = "kudu.mapreduce.input.scan.cache.blocks";
    static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.address";
    static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
    static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
    static final String ENCODED_PREDICATES_KEY = "kudu.mapreduce.encoded.predicates";
    static final String COLUMN_PROJECTION_KEY = "kudu.mapreduce.column.projection";
    private final Map<String, String> reverseDNSCacheMap = new HashMap<String, String>();
    private Configuration conf;
    private KuduClient client;
    private KuduTable table;
    private long operationTimeoutMs;
    private String nameServer;
    private boolean cacheBlocks;
    private List<String> projectedCols;
    private List<KuduPredicate> predicates;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        try {
            if (this.table == null) {
                throw new IOException("No table was provided");
            }
            KuduScanToken.KuduScanTokenBuilder tokenBuilder = ((KuduScanToken.KuduScanTokenBuilder)((KuduScanToken.KuduScanTokenBuilder)this.client.newScanTokenBuilder(this.table).setProjectedColumnNames(this.projectedCols)).cacheBlocks(this.cacheBlocks)).setTimeout(this.operationTimeoutMs);
            for (KuduPredicate predicate : this.predicates) {
                tokenBuilder.addPredicate(predicate);
            }
            List tokens = tokenBuilder.build();
            ArrayList<TableSplit> splits = new ArrayList<TableSplit>(tokens.size());
            for (KuduScanToken token : tokens) {
                ArrayList<String> locations = new ArrayList<String>(token.getTablet().getReplicas().size());
                for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
                    locations.add(this.reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
                }
                splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
            }
            ArrayList<TableSplit> arrayList = splits;
            return arrayList;
        }
        finally {
            this.shutdownClient();
        }
    }

    private void shutdownClient() throws IOException {
        try {
            this.client.shutdown();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    private String reverseDNS(String host, Integer port) {
        String location = this.reverseDNSCacheMap.get(host);
        if (location != null) {
            return location;
        }
        InetSocketAddress isa = new InetSocketAddress(host, (int)port);
        if (isa.isUnresolved()) {
            LOG.warn("Failed address resolve for: " + isa);
        }
        InetAddress tabletInetAddress = isa.getAddress();
        try {
            location = KuduTableInputFormat.domainNamePointerToHostName(DNS.reverseDns((InetAddress)tabletInetAddress, (String)this.nameServer));
            this.reverseDNSCacheMap.put(host, location);
        }
        catch (NamingException e) {
            LOG.warn("Cannot resolve the host name for " + tabletInetAddress + " because of " + e);
            location = host;
        }
        return location;
    }

    public RecordReader<NullWritable, RowResult> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new TableRecordReader();
    }

    public void setConf(Configuration entries) {
        this.conf = new Configuration(entries);
        String tableName = this.conf.get(INPUT_TABLE_KEY);
        String masterAddresses = this.conf.get(MASTER_ADDRESSES_KEY);
        this.operationTimeoutMs = this.conf.getLong(OPERATION_TIMEOUT_MS_KEY, 30000L);
        this.client = new KuduClient.KuduClientBuilder(masterAddresses).defaultOperationTimeoutMs(this.operationTimeoutMs).build();
        this.nameServer = this.conf.get(NAME_SERVER_KEY);
        this.cacheBlocks = this.conf.getBoolean(SCAN_CACHE_BLOCKS, false);
        try {
            this.table = this.client.openTable(tableName);
        }
        catch (Exception ex) {
            throw new RuntimeException("Could not obtain the table from the master, is the master running and is this table created? tablename=" + tableName + " and master address= " + masterAddresses, ex);
        }
        String projectionConfig = this.conf.get(COLUMN_PROJECTION_KEY);
        if (projectionConfig == null || projectionConfig.equals("*")) {
            this.projectedCols = null;
        } else if ("".equals(projectionConfig)) {
            this.projectedCols = new ArrayList<String>();
        } else {
            this.projectedCols = Lists.newArrayList((Iterable)Splitter.on((char)',').split((CharSequence)projectionConfig));
            Schema tableSchema = this.table.getSchema();
            for (String columnName : this.projectedCols) {
                if (tableSchema.getColumn(columnName) != null) continue;
                throw new IllegalArgumentException("Unknown column " + columnName);
            }
        }
        this.predicates = new ArrayList<KuduPredicate>();
        try {
            ByteArrayInputStream is = new ByteArrayInputStream(Base64.decodeBase64((String)this.conf.get(ENCODED_PREDICATES_KEY, "")));
            while (((InputStream)is).available() > 0) {
                this.predicates.add(KuduPredicate.fromPB((Schema)this.table.getSchema(), (Common.ColumnPredicatePB)Common.ColumnPredicatePB.parseDelimitedFrom((InputStream)is)));
            }
        }
        catch (IOException e) {
            throw new RuntimeException("unable to deserialize predicates from the configuration", e);
        }
    }

    private static String domainNamePointerToHostName(String dnPtr) {
        if (dnPtr == null) {
            return null;
        }
        return dnPtr.endsWith(".") ? dnPtr.substring(0, dnPtr.length() - 1) : dnPtr;
    }

    public Configuration getConf() {
        return this.conf;
    }

    class TableRecordReader
    extends RecordReader<NullWritable, RowResult> {
        private final NullWritable currentKey = NullWritable.get();
        private RowResult currentValue;
        private RowResultIterator iterator;
        private KuduScanner scanner;
        private TableSplit split;

        TableRecordReader() {
        }

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            if (!(inputSplit instanceof TableSplit)) {
                throw new IllegalArgumentException("TableSplit is the only accepted input split");
            }
            this.split = (TableSplit)inputSplit;
            LOG.debug("Creating scanner for token: {}", (Object)KuduScanToken.stringifySerializedToken((byte[])this.split.getScanToken(), (KuduClient)KuduTableInputFormat.this.client));
            this.scanner = KuduScanToken.deserializeIntoScanner((byte[])this.split.getScanToken(), (KuduClient)KuduTableInputFormat.this.client);
            this.tryRefreshIterator();
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!this.iterator.hasNext()) {
                this.tryRefreshIterator();
                if (!this.iterator.hasNext()) {
                    return false;
                }
            }
            this.currentValue = this.iterator.next();
            return true;
        }

        private void tryRefreshIterator() throws IOException {
            if (!this.scanner.hasMoreRows()) {
                return;
            }
            try {
                this.iterator = this.scanner.nextRows();
            }
            catch (Exception e) {
                throw new IOException("Couldn't get scan data", e);
            }
        }

        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return this.currentKey;
        }

        public RowResult getCurrentValue() throws IOException, InterruptedException {
            return this.currentValue;
        }

        public float getProgress() throws IOException, InterruptedException {
            return 0.0f;
        }

        public void close() throws IOException {
            try {
                this.scanner.close();
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            KuduTableInputFormat.this.shutdownClient();
        }
    }

    static class TableSplit
    extends InputSplit
    implements Writable,
    Comparable<TableSplit> {
        private byte[] scanToken;
        private byte[] partitionKey;
        private String[] locations;

        public TableSplit() {
        }

        public TableSplit(KuduScanToken token, String[] locations) throws IOException {
            this.scanToken = token.serialize();
            this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
            this.locations = locations;
        }

        public byte[] getScanToken() {
            return this.scanToken;
        }

        public byte[] getPartitionKey() {
            return this.partitionKey;
        }

        public long getLength() throws IOException, InterruptedException {
            return 0L;
        }

        public String[] getLocations() throws IOException, InterruptedException {
            return this.locations;
        }

        @Override
        public int compareTo(TableSplit other) {
            return UnsignedBytes.lexicographicalComparator().compare(this.partitionKey, other.partitionKey);
        }

        public void write(DataOutput dataOutput) throws IOException {
            Bytes.writeByteArray((DataOutput)dataOutput, (byte[])this.scanToken);
            Bytes.writeByteArray((DataOutput)dataOutput, (byte[])this.partitionKey);
            dataOutput.writeInt(this.locations.length);
            for (String location : this.locations) {
                byte[] str = Bytes.fromString((String)location);
                Bytes.writeByteArray((DataOutput)dataOutput, (byte[])str);
            }
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.scanToken = Bytes.readByteArray((DataInput)dataInput);
            this.partitionKey = Bytes.readByteArray((DataInput)dataInput);
            this.locations = new String[dataInput.readInt()];
            for (int i = 0; i < this.locations.length; ++i) {
                byte[] str = Bytes.readByteArray((DataInput)dataInput);
                this.locations[i] = Bytes.getString((byte[])str);
            }
        }

        public int hashCode() {
            return Arrays.hashCode(this.partitionKey);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TableSplit that = (TableSplit)o;
            return this.compareTo(that) == 0;
        }

        public String toString() {
            return Objects.toStringHelper((Object)this).add("partitionKey", (Object)Bytes.pretty((byte[])this.partitionKey)).add("locations", (Object)Arrays.toString(this.locations)).toString();
        }
    }
}

