/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.hadoop.cascading;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.elasticsearch.hadoop.cascading.CascadingFieldExtractor;
import org.elasticsearch.hadoop.cascading.CascadingLocalBytesConverter;
import org.elasticsearch.hadoop.cascading.CascadingUtils;
import org.elasticsearch.hadoop.cascading.CascadingValueWriter;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.util.FieldAlias;
import org.elasticsearch.hadoop.util.StringUtils;

class EsHadoopScheme
extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    private static final long serialVersionUID = 4304172465362298925L;
    private static final int SRC_CTX_SIZE = 4;
    private static final int SRC_CTX_KEY = 0;
    private static final int SRC_CTX_VALUE = 1;
    private static final int SRC_CTX_ALIASES = 2;
    private static final int SRC_CTX_OUTPUT_JSON = 3;
    private final String index;
    private final String query;
    private final String nodes;
    private final int port;
    private final Properties props;
    private static Log log = LogFactory.getLog(EsHadoopScheme.class);

    EsHadoopScheme(String nodes, int port, String index, String query, Fields fields, Properties props) {
        this.index = index;
        this.query = query;
        this.nodes = nodes;
        this.port = port;
        if (fields != null) {
            this.setSinkFields(fields);
            this.setSourceFields(fields);
        }
        this.props = props;
    }

    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        super.sourcePrepare(flowProcess, sourceCall);
        Object[] context = new Object[4];
        context[0] = ((RecordReader)sourceCall.getInput()).createKey();
        context[1] = ((RecordReader)sourceCall.getInput()).createValue();
        Settings settings = this.loadSettings(flowProcess.getConfigCopy(), true);
        context[2] = CascadingUtils.alias(settings);
        context[3] = settings.getOutputAsJson();
        sourceCall.setContext((Object)context);
    }

    public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        super.sourceCleanup(flowProcess, sourceCall);
        sourceCall.setContext(null);
    }

    public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        super.sinkPrepare(flowProcess, sinkCall);
        Object[] context = new Object[1];
        Settings settings = this.loadSettings(flowProcess.getConfigCopy(), false);
        context[0] = CascadingUtils.fieldToAlias(settings, this.getSinkFields());
        sinkCall.setContext((Object)context);
    }

    public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        super.sinkCleanup(flowProcess, sinkCall);
        sinkCall.setContext(null);
    }

    public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        conf.setInputFormat(EsInputFormat.class);
        Settings set = this.loadSettings(conf, true);
        Collection<String> fields = CascadingUtils.fieldToAlias(set, this.getSourceFields());
        conf.set("es.internal.mr.target.fields", StringUtils.concatenate(fields));
        if (log.isTraceEnabled()) {
            log.trace((Object)("Initialized (source) configuration " + HadoopCfgUtils.asProperties((Configuration)conf)));
        }
    }

    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        conf.setOutputFormat(EsOutputFormat.class);
        Settings set = this.loadSettings(conf, false);
        Log log = LogFactory.getLog(EsTap.class);
        InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, log);
        InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, log);
        InitializationUtils.setBytesConverterIfNeeded(set, CascadingLocalBytesConverter.class, log);
        InitializationUtils.setFieldExtractorIfNotSet(set, CascadingFieldExtractor.class, log);
        HadoopCfgUtils.setFileOutputFormatDir((Configuration)conf, set.getResourceWrite());
        HadoopCfgUtils.setOutputCommitterClass((Configuration)conf, EsOutputFormat.EsOldAPIOutputCommitter.class.getName());
        if (log.isTraceEnabled()) {
            log.trace((Object)("Initialized (sink) configuration " + HadoopCfgUtils.asProperties((Configuration)conf)));
        }
    }

    private Settings loadSettings(Object source, boolean read) {
        return CascadingUtils.init(HadoopSettingsManager.loadFrom(source).merge(this.props), this.nodes, this.port, this.index, this.query, read);
    }

    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        Map<Text, Object> data;
        Object[] context = (Object[])sourceCall.getContext();
        if (!((RecordReader)sourceCall.getInput()).next(context[0], context[1])) {
            return false;
        }
        boolean isJSON = (Boolean)context[3];
        TupleEntry entry = sourceCall.getIncomingEntry();
        if (isJSON) {
            data = new HashMap(1);
            data.put(new Text("data"), context[1]);
        } else {
            data = (Map)context[1];
        }
        FieldAlias alias = (FieldAlias)context[2];
        if (entry.getFields().isDefined()) {
            Text lookupKey = new Text();
            for (Comparable field : entry.getFields()) {
                Map<Object, Object> result = data;
                for (String level : StringUtils.tokenize(alias.toES(field.toString()), ".")) {
                    lookupKey.set(level);
                    if ((result = result.get(lookupKey)) != null) continue;
                    break;
                }
                CascadingUtils.setObject(entry, field, result);
            }
        } else {
            List elements = Tuple.elements((Tuple)entry.getTuple());
            elements.clear();
            elements.addAll(data.values());
        }
        return true;
    }

    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        ((OutputCollector)sinkCall.getOutput()).collect(null, sinkCall);
    }
}

