/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop.util;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.util.JavaObjectSerializer;
import cascading.flow.hadoop.util.ObjectSerializer;
import cascading.flow.planner.PlatformInfo;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopUtil {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopUtil.class);
    private static final String ENCODING = "US-ASCII";
    private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;
    private static PlatformInfo platformInfo;

    public static void initLog4j(JobConf jobConf) {
        String[] elements;
        String values = jobConf.get("log4j.logger", null);
        if (values == null || values.length() == 0) {
            return;
        }
        if (!Util.hasClass((String)"org.apache.log4j.Logger")) {
            LOG.info("org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties");
            return;
        }
        for (String element : elements = values.split(",")) {
            HadoopUtil.setLogLevel(element.split("="));
        }
    }

    private static void setLogLevel(String[] logger) {
        Object loggerObject = Util.invokeStaticMethod((String)"org.apache.log4j.Logger", (String)"getLogger", (Object[])new Object[]{logger[0]}, (Class[])new Class[]{String.class});
        Object levelObject = Util.invokeStaticMethod((String)"org.apache.log4j.Level", (String)"toLevel", (Object[])new Object[]{logger[1]}, (Class[])new Class[]{String.class});
        Util.invokeInstanceMethod((Object)loggerObject, (String)"setLevel", (Object[])new Object[]{levelObject}, (Class[])new Class[]{levelObject.getClass()});
    }

    public static JobConf createJobConf(Map<Object, Object> properties, JobConf defaultJobconf) {
        JobConf jobConf;
        JobConf jobConf2 = jobConf = defaultJobconf == null ? new JobConf() : new JobConf((Configuration)defaultJobconf);
        if (properties == null) {
            return jobConf;
        }
        HashSet<Object> keys = new HashSet<Object>(properties.keySet());
        if (properties instanceof Properties) {
            keys.addAll(((Properties)properties).stringPropertyNames());
        }
        for (Object e : keys) {
            Object value = properties.get(e);
            if (value == null && properties instanceof Properties && e instanceof String) {
                value = ((Properties)properties).getProperty((String)e);
            }
            if (value == null || value instanceof Class || value instanceof JobConf) continue;
            jobConf.set(e.toString(), value.toString());
        }
        return jobConf;
    }

    public static Map<Object, Object> createProperties(Configuration jobConf) {
        HashMap<Object, Object> properties = new HashMap<Object, Object>();
        for (Map.Entry entry : jobConf) {
            properties.put(entry.getKey(), entry.getValue());
        }
        return properties;
    }

    public static Thread getHDFSShutdownHook() {
        Exception caughtException = null;
        try {
            FileSystem.getLocal((Configuration)new JobConf());
            Field field = FileSystem.class.getDeclaredField("clientFinalizer");
            field.setAccessible(true);
            Thread finalizer = (Thread)field.get(null);
            if (finalizer != null) {
                Runtime.getRuntime().removeShutdownHook(finalizer);
            }
            return finalizer;
        }
        catch (NoSuchFieldException exception) {
            caughtException = exception;
        }
        catch (IllegalAccessException exception) {
            caughtException = exception;
        }
        catch (IOException exception) {
            caughtException = exception;
        }
        LOG.debug("unable to find and remove client hdfs shutdown hook, received exception: {}", (Object)caughtException.getClass().getName());
        return null;
    }

    public static String encodeBytes(byte[] bytes) {
        try {
            return new String(Base64.encodeBase64((byte[])bytes), ENCODING);
        }
        catch (UnsupportedEncodingException exception) {
            throw new RuntimeException(exception);
        }
    }

    public static byte[] decodeBytes(String string) {
        try {
            byte[] bytes = string.getBytes(ENCODING);
            return Base64.decodeBase64((byte[])bytes);
        }
        catch (UnsupportedEncodingException exception) {
            throw new RuntimeException(exception);
        }
    }

    public static <T> ObjectSerializer instantiateSerializer(Configuration conf, Class<T> type) throws ClassNotFoundException {
        ObjectSerializer objectSerializer;
        String serializerClassName = conf.get("cascading.util.serializer");
        Class<?> flowSerializerClass = serializerClassName == null || serializerClassName.length() == 0 ? DEFAULT_OBJECT_SERIALIZER : Class.forName(serializerClassName);
        try {
            objectSerializer = (ObjectSerializer)flowSerializerClass.newInstance();
            if (objectSerializer instanceof Configurable) {
                ((Configurable)objectSerializer).setConf(conf);
            }
        }
        catch (Exception exception) {
            exception.printStackTrace();
            throw new IllegalArgumentException("Unable to instantiate serializer \"" + flowSerializerClass.getName() + "\" for class: " + type.getName());
        }
        if (!objectSerializer.accepts(type)) {
            throw new IllegalArgumentException(serializerClassName + " won't accept objects of class " + type.toString());
        }
        return objectSerializer;
    }

    public static <T> String serializeBase64(T object, JobConf conf) throws IOException {
        return HadoopUtil.serializeBase64(object, conf, true);
    }

    public static <T> String serializeBase64(T object, JobConf conf, boolean compress) throws IOException {
        ObjectSerializer objectSerializer;
        try {
            objectSerializer = HadoopUtil.instantiateSerializer((Configuration)conf, object.getClass());
        }
        catch (ClassNotFoundException exception) {
            throw new IOException(exception);
        }
        return HadoopUtil.encodeBytes(objectSerializer.serialize(object, compress));
    }

    public static <T> T deserializeBase64(String string, Configuration conf, Class<T> type) throws IOException {
        return HadoopUtil.deserializeBase64(string, conf, type, true);
    }

    public static <T> T deserializeBase64(String string, Configuration conf, Class<T> type, boolean decompress) throws IOException {
        ObjectSerializer objectSerializer;
        if (string == null || string.length() == 0) {
            return null;
        }
        try {
            objectSerializer = HadoopUtil.instantiateSerializer(conf, type);
        }
        catch (ClassNotFoundException exception) {
            throw new IOException(exception);
        }
        return objectSerializer.deserialize(HadoopUtil.decodeBytes(string), type, decompress);
    }

    public static Class findMainClass(Class defaultType) {
        StackTraceElement[] stackTrace;
        for (StackTraceElement stackTraceElement : stackTrace = Thread.currentThread().getStackTrace()) {
            if (!stackTraceElement.getMethodName().equals("main") || stackTraceElement.getClassName().startsWith("org.apache.hadoop")) continue;
            try {
                LOG.info("resolving application jar from found main method on: {}", (Object)stackTraceElement.getClassName());
                return Thread.currentThread().getContextClassLoader().loadClass(stackTraceElement.getClassName());
            }
            catch (ClassNotFoundException exception) {
                LOG.warn("unable to load class while discovering application jar: {}", (Object)stackTraceElement.getClassName(), (Object)exception);
            }
        }
        LOG.info("using default application jar, may cause class not found exceptions on the cluster");
        return defaultType;
    }

    public static Map<String, String> getConfig(JobConf defaultConf, JobConf updatedConf) {
        HashMap<String, String> configs = new HashMap<String, String>();
        for (Map.Entry entry : updatedConf) {
            configs.put((String)entry.getKey(), (String)entry.getValue());
        }
        for (Map.Entry entry : defaultConf) {
            if (entry.getValue() == null) continue;
            String updatedValue = (String)configs.get(entry.getKey());
            if (updatedValue == null && entry.getValue() == null) {
                configs.remove(entry.getKey());
            }
            if (updatedValue != null && updatedValue.equals(entry.getValue())) {
                configs.remove(entry.getKey());
            }
            configs.remove("mapred.working.dir");
        }
        return configs;
    }

    public static JobConf[] getJobConfs(JobConf job, List<Map<String, String>> configs) {
        JobConf[] jobConfs = new JobConf[configs.size()];
        for (int i = 0; i < jobConfs.length; ++i) {
            jobConfs[i] = HadoopUtil.mergeConf(job, configs.get(i), false);
        }
        return jobConfs;
    }

    public static JobConf mergeConf(JobConf job, Map<String, String> config, boolean directly) {
        JobConf currentConf = directly ? job : new JobConf((Configuration)job);
        for (String key : config.keySet()) {
            LOG.debug("merging key: {} value: {}", (Object)key, (Object)config.get(key));
            currentConf.set(key, config.get(key));
        }
        return currentConf;
    }

    public static JobConf removePropertiesFrom(JobConf jobConf, String ... keys) {
        Map<Object, Object> properties = HadoopUtil.createProperties((Configuration)jobConf);
        for (String key : keys) {
            properties.remove(key);
        }
        return HadoopUtil.createJobConf(properties, null);
    }

    public static boolean removeStateFromDistCache(JobConf conf, String path) throws IOException {
        return new Hfs(new TextLine(), path).deleteResource(conf);
    }

    public static String writeStateToDistCache(JobConf conf, String id, String stepState) {
        LOG.info("writing step state to dist cache, too large for job conf, size: {}", (Object)stepState.length());
        String statePath = Hfs.getTempPath(conf) + "/step-state-" + id;
        Hfs temp = new Hfs(new TextLine(), statePath, SinkMode.REPLACE);
        try {
            TupleEntryCollector writer = temp.openForWrite(new HadoopFlowProcess(conf));
            writer.add(new Tuple(new Object[]{stepState}));
            writer.close();
        }
        catch (IOException exception) {
            throw new FlowException("unable to write step state to Hadoop FS: " + temp.getIdentifier());
        }
        URI uri = new Path(statePath).toUri();
        DistributedCache.addCacheFile((URI)uri, (Configuration)conf);
        LOG.info("using step state path: {}", (Object)uri);
        return statePath;
    }

    public static String readStateFromDistCache(JobConf jobConf, String id) throws IOException {
        Path[] files = DistributedCache.getLocalCacheFiles((Configuration)jobConf);
        Path stepStatePath = null;
        for (Path file : files) {
            if (!file.toString().contains("step-state-" + id)) continue;
            stepStatePath = file;
            break;
        }
        if (stepStatePath == null) {
            throw new FlowException("unable to find step state from distributed cache");
        }
        LOG.info("reading step state from local path: {}", stepStatePath);
        Lfs temp = new Lfs(new TextLine(new Fields(new Comparable[]{"line"})), stepStatePath.toString());
        TupleEntryIterator reader = null;
        try {
            reader = temp.openForRead(new HadoopFlowProcess(jobConf));
            if (!reader.hasNext()) {
                throw new FlowException("step state path is empty: " + temp.getIdentifier());
            }
            String i$ = ((TupleEntry)reader.next()).getString((Comparable)Integer.valueOf(0));
            return i$;
        }
        catch (IOException exception) {
            throw new FlowException("unable to find state path: " + temp.getIdentifier(), (Throwable)exception);
        }
        finally {
            reader.close();
        }
    }

    public static PlatformInfo getPlatformInfo() {
        if (platformInfo == null) {
            platformInfo = HadoopUtil.getPlatformInfoInternal();
        }
        return platformInfo;
    }

    private static PlatformInfo getPlatformInfoInternal() {
        Manifest manifest;
        URL url = JobConf.class.getResource(JobConf.class.getSimpleName() + ".class");
        if (url == null || !url.toString().startsWith("jar")) {
            return new PlatformInfo("Hadoop", null, null);
        }
        String path = url.toString();
        String manifestPath = path.substring(0, path.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF";
        try {
            manifest = new Manifest(new URL(manifestPath).openStream());
        }
        catch (IOException exception) {
            LOG.warn("unable to get manifest from {}", (Object)manifestPath, (Object)exception);
            return new PlatformInfo("Hadoop", null, null);
        }
        Attributes attributes = manifest.getAttributes("org/apache/hadoop");
        if (attributes == null) {
            LOG.debug("unable to get Hadoop manifest attributes");
            new PlatformInfo("Hadoop", null, null);
        }
        String vendor = attributes.getValue("Implementation-Vendor");
        String version = attributes.getValue("Implementation-Version");
        return new PlatformInfo("Hadoop", vendor, version);
    }
}

