package org.springframework.xd.dirt.batch.tasklet;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.BusUtils;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.plugins.job.ExpandedJobParametersConverter;
import org.springframework.xd.dirt.stream.JobDefinition;
import org.springframework.xd.dirt.stream.JobDefinitionRepository;
import org.springframework.xd.dirt.stream.NoSuchDefinitionException;
import org.springframework.xd.dirt.stream.NotDeployedException;
import org.springframework.xd.store.DomainRepository;

/* loaded from: input_file:org/springframework/xd/dirt/batch/tasklet/JobLaunchingTasklet.class */
public class JobLaunchingTasklet implements Tasklet {
    private final Logger logger;
    public static final String XD_ORCHESTRATION_ID = "xd_orchestration_id";
    public static final String XD_PARENT_JOB_EXECUTION_ID = "xd_parent_execution_id";
    private long timeout;
    private String jobName;
    private MessageBus messageBus;
    private JobDefinitionRepository definitionRepository;
    private DomainRepository<JobDefinition, String> instanceRepository;
    private String orchestrationId;
    private MessageChannel launchingChannel;
    private PollableChannel listeningChannel;

    public JobLaunchingTasklet(MessageBus messageBus, JobDefinitionRepository jobDefinitionRepository, DomainRepository<JobDefinition, String> domainRepository, String str, Long l) {
        this(messageBus, jobDefinitionRepository, domainRepository, str, l, createLaunchingChannel(str), createListeningChannel(str));
    }

    protected JobLaunchingTasklet(MessageBus messageBus, JobDefinitionRepository jobDefinitionRepository, DomainRepository<JobDefinition, String> domainRepository, String str, Long l, MessageChannel messageChannel, PollableChannel pollableChannel) {
        this.logger = LoggerFactory.getLogger(JobLaunchingTasklet.class);
        Assert.notNull(messageBus, "A message bus is required");
        Assert.notNull(jobDefinitionRepository, "A JobDefinitionRepository is required");
        Assert.notNull(domainRepository, "A DomainRepository is required");
        Assert.notNull(str, "A job name is required");
        this.jobName = str;
        this.messageBus = messageBus;
        this.definitionRepository = jobDefinitionRepository;
        this.instanceRepository = domainRepository;
        this.launchingChannel = messageChannel;
        this.listeningChannel = pollableChannel;
        this.timeout = l == null ? -1L : l.longValue();
    }

    private static DirectChannel createLaunchingChannel(String str) {
        DirectChannel directChannel = new DirectChannel();
        directChannel.setBeanName(str + ":launcher");
        return directChannel;
    }

