/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.hadoop.cascading;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.elasticsearch.hadoop.cascading.CascadingValueWriter;
import org.elasticsearch.hadoop.cascading.ESTap;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.mr.ESInputFormat;
import org.elasticsearch.hadoop.mr.ESOutputFormat;
import org.elasticsearch.hadoop.serialization.JdkValueReader;
import org.elasticsearch.hadoop.serialization.SerializationUtils;

class ESHadoopScheme
extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    private final String index;
    private final String host;
    private final int port;

    ESHadoopScheme(String host, int port, String index, Fields fields) {
        this.index = index;
        this.host = host;
        this.port = port;
        if (fields != null) {
            this.setSinkFields(fields);
            this.setSourceFields(fields);
        }
    }

    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        super.sourcePrepare(flowProcess, sourceCall);
        Fields sourceCallFields = sourceCall.getIncomingEntry().getFields();
        Fields sourceFields = sourceCallFields.isDefined() ? sourceCallFields : this.getSourceFields();
        List<String> tupleNames = this.resolveNames(sourceFields);
        Object[] context = new Object[]{tupleNames, ((RecordReader)sourceCall.getInput()).createKey(), ((RecordReader)sourceCall.getInput()).createValue()};
        sourceCall.setContext((Object)context);
    }

    public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        sourceCall.setContext(null);
    }

    public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        super.sinkPrepare(flowProcess, sinkCall);
        Fields sinkCallFields = sinkCall.getOutgoingEntry().getFields();
        Fields sinkFields = sinkCallFields.isDefined() ? sinkCallFields : this.getSinkFields();
        List<String> tupleNames = this.resolveNames(sinkFields);
        Object[] context = new Object[]{tupleNames};
        sinkCall.setContext((Object)context);
    }

    public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        sinkCall.setContext(null);
    }

    private List<String> resolveNames(Fields fields) {
        if (fields == null || !fields.isDefined()) {
            return Collections.emptyList();
        }
        int size = fields.size();
        ArrayList<String> names = new ArrayList<String>(size);
        for (int fieldIndex = 0; fieldIndex < size; ++fieldIndex) {
            names.add(fields.get(fieldIndex).toString());
        }
        return names;
    }

    public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        this.initTargetUri(conf);
        conf.setInputFormat(ESInputFormat.class);
    }

    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        this.initTargetUri(conf);
        conf.setOutputFormat(ESOutputFormat.class);
        Settings set = SettingsManager.loadFrom(conf);
        SerializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(ESTap.class));
        SerializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(ESTap.class));
        conf.set("mapred.output.dir", set.getTargetResource());
        conf.set("mapred.output.committer.class", ESOutputFormat.ESOldAPIOutputCommitter.class.getName());
    }

    private void initTargetUri(JobConf conf) {
        SettingsManager.loadFrom(conf).setHost(this.host).setPort(this.port).setResource(this.index).save();
    }

    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        Object[] context = (Object[])sourceCall.getContext();
        if (!((RecordReader)sourceCall.getInput()).next(context[1], context[2])) {
            return false;
        }
        Tuple tuple = sourceCall.getIncomingEntry().getTuple();
        tuple.clear();
        tuple.addAll(new Object[]{context[1], context[2]});
        return true;
    }

    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        ((OutputCollector)sinkCall.getOutput()).collect(null, sinkCall);
    }
}

