/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.hadoop.mr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
import org.elasticsearch.hadoop.mr.HeartBeat;
import org.elasticsearch.hadoop.mr.ReportingUtils;
import org.elasticsearch.hadoop.mr.WritableBytesConverter;
import org.elasticsearch.hadoop.mr.WritableValueWriter;
import org.elasticsearch.hadoop.mr.compat.CompatHandler;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.serialization.dto.Node;
import org.elasticsearch.hadoop.serialization.dto.Shard;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.serialization.field.MapWritableFieldExtractor;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.Version;

public class EsOutputFormat
extends org.apache.hadoop.mapreduce.OutputFormat
implements OutputFormat {
    private static Log log = LogFactory.getLog(EsOutputFormat.class);
    private static final int NO_TASK_ID = -1;

    public RecordWriter getRecordWriter(TaskAttemptContext context) {
        return (RecordWriter)this.getRecordWriter(null, HadoopCfgUtils.asJobConf(CompatHandler.taskAttemptContext(context).getConfiguration()), null, (Progressable)context);
    }

    public void checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext context) throws IOException {
        this.init(CompatHandler.jobContext(context).getConfiguration());
    }

    public org.apache.hadoop.mapreduce.OutputCommitter getOutputCommitter(TaskAttemptContext context) {
        return new EsOutputCommitter();
    }

    public org.apache.hadoop.mapred.RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) {
        return new EsRecordWriter((Configuration)job, progress);
    }

    public void checkOutputSpecs(FileSystem ignored, JobConf cfg) throws IOException {
        this.init((Configuration)cfg);
    }

    private void init(Configuration cfg) throws IOException {
        Settings settings = SettingsManager.loadFrom(cfg);
        Assert.hasText(settings.getResourceWrite(), String.format("No resource ['%s'] (index/query/location) specified", "es.resource"));
        RestRepository client = null;
        InitializationUtils.checkIdForOperation(settings);
        InitializationUtils.checkIndexExistence(settings, client);
        if (HadoopCfgUtils.getReduceTasks(cfg) != null) {
            if (HadoopCfgUtils.getSpeculativeReduce(cfg)) {
                log.warn((Object)"Speculative execution enabled for reducer - consider disabling it to prevent data corruption");
            }
        } else if (HadoopCfgUtils.getSpeculativeMap(cfg)) {
            log.warn((Object)"Speculative execution enabled for mapper - consider disabling it to prevent data corruption");
        }
        Version.logVersion();
        log.info((Object)String.format("Writing to [%s]", settings.getResourceWrite()));
    }

    protected static class EsRecordWriter
    extends RecordWriter
    implements org.apache.hadoop.mapred.RecordWriter {
        protected final Configuration cfg;
        protected boolean initialized = false;
        protected RestRepository repository;
        private String uri;
        private Resource resource;
        private HeartBeat beat;
        private Progressable progressable;

        public EsRecordWriter(Configuration cfg, Progressable progressable) {
            this.cfg = cfg;
            this.progressable = progressable;
        }

        public void write(Object key, Object value) throws IOException {
            if (!this.initialized) {
                this.initialized = true;
                this.init();
            }
            this.repository.writeToIndex(value);
        }

        protected void init() throws IOException {
            int currentInstance = this.detectCurrentInstance(this.cfg);
            if (log.isTraceEnabled()) {
                log.trace((Object)String.format("EsRecordWriter instance [%s] initiating discovery of target shard...", currentInstance));
            }
            Settings settings = SettingsManager.loadFrom(this.cfg).copy();
            if (log.isTraceEnabled()) {
                log.trace((Object)String.format("Init shard writer from cfg %s", HadoopCfgUtils.asProperties(this.cfg)));
            }
            InitializationUtils.setValueWriterIfNotSet(settings, WritableValueWriter.class, log);
            InitializationUtils.setBytesConverterIfNeeded(settings, WritableBytesConverter.class, log);
            InitializationUtils.setFieldExtractorIfNotSet(settings, MapWritableFieldExtractor.class, log);
            InitializationUtils.discoverNodesIfNeeded(settings, log);
            InitializationUtils.discoverEsVersion(settings, log);
            List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
            SettingsUtils.pinNode(settings, nodes.get(currentInstance % nodes.size()));
            if (this.progressable != null) {
                this.beat = new HeartBeat(this.progressable, this.cfg, settings.getHeartBeatLead(), log);
                this.beat.start();
            }
            this.resource = new Resource(settings, false);
            IndexExtractor iformat = (IndexExtractor)ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings);
            iformat.compile(this.resource.toString());
            if (iformat.hasPattern()) {
                this.initMultiIndices(settings, currentInstance);
            } else {
                this.initSingleIndex(settings, currentInstance);
            }
        }

        private void initSingleIndex(Settings settings, int currentInstance) throws IOException {
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Resource [%s] resolves as a single index", this.resource));
            }
            this.repository = new RestRepository(settings);
            if (this.repository.touch() && this.repository.waitForYellow()) {
                log.warn((Object)String.format("Timed out waiting for index [%s] to reach yellow health", this.resource));
            }
            Map<Shard, Node> targetShards = this.repository.getWriteTargetPrimaryShards();
            this.repository.close();
            Assert.isTrue(!targetShards.isEmpty(), String.format("Cannot determine write shards for [%s]; likely its format is incorrect (maybe it contains illegal characters?)", this.resource));
            ArrayList<Shard> orderedShards = new ArrayList<Shard>(targetShards.keySet());
            Collections.sort(orderedShards);
            if (log.isTraceEnabled()) {
                log.trace((Object)String.format("ESRecordWriter instance [%s] discovered %s primary shards %s", currentInstance, orderedShards.size(), orderedShards));
            }
            if (currentInstance <= 0) {
                currentInstance = new Random().nextInt(targetShards.size()) + 1;
            }
            int bucket = currentInstance % targetShards.size();
            Shard chosenShard = (Shard)orderedShards.get(bucket);
            Node targetNode = targetShards.get(chosenShard);
            SettingsUtils.pinNode(settings, targetNode.getIpAddress(), targetNode.getHttpPort());
            this.repository = new RestRepository(settings);
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("EsRecordWriter instance [%s] assigned to primary shard [%s] at address [%s]", currentInstance, chosenShard.getName(), this.uri));
            }
        }

        private void initMultiIndices(Settings settings, int currentInstance) throws IOException {
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("Resource [%s] resolves as an index pattern", this.resource));
            }
            List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
            String node = nodes.get(new Random().nextInt(nodes.size()));
            SettingsUtils.pinNode(settings, node);
            if (log.isDebugEnabled()) {
                log.debug((Object)String.format("EsRecordWriter instance [%s] assigned to [%s]", currentInstance, this.uri));
            }
            this.repository = new RestRepository(settings);
        }

        private int detectCurrentInstance(Configuration conf) {
            TaskID taskID = HadoopCfgUtils.getTaskID(conf);
            if (taskID == null) {
                log.warn((Object)String.format("Cannot determine task id - redirecting writes in a random fashion", new Object[0]));
                return -1;
            }
            return taskID.getId();
        }

        public void close(TaskAttemptContext context) throws IOException {
            this.doClose((Progressable)context);
        }

        public void close(Reporter reporter) throws IOException {
            this.doClose((Progressable)reporter);
        }

        protected void doClose(Progressable progressable) {
            if (log.isTraceEnabled()) {
                log.trace((Object)String.format("Closing RecordWriter [%s][%s]", this.uri, this.resource));
            }
            if (this.beat != null) {
                this.beat.stop();
            }
            if (this.repository != null) {
                this.repository.close();
                ReportingUtils.report(progressable, this.repository.stats());
            }
            this.initialized = false;
        }
    }

    public static class EsOldAPIOutputCommitter
    extends OutputCommitter {
        public void setupJob(JobContext jobContext) throws IOException {
        }

        public void setupTask(org.apache.hadoop.mapred.TaskAttemptContext taskContext) throws IOException {
        }

        public boolean needsTaskCommit(org.apache.hadoop.mapred.TaskAttemptContext taskContext) throws IOException {
            return false;
        }

        public void commitTask(org.apache.hadoop.mapred.TaskAttemptContext taskContext) throws IOException {
        }

        public void abortTask(org.apache.hadoop.mapred.TaskAttemptContext taskContext) throws IOException {
        }

        @Deprecated
        public void cleanupJob(JobContext context) throws IOException {
        }
    }

    public static class EsOutputCommitter
    extends org.apache.hadoop.mapreduce.OutputCommitter {
        public void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException {
        }

        @Deprecated
        public void cleanupJob(org.apache.hadoop.mapreduce.JobContext jobContext) throws IOException {
        }

        public void setupTask(TaskAttemptContext taskContext) throws IOException {
        }

        public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
            return false;
        }

        public void commitTask(TaskAttemptContext taskContext) throws IOException {
        }

        public void abortTask(TaskAttemptContext taskContext) throws IOException {
        }
    }
}

