/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.batch.integration.chunk;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.integration.chunk.AsynchronousFailureException;
import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.batch.integration.chunk.ChunkResponse;
import org.springframework.batch.integration.chunk.StepContributionSource;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessagingOperations;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.util.Assert;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ChunkMessageChannelItemWriter<T>
extends StepExecutionListenerSupport
implements ItemWriter<T>,
ItemStream,
StepContributionSource {
    private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);
    static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL";
    static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";
    private static final long DEFAULT_THROTTLE_LIMIT = 6L;
    private MessagingOperations messagingGateway;
    private LocalState localState = new LocalState();
    private long throttleLimit = 6L;
    private int DEFAULT_MAX_WAIT_TIMEOUTS;
    private int maxWaitTimeouts = this.DEFAULT_MAX_WAIT_TIMEOUTS = 40;
    private PollableChannel replyChannel;

    public void setMaxWaitTimeouts(int maxWaitTimeouts) {
        this.maxWaitTimeouts = maxWaitTimeouts;
    }

    public void setThrottleLimit(long throttleLimit) {
        this.throttleLimit = throttleLimit;
    }

    public void setMessagingOperations(MessagingOperations messagingGateway) {
        this.messagingGateway = messagingGateway;
    }

    public void setReplyChannel(PollableChannel replyChannel) {
        this.replyChannel = replyChannel;
    }

    public void write(List<? extends T> items) throws Exception {
        while ((long)this.localState.getExpecting() > this.throttleLimit) {
            this.getNextResult();
        }
        if (!items.isEmpty()) {
            ChunkRequest<? extends T> request = this.localState.getRequest(items);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Dispatching chunk: " + request));
            }
            this.messagingGateway.send((Message)new GenericMessage(request));
            this.localState.incrementExpected();
        }
    }

    public void beforeStep(StepExecution stepExecution) {
        this.localState.setStepExecution(stepExecution);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExitStatus afterStep(StepExecution stepExecution) {
        boolean timedOut;
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            return ExitStatus.EXECUTING;
        }
        long expecting = this.localState.getExpecting();
        try {
            logger.debug((Object)"Waiting for results in step listener...");
            timedOut = !this.waitForResults();
            logger.debug((Object)"Finished waiting for results in step listener.");
        }
        catch (RuntimeException e) {
            logger.debug((Object)"Detected failure waiting for results in step listener.", (Throwable)e);
            stepExecution.setStatus(BatchStatus.FAILED);
            ExitStatus exitStatus = ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage());
            return exitStatus;
        }
        finally {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Finished waiting for results in step listener.  Still expecting: " + this.localState.getExpecting()));
            }
            for (StepContribution contribution : this.getStepContributions()) {
                stepExecution.apply(contribution);
            }
        }
        if (timedOut) {
            stepExecution.setStatus(BatchStatus.FAILED);
            return ExitStatus.FAILED.addExitDescription("Timed out waiting for " + this.localState.getExpecting() + " backlog at end of step");
        }
        return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results.");
    }

    public void close() throws ItemStreamException {
        this.localState.reset();
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(EXPECTED)) {
            this.localState.open(executionContext.getInt(EXPECTED), executionContext.getInt(ACTUAL));
            if (!this.waitForResults()) {
                throw new ItemStreamException("Timed out waiting for back log on open");
            }
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putInt(EXPECTED, this.localState.expected.intValue());
        executionContext.putInt(ACTUAL, this.localState.actual.intValue());
    }

    @Override
    public Collection<StepContribution> getStepContributions() {
        ArrayList<StepContribution> contributions = new ArrayList<StepContribution>();
        for (ChunkResponse response : this.localState.pollChunkResponses()) {
            StepContribution contribution = response.getStepContribution();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Applying: " + response));
            }
            contributions.add(contribution);
        }
        return contributions;
    }

    private boolean waitForResults() throws AsynchronousFailureException {
        int count = 0;
        int maxCount = this.maxWaitTimeouts;
        Throwable failure = null;
        logger.info((Object)("Waiting for " + this.localState.getExpecting() + " results"));
        while (this.localState.getExpecting() > 0 && count++ < maxCount) {
            try {
                this.getNextResult();
            }
            catch (Throwable t) {
                logger.error((Object)("Detected error in remote result. Trying to recover " + this.localState.getExpecting() + " outstanding results before completing."), t);
                failure = t;
            }
        }
        if (failure != null) {
            throw ChunkMessageChannelItemWriter.wrapIfNecessary(failure);
        }
        return count < maxCount;
    }

    private void getNextResult() throws AsynchronousFailureException {
        Message message = this.messagingGateway.receive(this.replyChannel);
        if (message != null) {
            Long jobInstanceId;
            ChunkResponse payload = (ChunkResponse)message.getPayload();
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Found result: " + payload));
            }
            Assert.state(((jobInstanceId = payload.getJobId()) != null ? 1 : 0) != 0, (String)"Message did not contain job instance id.");
            Assert.state((boolean)jobInstanceId.equals(this.localState.getJobId()), (String)("Message contained wrong job instance id [" + jobInstanceId + "] should have been [" + this.localState.getJobId() + "]."));
            if (payload.isRedelivered()) {
                logger.warn((Object)"Redelivered result detected, which may indicate stale state. In the best case, we just picked up a timed out message from a previous failed execution. In the worst case (and if this is not a restart), the step may now timeout.  In that case if you believe that all messages from workers have been sent, the business state is probably inconsistent, and the step will fail.");
                this.localState.incrementRedelivered();
            }
            this.localState.pushResponse(payload);
            this.localState.incrementActual();
            if (!payload.isSuccessful()) {
                throw new AsynchronousFailureException("Failure or interrupt detected in handler: " + payload.getMessage());
            }
        }
    }

    private static AsynchronousFailureException wrapIfNecessary(Throwable throwable) {
        if (throwable instanceof Error) {
            throw (Error)throwable;
        }
        if (throwable instanceof AsynchronousFailureException) {
            return (AsynchronousFailureException)((Object)throwable);
        }
        return new AsynchronousFailureException("Exception in remote process", throwable);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class LocalState {
        private AtomicInteger current = new AtomicInteger(-1);
        private AtomicInteger actual = new AtomicInteger();
        private AtomicInteger expected = new AtomicInteger();
        private AtomicInteger redelivered = new AtomicInteger();
        private StepExecution stepExecution;
        private Queue<ChunkResponse> contributions = new LinkedBlockingQueue<ChunkResponse>();

        private LocalState() {
        }

        public int getExpecting() {
            return this.expected.get() - this.actual.get();
        }

        public <T> ChunkRequest<T> getRequest(List<? extends T> items) {
            return new ChunkRequest<T>(this.current.incrementAndGet(), items, this.getJobId(), this.createStepContribution());
        }

        public void open(int expectedValue, int actualValue) {
            this.actual.set(actualValue);
            this.expected.set(expectedValue);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Collection<ChunkResponse> pollChunkResponses() {
            ArrayList<ChunkResponse> set = new ArrayList<ChunkResponse>();
            Queue<ChunkResponse> queue = this.contributions;
            synchronized (queue) {
                ChunkResponse item = this.contributions.poll();
                while (item != null) {
                    set.add(item);
                    item = this.contributions.poll();
                }
            }
            return set;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void pushResponse(ChunkResponse stepContribution) {
            Queue<ChunkResponse> queue = this.contributions;
            synchronized (queue) {
                this.contributions.add(stepContribution);
            }
        }

        public void incrementRedelivered() {
            this.redelivered.incrementAndGet();
        }

        public void incrementActual() {
            this.actual.incrementAndGet();
        }

        public void incrementExpected() {
            this.expected.incrementAndGet();
        }

        public StepContribution createStepContribution() {
            return this.stepExecution.createStepContribution();
        }

        public Long getJobId() {
            return this.stepExecution.getJobExecution().getJobId();
        }

        public void setStepExecution(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
        }

        public void reset() {
            this.expected.set(0);
            this.actual.set(0);
        }
    }
}

