/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job_monitor;

import com.codahale.metrics.Counter;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigValue;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.fs.Path;

/**
 * A {@link KafkaAvroJobMonitor} over {@link GobblinTrackingEvent}s that converts each accepted
 * event into a single {@link JobSpec}.
 *
 * <p>An event is accepted when its metadata carries a dataset URN and, if configured, the URN and
 * the event name match the respective filter patterns. The resulting {@link JobSpec} has a URI
 * formed by merging the base URI with the dataset URN, uses the configured template, and has a
 * config built by copying selected metadata entries according to the extract-keys mapping
 * (event metadata key -&gt; job config key).
 */
public class SLAEventKafkaJobMonitor
extends KafkaAvroJobMonitor<GobblinTrackingEvent> {
    public static final String CONFIG_PREFIX = "gobblin.jobMonitor.slaEvent";
    public static final String DATASET_URN_FILTER_KEY = "filter.urn";
    public static final String EVENT_NAME_FILTER_KEY = "filter.name";
    public static final String TEMPLATE_KEY = "job_template";
    public static final String EXTRACT_KEYS = "extract_keys";
    public static final String BASE_URI_KEY = "baseUri";
    public static final String TOPIC_KEY = "topic";
    public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass";

    /** Metadata key under which an event carries the dataset URN. */
    private static final String DATASET_URN_METADATA_KEY = "event.sla.datasetUrn";

    /** Fallback values for the base URI and the schema version reader/writer class. */
    private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
            BASE_URI_KEY, SLAEventKafkaJobMonitor.class.getSimpleName(),
            SCHEMA_VERSION_READER_CLASS, NoopSchemaVersionWriter.class.getName()));

    private final Optional<Pattern> urnFilter;
    private final Optional<Pattern> nameFilter;
    private final URI baseURI;
    private final URI template;
    private final Map<String, String> extractKeys;
    /** Assigned in {@link #createMetrics()}; remains {@code null} until that method has run. */
    private Counter rejectedEvents;

    /**
     * @param topic              forwarded to the superclass constructor
     * @param catalog            forwarded to the superclass constructor
     * @param baseURI            prefix merged with the dataset URN to form each job spec URI
     * @param limitedScopeConfig forwarded to the superclass constructor
     * @param versionWriter      forwarded to the superclass constructor
     * @param urnFilter          optional pattern the dataset URN must contain a match for
     * @param nameFilter         optional pattern the event name must contain a match for
     * @param template           template URI attached to every produced job spec
     * @param extractKeys        mapping from event metadata key to job config key
     * @throws IOException declared by this constructor; nothing in its own body throws it
     */
    protected SLAEventKafkaJobMonitor(String topic, MutableJobCatalog catalog, URI baseURI, Config limitedScopeConfig, SchemaVersionWriter<?> versionWriter, Optional<Pattern> urnFilter, Optional<Pattern> nameFilter, URI template, Map<String, String> extractKeys) throws IOException {
        super(topic, catalog, limitedScopeConfig, GobblinTrackingEvent.SCHEMA$, versionWriter);
        this.baseURI = baseURI;
        this.urnFilter = urnFilter;
        this.nameFilter = nameFilter;
        this.template = template;
        this.extractKeys = extractKeys;
    }

    /** Creates the superclass metrics and additionally registers the rejected-events counter. */
    @Override
    protected void createMetrics() {
        super.createMetrics();
        this.rejectedEvents = this.getMetricContext().counter("gobblin.jobMonitor.slaevent.rejectedevents");
    }

    /**
     * Converts an event into job specs.
     *
     * @param event the tracking event to convert
     * @return an empty list (after incrementing the rejected-events counter) when
     *         {@link #acceptEvent} rejects the event; otherwise a list holding exactly one
     *         {@link JobSpec}
     */
    @Override
    public Collection<JobSpec> parseJobSpec(GobblinTrackingEvent event) {
        if (!this.acceptEvent(event)) {
            this.rejectedEvents.inc();
            return Lists.<JobSpec>newArrayList();
        }
        String datasetURN = (String)event.getMetadata().get(DATASET_URN_METADATA_KEY);
        URI jobSpecURI = PathUtils.mergePaths(new Path(this.baseURI), new Path(datasetURN)).toUri();

        // Copy only the metadata entries named in extractKeys, renaming each to its job config key.
        Map<String, Object> jobConfigMap = Maps.newHashMap();
        for (Map.Entry<String, String> entry : this.extractKeys.entrySet()) {
            if (event.getMetadata().containsKey(entry.getKey())) {
                jobConfigMap.put(entry.getValue(), event.getMetadata().get(entry.getKey()));
            }
        }
        Config jobConfig = ConfigFactory.parseMap(jobConfigMap);

        JobSpec jobSpec = JobSpec.builder(jobSpecURI).withTemplate(this.template).withConfig(jobConfig).build();
        return Lists.newArrayList(jobSpec);
    }

    /**
     * Decides whether an event should produce a job spec.
     *
     * @param event the tracking event to test
     * @return {@code false} if the event has no (or a {@code null}) dataset URN in its metadata,
     *         or if a configured URN / name filter finds no match; {@code true} otherwise
     */
    protected boolean acceptEvent(GobblinTrackingEvent event) {
        // A single lookup covers both "key absent" and "key mapped to null".
        String datasetURN = (String)event.getMetadata().get(DATASET_URN_METADATA_KEY);
        if (datasetURN == null) {
            return false;
        }
        if (this.urnFilter.isPresent() && !this.urnFilter.get().matcher(datasetURN).find()) {
            return false;
        }
        return !this.nameFilter.isPresent() || this.nameFilter.get().matcher(event.getName()).find();
    }

    public Optional<Pattern> getUrnFilter() {
        return this.urnFilter;
    }

    public Optional<Pattern> getNameFilter() {
        return this.nameFilter;
    }

    public URI getBaseURI() {
        return this.baseURI;
    }

    public URI getTemplate() {
        return this.template;
    }

    public Map<String, String> getExtractKeys() {
        return this.extractKeys;
    }

    /** @return the rejected-events counter, or {@code null} if {@link #createMetrics()} has not run */
    public Counter getRejectedEvents() {
        return this.rejectedEvents;
    }

    /** {@link JobSpecMonitorFactory} that builds {@link SLAEventKafkaJobMonitor}s from configuration. */
    public static class Factory
    implements JobSpecMonitorFactory {
        /**
         * Builds a monitor from the sub-config found under {@link #CONFIG_PREFIX} in the instance
         * driver's system config, with the class defaults as fallback.
         */
        @Override
        public JobSpecMonitor forJobCatalog(GobblinInstanceDriver instanceDriver, MutableJobCatalog jobCatalog) throws IOException {
            Config config = instanceDriver.getSysConfig().getConfig().getConfig(SLAEventKafkaJobMonitor.CONFIG_PREFIX).withFallback(DEFAULTS);
            return this.forConfig(config, jobCatalog);
        }

        /**
         * Builds a monitor from an already-scoped config, i.e. one whose keys carry no prefix
         * (the topic is read from {@code "topic"}, and so on).
         *
         * @param localScopeConfig scoped config; must define {@link #TEMPLATE_KEY} and {@link #TOPIC_KEY}
         * @param jobCatalog       catalog handed to the created monitor
         * @return the configured monitor
         * @throws IOException              if the base URI or template URI is not a valid URI
         * @throws IllegalArgumentException if a required key is missing or the schema version
         *                                  writer cannot be instantiated reflectively
         */
        public JobSpecMonitor forConfig(Config localScopeConfig, MutableJobCatalog jobCatalog) throws IOException {
            Preconditions.checkArgument(localScopeConfig.hasPath(SLAEventKafkaJobMonitor.TEMPLATE_KEY),
                    "Missing required configuration key %s", SLAEventKafkaJobMonitor.TEMPLATE_KEY);
            Preconditions.checkArgument(localScopeConfig.hasPath(SLAEventKafkaJobMonitor.TOPIC_KEY),
                    "Missing required configuration key %s", SLAEventKafkaJobMonitor.TOPIC_KEY);

            // Layer the defaults for local reads so that a direct caller does not have to supply
            // baseUri / versionReaderClass; the caller's config is still what gets passed on below.
            Config effectiveConfig = localScopeConfig.withFallback(DEFAULTS);

            String topic = effectiveConfig.getString(SLAEventKafkaJobMonitor.TOPIC_KEY);
            URI baseUri = parseUri(effectiveConfig.getString(SLAEventKafkaJobMonitor.BASE_URI_KEY), "Invalid base URI ");
            URI template = parseUri(effectiveConfig.getString(SLAEventKafkaJobMonitor.TEMPLATE_KEY), "Invalid template URI ");

            // Entries of the extract_keys sub-config whose unwrapped value is not a String are skipped.
            ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
            if (effectiveConfig.hasPath(SLAEventKafkaJobMonitor.EXTRACT_KEYS)) {
                Config extractKeysConfig = effectiveConfig.getConfig(SLAEventKafkaJobMonitor.EXTRACT_KEYS);
                for (Map.Entry<String, ConfigValue> entry : extractKeysConfig.entrySet()) {
                    Object unwrappedValue = entry.getValue().unwrapped();
                    if (unwrappedValue instanceof String) {
                        mapBuilder.put(entry.getKey(), (String)unwrappedValue);
                    }
                }
            }
            Map<String, String> extractKeys = mapBuilder.build();

            Optional<Pattern> urnFilter = optionalPattern(effectiveConfig, SLAEventKafkaJobMonitor.DATASET_URN_FILTER_KEY);
            Optional<Pattern> nameFilter = optionalPattern(effectiveConfig, SLAEventKafkaJobMonitor.EVENT_NAME_FILTER_KEY);

            SchemaVersionWriter<?> versionWriter;
            try {
                versionWriter = (SchemaVersionWriter<?>)GobblinConstructorUtils.invokeLongestConstructor(
                        Class.forName(effectiveConfig.getString(SLAEventKafkaJobMonitor.SCHEMA_VERSION_READER_CLASS)), localScopeConfig);
            }
            catch (ReflectiveOperationException roe) {
                throw new IllegalArgumentException(roe);
            }
            return new SLAEventKafkaJobMonitor(topic, jobCatalog, baseUri, localScopeConfig, versionWriter, urnFilter, nameFilter, template, extractKeys);
        }

        /** Parses {@code value} as a URI, rethrowing a syntax error as an {@link IOException} that keeps the cause. */
        private static URI parseUri(String value, String errorPrefix) throws IOException {
            try {
                return new URI(value);
            }
            catch (URISyntaxException use) {
                throw new IOException(errorPrefix + value, use);
            }
        }

        /** Compiles the regex stored under {@code key}, or returns absent when the key is not set. */
        private static Optional<Pattern> optionalPattern(Config config, String key) {
            if (!config.hasPath(key)) {
                return Optional.<Pattern>absent();
            }
            return Optional.of(Pattern.compile(config.getString(key)));
        }
    }
}

