/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.conversion.hive.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gson.Gson;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager;
import org.apache.gobblin.data.management.conversion.hive.avro.SchemaNotFoundException;
import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory;
import org.apache.gobblin.data.management.conversion.hive.extractor.HiveConvertExtractorFactory;
import org.apache.gobblin.data.management.conversion.hive.provider.HiveUnitUpdateProvider;
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateNotFoundException;
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory;
import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.copy.hive.HiveUtils;
import org.apache.gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.io.GsonInterfaceAdapter;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.log4j.Level;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Source} that discovers Hive datasets through a configurable {@link IterableDatasetFinder} and creates a
 * {@link HiveWorkUnit} for each non-partitioned table, or each partition of a partitioned table, whose update time
 * is after its previous high watermark and not before the configured lookback cutoff.
 *
 * <p>Watermark bookkeeping is delegated to a {@link HiveSourceWatermarker} created by the factory named in
 * {@value #HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY}. Extraction is delegated to a {@link HiveBaseExtractorFactory}
 * resolved from {@value #HIVE_SOURCE_EXTRACTOR_TYPE}.
 */
@Alpha
public class HiveSource
implements Source {
    private static final Logger log = LoggerFactory.getLogger(HiveSource.class);
    public static final String HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY = "hive.source.maximum.lookbackDays";
    public static final int DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS = 3;
    public static final String HIVE_SOURCE_DATASET_FINDER_CLASS_KEY = "hive.dataset.finder.class";
    public static final String DEFAULT_HIVE_SOURCE_DATASET_FINDER_CLASS = HiveDatasetFinder.class.getName();
    public static final String DISTCP_REGISTRATION_GENERATION_TIME_KEY = "registrationGenerationTimeMillis";
    public static final String HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY = "hive.source.watermarker.factoryClass";
    public static final String DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS = PartitionLevelWatermarker.Factory.class.getName();
    public static final String HIVE_SOURCE_EXTRACTOR_TYPE = "hive.source.extractorType";
    public static final String DEFAULT_HIVE_SOURCE_EXTRACTOR_TYPE = HiveConvertExtractorFactory.class.getName();
    public static final String HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS = "hive.source.createWorkunitsForPartitions";
    public static final boolean DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS = true;
    public static final String HIVE_SOURCE_FS_URI = "hive.source.fs.uri";
    public static final String HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY = "hive.source.ignoreDataPathIdentifier";
    public static final String DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER = "";
    public static final Gson GENERICS_AWARE_GSON = GsonInterfaceAdapter.getGson(Object.class);
    public static final Splitter COMMA_BASED_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();

    /** Metric context built from the source state in {@link #initialize(SourceState)}. */
    protected MetricContext metricContext;
    /** Submitter for the "gobblin.hive.conversion" event namespace. */
    protected EventSubmitter eventSubmitter;
    /** Resolves Avro schema URLs for Avro-backed tables and partitions. */
    protected AvroSchemaManager avroSchemaManager;
    /** Supplies update times for tables and partitions. */
    protected HiveUnitUpdateProvider updateProvider;
    /** Tracks previous/expected high watermarks and may append work units at the end of discovery. */
    protected HiveSourceWatermarker watermarker;
    /** Finder that yields the Hive datasets to consider. */
    protected IterableDatasetFinder<HiveDataset> datasetFinder;
    /** Work units accumulated during {@link #getWorkunits(SourceState)}. */
    protected List<WorkUnit> workunits;
    /** Epoch millis cutoff: anything before this instant is considered outside the lookback window. */
    protected long maxLookBackTime;
    /** Epoch millis at which {@link #getWorkunits(SourceState)} started; passed into SLA event metadata. */
    protected long beginGetWorkunitsTime;
    /** Case-insensitive substrings; data locations containing any of them are skipped. */
    protected List<String> ignoreDataPathIdentifierList;
    protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver =
            new ClassAliasResolver<>(HiveBaseExtractorFactory.class);

    /**
     * Creates the work units for this run.
     *
     * <p>Each dataset yielded by the dataset finder is processed with a metastore client borrowed from that
     * dataset's client pool. Partitioned tables produce one work unit per qualifying partition, unless
     * {@value #HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS} is set to {@code false}. All other tables produce at
     * most one work unit for the whole table. Afterwards {@link HiveSourceWatermarker#onGetWorkunitsEnd(List)} may
     * add further work units; these are reported as watermark work units.
     *
     * @param state the source state for this run
     * @return the work units created, including any added by the watermarker
     * @throws RuntimeException wrapping any {@link IOException} raised while finding or processing datasets
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState state) {
        try {
            this.beginGetWorkunitsTime = System.currentTimeMillis();
            this.initialize(state);
            EventSubmitter.submit(Optional.of(this.eventSubmitter), "gobblin.hive.conversion.FindHiveTables");
            Iterator<HiveDataset> iterator = this.datasetFinder.getDatasetsIterator();
            while (iterator.hasNext()) {
                HiveDataset hiveDataset = iterator.next();
                // try-with-resources always returns the client to its pool and still propagates failures.
                // A 'continue' inside a finally block would silently discard them (JLS 14.20.2).
                try (AutoReturnableObject<IMetaStoreClient> client = hiveDataset.getClientPool().getClient()) {
                    log.debug("Processing dataset: {}", hiveDataset);
                    boolean createPartitionWorkunits = state.getPropAsBoolean(
                            HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS, DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS);
                    if (HiveUtils.isPartitioned(hiveDataset.getTable()) && createPartitionWorkunits) {
                        this.createWorkunitsForPartitionedTable(hiveDataset, client);
                    } else {
                        this.createWorkunitForNonPartitionedTable(hiveDataset);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        int realWorkunits = this.workunits.size();
        this.watermarker.onGetWorkunitsEnd(this.workunits);
        log.info("Created {} real workunits and {} watermark workunits",
                realWorkunits, this.workunits.size() - realWorkunits);
        return this.workunits;
    }

    /**
     * Builds all per-run collaborators from {@code state}. These are the update provider, metrics and events, the
     * Avro schema manager, the watermarker, the dataset finder, the lookback cutoff and the ignore-path tokens. It
     * also submits the "gobblin.hive.conversion.Setup" event and lowers selected Hive log4j loggers to WARN.
     *
     * @param state the source state to read configuration from
     * @throws IOException if the source file system cannot be obtained
     */
    @VisibleForTesting
    public void initialize(SourceState state) throws IOException {
        this.updateProvider = UpdateProviderFactory.create(state);
        this.metricContext = Instrumented.getMetricContext(state, HiveSource.class);
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.hive.conversion").build();
        this.avroSchemaManager = new AvroSchemaManager(getSourceFs(state), state);
        this.workunits = Lists.newArrayList();

        String watermarkerFactoryClass =
                state.getProp(HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY, DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS);
        this.watermarker = GobblinConstructorUtils
                .invokeConstructor(HiveSourceWatermarkerFactory.class, watermarkerFactoryClass)
                .createFromState(state);
        EventSubmitter.submit(Optional.of(this.eventSubmitter), "gobblin.hive.conversion.Setup");

        String datasetFinderClass =
                state.getProp(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY, DEFAULT_HIVE_SOURCE_DATASET_FINDER_CLASS);
        this.datasetFinder = GobblinConstructorUtils.invokeConstructor(HiveDatasetFinder.class, datasetFinderClass,
                getSourceFs(state), state.getProperties(), this.eventSubmitter);

        int maxLookBackDays =
                state.getPropAsInt(HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY, DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS);
        this.maxLookBackTime = new DateTime().minusDays(maxLookBackDays).getMillis();
        this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(
                state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY, DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));
        this.silenceHiveLoggers();
    }

    /**
     * Adds at most one work unit for a non-partitioned table. The table is skipped when its location contains an
     * ignored token or when {@link #shouldCreateWorkunit(Table, LongWatermark)} rejects it. Missing update times
     * and missing schemas are logged and skip the table; they do not fail the run.
     *
     * @param hiveDataset the dataset wrapping the table
     * @throws IOException propagated from work unit construction or the watermarker
     */
    protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset) throws IOException {
        Table table = hiveDataset.getTable();
        try {
            long tableProcessTime = new DateTime().getMillis();
            long updateTime = this.updateProvider.getUpdateTime(table);
            this.watermarker.onTableProcessBegin(table, tableProcessTime);
            LongWatermark lowWatermark = this.watermarker.getPreviousHighWatermark(table);
            if (!this.shouldCreateWorkUnit(table.getPath())) {
                log.info("Not creating workunit for table {} as partition path {} contains data path tokens to ignore {}",
                        table.getCompleteName(), table.getPath(), this.ignoreDataPathIdentifierList);
                return;
            }
            int createTime = table.getTTable().getCreateTime();
            if (!this.shouldCreateWorkunit(table, lowWatermark)) {
                log.info("Not creating workunit for table {} as updateTime {} and createTime {} is not greater than low watermark {}",
                        table.getCompleteName(), updateTime, createTime, lowWatermark.getValue());
                return;
            }
            log.info("Creating workunit for table {} as updateTime {} or createTime {} is greater than low watermark {}",
                    table.getCompleteName(), updateTime, createTime, lowWatermark.getValue());
            HiveWorkUnit hiveWorkUnit = this.workUnitForTable(hiveDataset);
            LongWatermark expectedDatasetHighWatermark =
                    this.watermarker.getExpectedHighWatermark(table, tableProcessTime);
            hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedDatasetHighWatermark));
            EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, table, updateTime, lowWatermark.getValue(),
                    this.beginGetWorkunitsTime);
            this.workunits.add(hiveWorkUnit);
            log.debug("Workunit added for table: {}", hiveWorkUnit);
        } catch (UpdateNotFoundException e) {
            log.error("Not Creating workunit for {} as update time was not found. {}",
                    table.getCompleteName(), e.getMessage(), e);
        } catch (SchemaNotFoundException e) {
            log.error("Not Creating workunit for {} as schema was not found. {}",
                    table.getCompleteName(), e.getMessage(), e);
        }
    }

    /**
     * Builds a work unit for a whole table, attaching the table's Avro schema URL when the table uses
     * {@link AvroSerDe}.
     *
     * @param hiveDataset the dataset wrapping the table
     * @return the new work unit (no watermark interval set yet)
     * @throws IOException if the schema URL cannot be resolved
     */
    protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException {
        HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
        if (this.isAvro(hiveDataset.getTable())) {
            hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
        }
        return hiveWorkUnit;
    }

    /**
     * Adds one work unit per qualifying partition of a partitioned table.
     *
     * <p>When the dataset properties define all three lookback filter keys, partitions are fetched with a
     * {@link LookbackPartitionFilterGenerator} filter. Otherwise all partitions are fetched. A partition is skipped
     * in any of these cases:
     * <ul>
     *   <li>its create time is before the lookback cutoff;</li>
     *   <li>its location contains an ignored token;</li>
     *   <li>{@link #shouldCreateWorkunit(Partition, LongWatermark)} rejects it.</li>
     * </ul>
     * Missing update times, missing schemas and {@link UncheckedExecutionException}s are logged with their stack
     * trace and skip only the affected partition.
     *
     * @param hiveDataset the dataset wrapping the partitioned table
     * @param client a borrowed metastore client used to list partitions
     * @throws IOException propagated from partition listing, work unit construction or the watermarker
     */
    protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
        Table table = hiveDataset.getTable();
        long tableProcessTime = new DateTime().getMillis();
        this.watermarker.onTableProcessBegin(table, tableProcessTime);

        Optional<String> partitionFilter = Optional.absent();
        if (hiveDataset.getProperties().containsKey("hive.dataset.partition.filter.datetime.column")
                && hiveDataset.getProperties().containsKey("hive.dataset.partition.filter.datetime.format")
                && hiveDataset.getProperties().containsKey("hive.dataset.partition.filter.datetime.lookback")) {
            partitionFilter = Optional.of(
                    new LookbackPartitionFilterGenerator(hiveDataset.getProperties()).getFilter(hiveDataset));
            log.info("Getting partitions for {} using partition filter {}", table.getCompleteName(), partitionFilter.get());
        }

        List<Partition> sourcePartitions = HiveUtils.getPartitions(client.get(), table, partitionFilter);
        for (Partition sourcePartition : sourcePartitions) {
            if (this.isOlderThanLookback(sourcePartition)) {
                continue;
            }
            LongWatermark lowWatermark = this.watermarker.getPreviousHighWatermark(sourcePartition);
            try {
                if (!this.shouldCreateWorkUnit(new Path(sourcePartition.getLocation()))) {
                    log.info("Not creating workunit for partition {} as partition path {} contains data path tokens to ignore {}",
                            sourcePartition.getCompleteName(), sourcePartition.getLocation(), this.ignoreDataPathIdentifierList);
                    continue;
                }
                long updateTime = this.updateProvider.getUpdateTime(sourcePartition);
                if (!this.shouldCreateWorkunit(sourcePartition, lowWatermark)) {
                    log.info("Not creating workunit for partition {} as updateTime {} is lesser than low watermark {}",
                            sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue());
                    continue;
                }
                log.debug("Processing partition: {}", sourcePartition);
                long partitionProcessTime = new DateTime().getMillis();
                this.watermarker.onPartitionProcessBegin(sourcePartition, partitionProcessTime, updateTime);
                LongWatermark expectedPartitionHighWatermark =
                        this.watermarker.getExpectedHighWatermark(sourcePartition, tableProcessTime, partitionProcessTime);
                HiveWorkUnit hiveWorkUnit = this.workUnitForPartition(hiveDataset, sourcePartition);
                hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedPartitionHighWatermark));
                EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, table, sourcePartition, updateTime,
                        lowWatermark.getValue(), this.beginGetWorkunitsTime);
                this.workunits.add(hiveWorkUnit);
                log.info("Creating workunit for partition {} as updateTime {} is greater than low watermark {}",
                        sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue());
            } catch (UpdateNotFoundException e) {
                log.error("Not creating workunit for {} as update time was not found. {}",
                        sourcePartition.getCompleteName(), e.getMessage(), e);
            } catch (SchemaNotFoundException e) {
                log.error("Not creating workunit for {} as schema was not found. {}",
                        sourcePartition.getCompleteName(), e.getMessage(), e);
            } catch (UncheckedExecutionException e) {
                log.error("Not creating workunit for {} because an unchecked exception occurred. {}",
                        sourcePartition.getCompleteName(), e.getMessage(), e);
            }
        }
    }

    /**
     * Builds a work unit for a single partition. For {@link AvroSerDe} tables it attaches both the table and the
     * partition Avro schema URLs.
     *
     * @param hiveDataset the dataset wrapping the partition's table
     * @param partition the partition to convert
     * @return the new work unit (no watermark interval set yet)
     * @throws IOException if a schema URL cannot be resolved
     */
    protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition) throws IOException {
        HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
        if (this.isAvro(hiveDataset.getTable())) {
            hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
            hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
        }
        return hiveWorkUnit;
    }

    /**
     * Decides whether a data location is eligible for a work unit, based only on the configured ignore tokens.
     *
     * @param dataLocation the table or partition data location
     * @return {@code false} if the location contains any ignore token (case-insensitive), {@code true} otherwise
     *         or when no tokens are configured
     */
    protected boolean shouldCreateWorkUnit(Path dataLocation) {
        if (this.ignoreDataPathIdentifierList == null || this.ignoreDataPathIdentifierList.isEmpty()) {
            return true;
        }
        // Locale.ROOT keeps case folding stable regardless of the JVM default locale (e.g. Turkish dotless i).
        String location = dataLocation.toString().toLowerCase(Locale.ROOT);
        for (String pathToken : this.ignoreDataPathIdentifierList) {
            if (location.contains(pathToken.toLowerCase(Locale.ROOT))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Decides whether a partition needs a work unit, using its update time from the update provider and its create
     * time from {@link #getCreateTime(Partition)}.
     *
     * @throws UpdateNotFoundException if the update provider has no update time for the partition
     */
    protected boolean shouldCreateWorkunit(Partition sourcePartition, LongWatermark lowWatermark) throws UpdateNotFoundException {
        long updateTime = this.updateProvider.getUpdateTime(sourcePartition);
        long createTime = getCreateTime(sourcePartition);
        return this.shouldCreateWorkunit(createTime, updateTime, lowWatermark);
    }

    /**
     * Decides whether a table needs a work unit, using its update time from the update provider and its create time
     * from {@link #getCreateTime(Table)}.
     *
     * @throws UpdateNotFoundException if the update provider has no update time for the table
     */
    protected boolean shouldCreateWorkunit(Table table, LongWatermark lowWatermark) throws UpdateNotFoundException {
        long updateTime = this.updateProvider.getUpdateTime(table);
        long createTime = getCreateTime(table);
        return this.shouldCreateWorkunit(createTime, updateTime, lowWatermark);
    }

    /**
     * Core eligibility rule: the update time must not be before the lookback cutoff and must be strictly after the
     * low watermark.
     *
     * <p>NOTE(review): {@code createTime} is currently not consulted. The table-level log messages mention
     * "updateTime ... or createTime", so confirm whether createTime was meant to participate.
     *
     * @param createTime creation time in epoch millis (unused here)
     * @param updateTime last update time in epoch millis
     * @param lowWatermark the previous high watermark
     * @return whether a work unit should be created
     */
    protected boolean shouldCreateWorkunit(long createTime, long updateTime, LongWatermark lowWatermark) {
        DateTime updated = new DateTime(updateTime);
        return !updated.isBefore(this.maxLookBackTime) && updated.isAfter(lowWatermark.getValue());
    }

    /**
     * @return {@code true} if the partition's create time (see {@link #getCreateTime(Partition)}) is before the
     *         lookback cutoff computed in {@link #initialize(SourceState)}
     */
    @VisibleForTesting
    public boolean isOlderThanLookback(Partition partition) {
        return new DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
    }

    /**
     * Returns a partition's creation time in epoch millis. The method tries three sources in order:
     * <ol>
     *   <li>the metastore create time (stored in seconds), if positive;</li>
     *   <li>the {@value #DISTCP_REGISTRATION_GENERATION_TIME_KEY} partition parameter, parsed as millis;</li>
     *   <li>otherwise {@code 0}, with a warning.</li>
     * </ol>
     *
     * @throws NumberFormatException if the distcp registration parameter is present but not a valid long
     */
    @VisibleForTesting
    public static long getCreateTime(Partition partition) {
        if (partition.getTPartition().getCreateTime() > 0) {
            return TimeUnit.MILLISECONDS.convert(partition.getTPartition().getCreateTime(), TimeUnit.SECONDS);
        }
        if (partition.getTPartition().isSetParameters()
                && partition.getTPartition().getParameters().containsKey(DISTCP_REGISTRATION_GENERATION_TIME_KEY)) {
            log.debug("Did not find createTime in Hive partition, used distcp registration generation time.");
            return Long.parseLong(partition.getTPartition().getParameters().get(DISTCP_REGISTRATION_GENERATION_TIME_KEY));
        }
        log.warn("Could not find create time for partition {}. Will return createTime as 0", partition.getCompleteName());
        return 0L;
    }

    /** Returns the table's metastore create time converted from seconds to epoch millis. */
    protected static long getCreateTime(Table table) {
        return TimeUnit.MILLISECONDS.convert(table.getTTable().getCreateTime(), TimeUnit.SECONDS);
    }

    /**
     * Instantiates the extractor factory named by {@value #HIVE_SOURCE_EXTRACTOR_TYPE}. The name may be a class
     * alias or a fully qualified class name. The factory then builds an extractor for {@code state}.
     *
     * @throws IOException wrapping any failure to resolve, instantiate or run the factory
     */
    @Override
    public Extractor getExtractor(WorkUnitState state) throws IOException {
        try {
            String extractorType = state.getProp(HIVE_SOURCE_EXTRACTOR_TYPE, DEFAULT_HIVE_SOURCE_EXTRACTOR_TYPE);
            HiveBaseExtractorFactory factory =
                    this.classAliasResolver.resolveClass(extractorType).getDeclaredConstructor().newInstance();
            return factory.createExtractor(state, getSourceFs(state));
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /** No resources to release. */
    @Override
    public void shutdown(SourceState state) {
    }

    /**
     * Returns the source file system. The method uses {@value #HIVE_SOURCE_FS_URI} when set, otherwise the default
     * file system of the Hadoop configuration derived from {@code state}.
     *
     * @throws IOException if the file system cannot be created
     */
    public static FileSystem getSourceFs(State state) throws IOException {
        Configuration conf = HadoopUtils.getConfFromState(state);
        if (state.contains(HIVE_SOURCE_FS_URI)) {
            return FileSystem.get(URI.create(state.getProp(HIVE_SOURCE_FS_URI)), conf);
        }
        return FileSystem.get(conf);
    }

    /** Raises the log4j level of several Hive logger hierarchies to WARN to reduce log noise. */
    private void silenceHiveLoggers() {
        List<String> loggers = ImmutableList.of("org.apache.hadoop.hive", "org.apache.hive", "hive.ql.parse");
        for (String name : loggers) {
            org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(name);
            if (logger != null) {
                logger.setLevel(Level.WARN);
            }
        }
    }

    /** Returns whether the table's serialization library is {@link AvroSerDe}. */
    private boolean isAvro(Table table) {
        return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib());
    }
}

