/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.batch.integration.chunk;

import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.integration.chunk.AsynchronousFailureException;
import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.batch.integration.chunk.ChunkResponse;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.integration.gateway.MessagingGateway;
import org.springframework.util.Assert;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * {@link ItemWriter} that hands each chunk of items to a {@link MessagingGateway} as a
 * {@link ChunkRequest} and keeps count of the {@link ChunkResponse}s that come back.
 *
 * <p>The writer tracks two counters: how many requests were sent ("expected") and how many
 * responses were received ("actual"). Their difference is the outstanding backlog. Before a new
 * chunk is dispatched the writer consumes responses until the backlog is no larger than the
 * configured throttle limit. Both counters are stored in the {@link ExecutionContext} by
 * {@link #update(ExecutionContext)} and read back by {@link #open(ExecutionContext)}.
 *
 * <p>The instance holds mutable, unsynchronized state.
 *
 * @param <T> the type of item being written
 */
public class ChunkMessageChannelItemWriter<T> extends StepExecutionListenerSupport
        implements ItemWriter<T>, ItemStream {

    private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);

    /** Execution context key under which the number of received responses is stored. */
    static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL";

    /** Execution context key under which the number of dispatched requests is stored. */
    static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";

    /** Backlog size used when {@link #setThrottleLimit(long)} is never called. */
    private static final long DEFAULT_THROTTLE_LIMIT = 6L;

    /** Upper bound on the number of receive attempts made while draining the backlog. */
    private static final int MAX_WAIT_ATTEMPTS = 40;

    private MessagingGateway messagingGateway;

    private final LocalState localState = new LocalState();

    private long throttleLimit = DEFAULT_THROTTLE_LIMIT;

    /**
     * Sets the largest backlog of unanswered requests tolerated before {@link #write(List)}
     * starts consuming responses ahead of dispatching a new chunk.
     *
     * @param throttleLimit the maximum number of outstanding requests
     */
    public void setThrottleLimit(long throttleLimit) {
        this.throttleLimit = throttleLimit;
    }

    /**
     * Sets the gateway used both to send chunk requests and to receive chunk responses.
     *
     * @param messagingGateway the gateway to use
     */
    public void setMessagingGateway(MessagingGateway messagingGateway) {
        this.messagingGateway = messagingGateway;
    }

    /**
     * Dispatches the given items as one {@link ChunkRequest}.
     *
     * <p>While the backlog exceeds the throttle limit, responses are consumed first. An empty
     * list is not dispatched and does not change the counters.
     *
     * @param items the items of the current chunk
     * @throws Exception if a consumed response reports a failure or fails validation
     */
    public void write(List<? extends T> items) throws Exception {
        // Drain responses until the backlog is back within the throttle limit.
        while (this.localState.getExpecting() > this.throttleLimit) {
            this.getNextResult();
        }
        if (items.isEmpty()) {
            return;
        }
        // Guarded so the whole chunk is not stringified when debug logging is off.
        if (logger.isDebugEnabled()) {
            logger.debug("Dispatching chunk: " + items);
        }
        ChunkRequest<? extends T> request = new ChunkRequest<T>(items, this.localState.getJobId(),
                this.localState.createStepContribution());
        this.messagingGateway.send(request);
        this.localState.expected++;
    }

    /**
     * Remembers the step execution so that job ids and step contributions can be derived from it.
     *
     * @param stepExecution the execution of the step that is about to start
     */
    public void beforeStep(StepExecution stepExecution) {
        this.localState.setStepExecution(stepExecution);
    }

    /**
     * Waits for the outstanding responses once the step has finished.
     *
     * <p>If the step status is not {@link BatchStatus#COMPLETED} nothing is awaited and
     * {@link ExitStatus#EXECUTING} is returned. If a {@link RuntimeException} is raised while
     * waiting, the step is marked {@link BatchStatus#FAILED} and a failed exit status describing
     * the exception is returned.
     *
     * @param stepExecution the execution of the step that just finished
     * @return the exit status derived from the outcome of the wait
     * @throws ItemStreamException if the backlog is still not empty after the wait
     */
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            return ExitStatus.EXECUTING;
        }
        // Captured before draining so the exit description reports the backlog that was awaited.
        long expecting = this.localState.getExpecting();
        boolean timedOut;
        try {
            logger.debug("Waiting for results in step listener...");
            timedOut = !this.waitForResults();
            logger.debug("Finished waiting for results in step listener.");
        }
        catch (RuntimeException e) {
            logger.debug("Detected failure waiting for results in step listener.", e);
            stepExecution.setStatus(BatchStatus.FAILED);
            return ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage());
        }
        if (timedOut) {
            stepExecution.setStatus(BatchStatus.FAILED);
            throw new ItemStreamException("Timed out waiting for back log at end of step");
        }
        return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results.");
    }

    /**
     * Resets both counters to zero.
     */
    public void close() throws ItemStreamException {
        this.localState.reset();
    }

    /**
     * Restores the counters from the execution context, if present, and waits for the backlog
     * they describe.
     *
     * <p>The restore is triggered by the presence of the {@link #EXPECTED} key; the
     * {@link #ACTUAL} key is then read as well.
     *
     * @param executionContext the context holding previously stored counters, if any
     * @throws ItemStreamException if the restored backlog is still not empty after the wait
     */
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (!executionContext.containsKey(EXPECTED)) {
            return;
        }
        this.localState.expected = executionContext.getLong(EXPECTED);
        this.localState.actual = executionContext.getLong(ACTUAL);
        if (!this.waitForResults()) {
            throw new ItemStreamException("Timed out waiting for back log on open");
        }
    }

    /**
     * Stores both counters in the execution context.
     *
     * @param executionContext the context to write the counters to
     */
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(EXPECTED, this.localState.expected);
        executionContext.putLong(ACTUAL, this.localState.actual);
    }

    /**
     * Consumes responses until the backlog is empty or {@link #MAX_WAIT_ATTEMPTS} receive
     * attempts have been made.
     *
     * @return {@code true} if no responses are outstanding any more, {@code false} if the
     *         attempts ran out while responses were still outstanding
     */
    private boolean waitForResults() {
        int attempts = 0;
        while (this.localState.getExpecting() > 0L && attempts < MAX_WAIT_ATTEMPTS) {
            attempts++;
            this.getNextResult();
        }
        // Decide on the backlog itself rather than on the attempt counter: a response that
        // arrives on the very last attempt empties the backlog and must not count as a timeout.
        return this.localState.getExpecting() <= 0L;
    }

    /**
     * Makes one receive attempt on the gateway and accounts for the response, if there is one.
     *
     * <p>A {@code null} payload leaves the counters untouched. A non-null payload is counted as
     * received before its success flag is inspected, so a failed response still reduces the
     * backlog.
     *
     * @throws IllegalStateException presumably raised by {@link Assert#state} when the response
     *         carries no job id or a job id different from the current one
     * @throws AsynchronousFailureException if the response is marked as unsuccessful
     */
    private void getNextResult() {
        ChunkResponse payload = (ChunkResponse) this.messagingGateway.receive();
        if (payload == null) {
            return;
        }
        Long jobInstanceId = payload.getJobId();
        Assert.state(jobInstanceId != null, "Message did not contain job instance id.");
        Assert.state(jobInstanceId.equals(this.localState.getJobId()),
                "Message contained wrong job instance id [" + jobInstanceId + "] should have been ["
                        + this.localState.getJobId() + "].");
        this.localState.actual++;
        if (!payload.isSuccessful()) {
            throw new AsynchronousFailureException(
                    "Failure or interrupt detected in handler: " + payload.getMessage());
        }
    }

    /**
     * Mutable bookkeeping for the writer: the request/response counters and the step execution
     * they belong to.
     */
    private static class LocalState {

        /** Number of responses received so far. */
        private long actual;

        /** Number of requests dispatched so far. */
        private long expected;

        /** Step execution supplied through {@code beforeStep}; {@code null} until then. */
        private StepExecution stepExecution;

        /**
         * @return the number of dispatched requests that have not been answered yet
         */
        public long getExpecting() {
            return this.expected - this.actual;
        }

        /**
         * @return a new contribution created by the current step execution
         */
        public StepContribution createStepContribution() {
            return this.stepExecution.createStepContribution();
        }

        /**
         * @return the job id reported by the job execution of the current step execution
         */
        public Long getJobId() {
            return this.stepExecution.getJobExecution().getJobId();
        }

        /**
         * @param stepExecution the step execution to derive job ids and contributions from
         */
        public void setStepExecution(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
        }

        /**
         * Sets both counters back to zero; the step execution is kept.
         */
        public void reset() {
            this.actual = 0L;
            this.expected = 0L;
        }
    }
}

