/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job_monitor;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import kafka.message.MessageAndMetadata;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobSpecMonitor} that consumes messages from a Kafka topic and applies them to a
 * {@link MutableJobCatalog}.
 *
 * <p>Each message payload is handed to {@link #parseJobSpec(byte[])}, which subclasses implement.
 * Every {@link Either.Left} element of the result is added to the catalog as a {@link JobSpec};
 * every {@link Either.Right} element is treated as the {@link URI} of a spec to remove.
 */
public abstract class KafkaJobMonitor
extends HighLevelConsumer<byte[], byte[]>
implements JobSpecMonitor {
    private static final Logger log = LoggerFactory.getLogger(KafkaJobMonitor.class);

    /** Configuration prefix under which the settings handed to the superclass are looked up. */
    public static final String KAFKA_JOB_MONITOR_PREFIX = "jobSpecMonitor.kafka";
    public static final String KAFKA_AUTO_OFFSET_RESET_KEY = "jobSpecMonitor.kafka.auto.offset.reset";
    public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
    public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";

    /**
     * State store built from the monitor configuration. Remains {@code null} when construction of
     * the store failed (the failure is only logged), so users of this field must null-check it.
     */
    protected DatasetStateStore datasetStateStore;

    /** Catalog that receives the additions and removals parsed from Kafka messages. */
    protected final MutableJobCatalog jobCatalog;

    /** Counts the specs added through this monitor; assigned in {@link #createMetrics()}. */
    protected Counter newSpecs;

    /** Counts the specs removed through this monitor; assigned in {@link #createMetrics()}. */
    protected Counter removedSpecs;

    /**
     * Converts a raw Kafka message payload into catalog operations.
     *
     * @param var1 the message payload as read from Kafka
     * @return the parsed operations: a left value is a {@link JobSpec} to add, a right value is the
     *     {@link URI} of a spec to remove
     * @throws IOException if the payload cannot be parsed
     */
    public abstract Collection<Either<JobSpec, URI>> parseJobSpec(byte[] var1) throws IOException;

    /**
     * Creates a monitor for the given topic.
     *
     * @param topic the Kafka topic to consume
     * @param catalog the catalog that parsed specs are applied to
     * @param config the full configuration; the sub-config under {@link #KAFKA_JOB_MONITOR_PREFIX}
     *     (or an empty config) is passed to the superclass, while the full config is used to build
     *     the {@link DatasetStateStore}
     */
    public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config config) {
        // NOTE(review): the literal 1 is presumably the number of consumer threads -- confirm
        // against the HighLevelConsumer constructor.
        super(topic, ConfigUtils.getConfigOrEmpty(config, KAFKA_JOB_MONITOR_PREFIX), 1);
        this.jobCatalog = catalog;
        try {
            this.datasetStateStore = DatasetStateStore.buildDatasetStateStore(config);
        }
        catch (Exception e) {
            // Deliberately best-effort: the monitor is still constructed without a state store,
            // leaving datasetStateStore null.
            log.warn("DatasetStateStore could not be created.", e);
        }
    }

    /** Creates the superclass metrics and then the two counters owned by this monitor. */
    @Override
    protected void createMetrics() {
        super.createMetrics();
        this.newSpecs = this.getMetricContext().counter("gobblin.jobMonitor.kafka.newSpecs");
        this.removedSpecs = this.getMetricContext().counter("gobblin.jobMonitor.kafka.removedSpecs");
    }

    /** Delegates to the superclass; redeclared here and marked as visible for testing. */
    @Override
    @VisibleForTesting
    protected void buildMetricsContextAndMetrics() {
        super.buildMetricsContextAndMetrics();
    }

    /**
     * Delegates to the superclass; redeclared here and marked as visible for testing.
     *
     * @throws IOException if the superclass fails to shut the metrics down
     */
    @Override
    @VisibleForTesting
    protected void shutdownMetrics() throws IOException {
        super.shutdownMetrics();
    }

    /**
     * Parses one Kafka message and applies every resulting operation to the job catalog.
     *
     * <p>A parse failure is logged together with the message offset and payload and is otherwise
     * ignored; it does not propagate to the caller.
     *
     * @param message the consumed Kafka message
     */
    @Override
    protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
        try {
            Collection<Either<JobSpec, URI>> parsedCollection = this.parseJobSpec(message.message());
            for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
                if (parsedMessage instanceof Either.Left) {
                    this.newSpecs.inc();
                    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
                } else if (parsedMessage instanceof Either.Right) {
                    this.removedSpecs.inc();
                    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
                }
            }
        }
        catch (IOException ioe) {
            // Render the payload null-safely: decoding a null payload would throw a
            // NullPointerException here and hide the parse failure being reported.
            byte[] payload = message.message();
            String messageStr = payload == null ? "null" : new String(payload, Charsets.UTF_8);
            log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
        }
    }

    /**
     * @return the counter of added specs, or {@code null} if {@link #createMetrics()} has not run
     */
    public Counter getNewSpecs() {
        return this.newSpecs;
    }

    /**
     * @return the counter of removed specs, or {@code null} if {@link #createMetrics()} has not run
     */
    public Counter getRemovedSpecs() {
        return this.removedSpecs;
    }
}

