/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.typesafe.config.Config;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitSequenceStore;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metastore.JobHistoryStore;
import org.apache.gobblin.metastore.MetaStoreModule;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.NewTaskCompletionEvent;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.SourceDecorator;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.commit.FsCommitSequenceStore;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobContext
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(JobContext.class);
    private static final String TASK_STAGING_DIR_NAME = "task-staging";
    private static final String TASK_OUTPUT_DIR_NAME = "task-output";
    private final String jobName;
    private final String jobId;
    private final String jobSequence;
    private final JobState jobState;
    private final JobCommitPolicy jobCommitPolicy;
    private final Optional<JobMetrics> jobMetricsOptional;
    private final Source<?, ?> source;
    private final DatasetStateStore datasetStateStore;
    private final Optional<JobHistoryStore> jobHistoryStoreOptional;
    private final boolean parallelizeCommit;
    private final int parallelCommits;
    protected final Boolean stagingDirProvided;
    protected final Boolean outputDirProvided;
    private final DeliverySemantics semantics;
    private final Optional<CommitSequenceStore> commitSequenceStore;
    private final Logger logger;
    private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
    private Optional<Map<String, JobState.DatasetState>> datasetStatesByUrns = Optional.absent();

    public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
        Preconditions.checkArgument((boolean)jobProps.containsKey("job.name"), (Object)"A job must have a job name specified by job.name");
        this.jobName = JobState.getJobNameFromProps(jobProps);
        this.jobId = jobProps.containsKey("job.id") ? jobProps.getProperty("job.id") : JobLauncherUtils.newJobId((String)this.jobName);
        this.jobSequence = Long.toString(Id.Job.parse((String)this.jobId).getSequence());
        jobProps.setProperty("job.id", this.jobId);
        this.jobBroker = instanceBroker.newSubscopedBuilder((ScopeInstance)new JobScopeInstance(this.jobName, this.jobId)).withOverridingConfig(ConfigUtils.propertiesToConfig((Properties)jobProps)).build();
        this.jobCommitPolicy = JobCommitPolicy.getCommitPolicy((Properties)jobProps);
        this.datasetStateStore = this.createStateStore(ConfigUtils.propertiesToConfig((Properties)jobProps));
        this.jobHistoryStoreOptional = this.createJobHistoryStore(jobProps);
        State jobPropsState = new State();
        jobPropsState.addAll(jobProps);
        this.jobState = new JobState(jobPropsState, this.jobName, this.jobId);
        this.jobState.setBroker(this.jobBroker);
        this.jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
        this.stagingDirProvided = this.jobState.contains("writer.staging.dir");
        this.outputDirProvided = this.jobState.contains("writer.output.dir");
        this.setTaskStagingAndOutputDirs();
        if (GobblinMetrics.isEnabled((Properties)jobProps)) {
            this.jobMetricsOptional = Optional.of((Object)((Object)JobMetrics.get(this.jobState)));
            this.jobState.setProp("metrics.context.name", ((JobMetrics)((Object)this.jobMetricsOptional.get())).getName());
        } else {
            this.jobMetricsOptional = Optional.absent();
        }
        this.semantics = DeliverySemantics.parse((State)this.jobState);
        this.commitSequenceStore = this.createCommitSequenceStore();
        this.source = this.createSource(jobProps);
        this.logger = logger;
        this.parallelizeCommit = this.jobState.getPropAsBoolean("job.commit.parallelize", false);
        this.parallelCommits = this.parallelizeCommit ? this.jobState.getPropAsInt("job.commit.parallelCommits", 20) : 1;
    }

    protected DatasetStateStore createStateStore(Config jobConfig) throws IOException {
        boolean stateStoreEnabled = !jobConfig.hasPath("state.store.enabled") || jobConfig.getBoolean("state.store.enabled");
        String stateStoreType = !stateStoreEnabled ? "noop" : ConfigUtils.getString((Config)jobConfig, (String)"dataset.state.store.type", (String)ConfigUtils.getString((Config)jobConfig, (String)"state.store.type", (String)"fs"));
        ClassAliasResolver resolver = new ClassAliasResolver(DatasetStateStore.Factory.class);
        try {
            DatasetStateStore.Factory stateStoreFactory = (DatasetStateStore.Factory)resolver.resolveClass(stateStoreType).newInstance();
            return stateStoreFactory.createStateStore(jobConfig);
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    protected Optional<JobHistoryStore> createJobHistoryStore(Properties jobProps) {
        boolean jobHistoryStoreEnabled = Boolean.valueOf(jobProps.getProperty("job.history.store.enabled", Boolean.FALSE.toString()));
        if (jobHistoryStoreEnabled) {
            Injector injector = Guice.createInjector((Module[])new Module[]{new MetaStoreModule(jobProps)});
            return Optional.of((Object)injector.getInstance(JobHistoryStore.class));
        }
        return Optional.absent();
    }

    protected Optional<CommitSequenceStore> createCommitSequenceStore() throws IOException {
        if (this.semantics != DeliverySemantics.EXACTLY_ONCE) {
            return Optional.absent();
        }
        Preconditions.checkState((boolean)this.jobState.contains("gobblin.runtime.commit.sequence.store.dir"));
        try (FileSystem fs = FileSystem.get((URI)URI.create(this.jobState.getProp("gobblin.runtime.commit.sequence.store.fs.uri", "file:///")), (Configuration)HadoopUtils.getConfFromState((State)this.jobState));){
            Optional optional = Optional.of((Object)new FsCommitSequenceStore(fs, new Path(this.jobState.getProp("gobblin.runtime.commit.sequence.store.dir"))));
            return optional;
        }
    }

    protected Source<?, ?> createSource(Properties jobProps) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
        return new SourceDecorator((Source)Source.class.cast(Class.forName(jobProps.getProperty("source.class")).newInstance()), this.jobId, this.logger);
    }

    public String getJobName() {
        return this.jobName;
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getJobKey() {
        return this.jobSequence;
    }

    public JobState getJobState() {
        return this.jobState;
    }

    public Optional<JobMetrics> getJobMetricsOptional() {
        return this.jobMetricsOptional;
    }

    Source<?, ?> getSource() {
        return this.source;
    }

    protected static Path getJobDir(String dir1, String dir2) {
        return new Path(dir1, dir2);
    }

    protected void setTaskStagingAndOutputDirs() {
        if (this.jobState.contains("writer.staging.dir")) {
            String writerStagingDirWithJobId = new Path(JobContext.getJobDir(this.jobState.getProp("writer.staging.dir"), this.getJobName()), this.jobId).toString();
            this.jobState.setProp("writer.staging.dir", writerStagingDirWithJobId);
        }
        if (this.jobState.contains("writer.output.dir")) {
            String writerOutputDirWithJobId = new Path(JobContext.getJobDir(this.jobState.getProp("writer.output.dir"), this.getJobName()), this.jobId).toString();
            this.jobState.setProp("writer.output.dir", writerOutputDirWithJobId);
        }
        if (this.jobState.contains("task.data.root.dir")) {
            String taskDataRootDirWithJobId = new Path(JobContext.getJobDir(this.jobState.getProp("task.data.root.dir"), this.getJobName()), this.jobId).toString();
            this.jobState.setProp("task.data.root.dir", taskDataRootDirWithJobId);
            this.setTaskStagingDir();
            this.setTaskOutputDir();
        } else {
            LOG.warn("Property task.data.root.dir is missing.");
        }
    }

    private void setTaskStagingDir() {
        if (this.jobState.contains("writer.staging.dir")) {
            LOG.warn(String.format("Property %s is deprecated. No need to use it if %s is specified.", "writer.staging.dir", "task.data.root.dir"));
        } else {
            String workingDir = this.jobState.getProp("task.data.root.dir");
            this.jobState.setProp("writer.staging.dir", new Path(workingDir, TASK_STAGING_DIR_NAME).toString());
            LOG.info(String.format("Writer Staging Directory is set to %s.", this.jobState.getProp("writer.staging.dir")));
        }
    }

    private void setTaskOutputDir() {
        if (this.jobState.contains("writer.output.dir")) {
            LOG.warn(String.format("Property %s is deprecated. No need to use it if %s is specified.", "writer.output.dir", "task.data.root.dir"));
        } else {
            String workingDir = this.jobState.getProp("task.data.root.dir");
            this.jobState.setProp("writer.output.dir", new Path(workingDir, TASK_OUTPUT_DIR_NAME).toString());
            LOG.info(String.format("Writer Output Directory is set to %s.", this.jobState.getProp("writer.output.dir")));
        }
    }

    boolean shouldCleanupStagingDataPerTask() {
        return this.jobState.getPropAsBoolean("cleanup.staging.data.per.task", true);
    }

    Map<String, JobState.DatasetState> getDatasetStatesByUrns() {
        return ImmutableMap.copyOf((Map)((Map)this.datasetStatesByUrns.or((Object)Maps.newHashMap())));
    }

    void storeJobExecutionInfo() {
        if (this.jobHistoryStoreOptional.isPresent()) {
            try {
                this.logger.info("Writing job execution information to the job history store");
                ((JobHistoryStore)this.jobHistoryStoreOptional.get()).put(this.jobState.toJobExecutionInfo());
            }
            catch (IOException ioe) {
                this.logger.error("Failed to write job execution information to the job history store: " + ioe, (Throwable)ioe);
            }
        }
    }

    @Subscribe
    public void handleNewTaskCompletionEvent(NewTaskCompletionEvent newOutputTaskStateEvent) {
        LOG.info("{} more tasks of job {} have completed", (Object)newOutputTaskStateEvent.getTaskStates().size(), (Object)this.jobId);
        this.storeJobExecutionInfo();
    }

    void finalizeJobStateBeforeCommit() {
        this.jobState.setEndTime(System.currentTimeMillis());
        this.jobState.setDuration(this.jobState.getEndTime() - this.jobState.getStartTime());
        for (TaskState taskState : this.jobState.getTaskStates()) {
            this.jobState.setProp("fork.branches", taskState.getPropAsInt("fork.branches", 1));
        }
    }

    void commit() throws IOException {
        this.commit(false);
    }

    void commit(final boolean isJobCancelled) throws IOException {
        this.datasetStatesByUrns = Optional.of(this.computeDatasetStatesByUrns());
        final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob((State)this.jobState);
        final DeliverySemantics deliverySemantics = DeliverySemantics.parse((State)this.jobState);
        final int numCommitThreads = this.numCommitThreads();
        if (!shouldCommitDataInJob) {
            this.logger.info("Job will not commit data since data are committed by tasks.");
        }
        try {
            if (this.datasetStatesByUrns.isPresent()) {
                this.logger.info("Persisting dataset urns.");
                this.datasetStateStore.persistDatasetURNs(this.jobName, ((Map)this.datasetStatesByUrns.get()).keySet());
            }
            List result = new IteratorExecutor(Iterables.transform(((Map)this.datasetStatesByUrns.get()).entrySet(), (Function)new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>(){

                @Nullable
                public Callable<Void> apply(Map.Entry<String, JobState.DatasetState> entry) {
                    return JobContext.this.createSafeDatasetCommit(shouldCommitDataInJob, isJobCancelled, deliverySemantics, entry.getKey(), entry.getValue(), numCommitThreads > 1, JobContext.this);
                }
            }).iterator(), numCommitThreads, ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)this.logger), (Optional)Optional.of((Object)"Commit-thread-%d"))).executeAndGetResults();
            IteratorExecutor.logFailures((List)result, (Logger)LOG, (int)10);
            if (!IteratorExecutor.verifyAllSuccessful((List)result)) {
                this.jobState.setState(JobState.RunningState.FAILED);
                throw new IOException("Failed to commit dataset state for some dataset(s) of job " + this.jobId);
            }
        }
        catch (InterruptedException exc) {
            throw new IOException(exc);
        }
        this.jobState.setState(JobState.RunningState.COMMITTED);
    }

    @Override
    public void close() throws IOException {
        this.jobBroker.close();
    }

    private int numCommitThreads() {
        return this.parallelCommits;
    }

    @VisibleForTesting
    protected Callable<Void> createSafeDatasetCommit(boolean shouldCommitDataInJob, boolean isJobCancelled, DeliverySemantics deliverySemantics, String datasetUrn, JobState.DatasetState datasetState, boolean isMultithreaded, JobContext jobContext) {
        return new SafeDatasetCommit(shouldCommitDataInJob, isJobCancelled, deliverySemantics, datasetUrn, datasetState, isMultithreaded, jobContext);
    }

    protected Map<String, JobState.DatasetState> computeDatasetStatesByUrns() {
        return this.jobState.createDatasetStatesByUrns();
    }

    public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(State state) throws ReflectiveOperationException {
        if (!Strings.isNullOrEmpty((String)state.getProp("data.publisher.job.type"))) {
            return Optional.of(Class.forName(state.getProp("data.publisher.job.type")));
        }
        if (!Strings.isNullOrEmpty((String)state.getProp("data.publisher.type"))) {
            return Optional.of(Class.forName(state.getProp("data.publisher.type")));
        }
        LOG.info("Property data.publisher.job.type not specified");
        return Optional.absent();
    }

    private static boolean shouldCommitDataInJob(State state) {
        boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy((Properties)state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
        boolean publishDataAtJobLevel = state.getPropAsBoolean("publish.data.at.job.level", true);
        boolean jobDataPublisherSpecified = !Strings.isNullOrEmpty((String)state.getProp("data.publisher.job.type"));
        return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified;
    }

    public String toString() {
        return Objects.toStringHelper((String)JobContext.class.getSimpleName()).add("jobName", (Object)this.getJobName()).add("jobId", (Object)this.getJobId()).add("jobState", (Object)this.getJobState()).toString();
    }

    JobCommitPolicy getJobCommitPolicy() {
        return this.jobCommitPolicy;
    }

    DatasetStateStore getDatasetStateStore() {
        return this.datasetStateStore;
    }

    public Boolean getStagingDirProvided() {
        return this.stagingDirProvided;
    }

    public Boolean getOutputDirProvided() {
        return this.outputDirProvided;
    }

    public DeliverySemantics getSemantics() {
        return this.semantics;
    }

    public Optional<CommitSequenceStore> getCommitSequenceStore() {
        return this.commitSequenceStore;
    }

    public SharedResourcesBroker<GobblinScopeTypes> getJobBroker() {
        return this.jobBroker;
    }
}