    private static QueueChannel createListeningChannel(String str) {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setBeanName(str + ":resultListener");
        return queueChannel;
    }

    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        setOrchestrationId(chunkContext);
        String jobName = chunkContext.getStepContext().getStepExecution().getJobExecution().getJobInstance().getJobName();
        bindChannels(jobName);
        try {
            validateJobDeployment();
            String jobParameters = getJobParameters(chunkContext);
            this.logger.debug("Launching request for {} orchestration {}", this.jobName, this.orchestrationId);
            this.launchingChannel.send(MessageBuilder.withPayload(jobParameters).build());
            Date date = new Date();
            long j = this.timeout;
            JobExecution jobExecution = null;
            while (jobExecution == null && (this.timeout <= 0 || j > 0)) {
                Message<?> receive = this.timeout > 0 ? this.listeningChannel.receive(j) : this.listeningChannel.receive();
                if (receive != null) {
                    jobExecution = getResult(receive);
                }
                j = (date.getTime() - System.currentTimeMillis()) + this.timeout;
            }
            if (jobExecution == null) {
                throw new UnexpectedJobExecutionException("The job timed out while waiting for a result");
            }
            processResult(stepContribution, chunkContext, jobExecution);
            this.logger.debug("Completed processing for {} orchestration {}", this.jobName, this.orchestrationId);
            RepeatStatus repeatStatus = RepeatStatus.FINISHED;
            unbindChannels(jobName);
            return repeatStatus;
        } catch (Throwable th) {
            unbindChannels(jobName);
            throw th;
        }
    }

    private String getJobParameters(ChunkContext chunkContext) throws JsonProcessingException {
        StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
        Properties properties = stepExecution.getJobParameters().toProperties();
        properties.remove(ExpandedJobParametersConverter.UNIQUE_JOB_PARAMETER_KEY);
        String str = null;
        Map stepExecutionContext = chunkContext.getStepContext().getStepExecutionContext();
        if (stepExecutionContext.containsKey(ExpandedJobParametersConverter.UNIQUE_JOB_PARAMETER_KEY)) {
            str = (String) stepExecutionContext.get(ExpandedJobParametersConverter.UNIQUE_JOB_PARAMETER_KEY);
        }
        properties.put(XD_ORCHESTRATION_ID, this.orchestrationId);
        properties.put("-xd_parent_execution_id", stepExecution.getJobExecutionId());
        if (str != null) {
            properties.put(ExpandedJobParametersConverter.IS_RESTART_JOB_PARAMETER_KEY, true);
            properties.put(ExpandedJobParametersConverter.UNIQUE_JOB_PARAMETER_KEY, str);
        }
        return new ObjectMapper().writeValueAsString(properties);
    }

    private void processResult(StepContribution stepContribution, ChunkContext chunkContext, JobExecution jobExecution) {
        stepContribution.setExitStatus(jobExecution.getExitStatus());
        if (jobExecution.getStatus().isUnsuccessful()) {
            chunkContext.getStepContext().getStepExecution().getExecutionContext().put(ExpandedJobParametersConverter.UNIQUE_JOB_PARAMETER_KEY, jobExecution.getJobParameters().getString(ExpandedJobParametersConverter.UNIQUE_JOB_PARAMETER_KEY));
            throw new UnexpectedJobExecutionException(String.format("Step failure: %s failed.", this.jobName));
        }
    }

    private void bindChannels(String str) {
        this.messageBus.bindPubSubConsumer(getEventListenerChannelName(this.jobName, str), this.listeningChannel, (Properties) null);
        this.messageBus.bindProducer("job:" + this.jobName, this.launchingChannel, (Properties) null);
    }

    private void unbindChannels(String str) {
        this.messageBus.unbindConsumer(getEventListenerChannelName(this.jobName, str), this.listeningChannel);
        this.messageBus.unbindProducer("job:" + this.jobName, this.launchingChannel);
    }

    private void validateJobDeployment() {
        if (((JobDefinition) this.definitionRepository.findOne(this.jobName)) == null) {
            throw new NoSuchDefinitionException(this.jobName, String.format("There is no %s definition named '%%s'", "job"));
        }
        if (this.instanceRepository.findOne(this.jobName) == null) {
            throw new NotDeployedException(this.jobName, String.format("The %s named '%%s' is not currently deployed", "job"));
        }
    }

    private void setOrchestrationId(ChunkContext chunkContext) {
        this.orchestrationId = String.valueOf(chunkContext.getStepContext().getStepExecution().getJobExecution().getJobInstance().getInstanceId());
        ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
        if (executionContext.containsKey(XD_ORCHESTRATION_ID)) {
            this.orchestrationId = (String) executionContext.get(XD_ORCHESTRATION_ID);
        } else {
            executionContext.put(XD_ORCHESTRATION_ID, this.orchestrationId);
        }
    }

    private String getEventListenerChannelName(String str, String str2) {
        String format = String.format("tap:job:%s.job", str);
        if (this.messageBus.isCapable(MessageBus.Capability.DURABLE_PUBSUB)) {
            format = BusUtils.addGroupToPubSub(str2, format);
        }
        return format;
    }

    public JobExecution getResult(Message<?> message) throws MessagingException {
        JobExecution jobExecution = (JobExecution) message.getPayload();
        String string = jobExecution.getJobParameters().getString(XD_ORCHESTRATION_ID);
        this.logger.debug("Received result for {} orchestration {}", this.jobName, string);
        if (StringUtils.hasText(string) && string.equalsIgnoreCase(this.orchestrationId) && !jobExecution.isRunning()) {
            return jobExecution;
        }
        return null;
    }
}
