/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.wan;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.GemFireException;
import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.client.ServerConnectivityException;
import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.InternalPool;
import com.gemstone.gemfire.cache.client.internal.SenderProxy;
import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.i18n.StringId;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.BatchException70;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.pdx.PdxRegistryMismatchException;
import com.gemstone.gemfire.security.GemFireSecurityException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

public class GatewaySenderEventRemoteDispatcher
implements GatewaySenderEventDispatcher {
    private static final Logger logger = LogService.getLogger();
    private final AbstractGatewaySenderEventProcessor processor;
    private volatile Connection connection;
    private final Set<String> notFoundRegions;
    private final Object notFoundRegionsSync;
    private final AbstractGatewaySender sender;
    private AckReaderThread ackReaderThread;
    private ReentrantReadWriteLock connectionLifeCycleLock;
    private int failedConnectCount;

    /**
     * Creates a dispatcher for the given event processor and immediately attempts to open a
     * connection to the remote site.
     *
     * <p>A {@link GatewaySenderException} thrown by that first attempt is ignored unless its cause
     * is a {@link GemFireSecurityException}, in which case it is rethrown to the caller.
     *
     * @param eventProcessor the processor on whose behalf batches are dispatched
     */
    public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
        this.processor = eventProcessor;
        this.sender = eventProcessor.getSender();
        this.notFoundRegions = new HashSet<String>();
        this.notFoundRegionsSync = new Object();
        this.connectionLifeCycleLock = new ReentrantReadWriteLock();
        this.failedConnectCount = 0;
        try {
            this.initializeConnection();
        }
        catch (GatewaySenderException initFailure) {
            // Only security failures are surfaced from the constructor.
            if (initFailure.getCause() instanceof GemFireSecurityException) {
                throw initFailure;
            }
        }
    }

    /**
     * Obtains the current connection and reads one acknowledgement from the remote receiver.
     *
     * <p>The read happens under the read side of {@code connectionLifeCycleLock}. On failure the
     * exception is classified:
     * <ul>
     *   <li>if the sender's pool is missing or destroyed, nothing further is done;</li>
     *   <li>{@link IOException}, {@link ConnectionDestroyedException} and
     *       {@link ServerConnectivityException} (unless caused by a
     *       {@link PdxRegistryMismatchException}) cause a 100 ms pause so the caller can retry;</li>
     *   <li>anything else stops the processor, logging at fatal level unless it is a
     *       {@link CancelException}.</li>
     * </ul>
     * The connection is destroyed only when the failure is neither a {@link BatchException70}
     * nor a {@link GatewaySenderException}.
     *
     * @return the acknowledgement read, or {@code null} when no connection was available or an
     *         exception occurred
     */
    protected GatewayAck readAcknowledgement() {
        SenderProxy sp = new SenderProxy((InternalPool)this.processor.getSender().getProxy());
        GatewayAck ack = null;
        try {
            this.connection = this.getConnection(false);
            if (logger.isDebugEnabled()) {
                logger.debug(" Receiving ack on the thread {}", (Object)this.connection);
            }
            this.connectionLifeCycleLock.readLock().lock();
            try {
                if (this.connection != null) {
                    ack = (GatewayAck)sp.receiveAckFromReceiver(this.connection);
                }
            }
            finally {
                this.connectionLifeCycleLock.readLock().unlock();
            }
        }
        catch (Exception e) {
            Throwable cause = e.getCause();
            // Kept as Throwable: the cause of a GatewaySenderException is not guaranteed to be an
            // Exception, and a cast here would mask the real failure with a ClassCastException.
            Throwable ex;
            if (cause instanceof BatchException70 || e instanceof GatewaySenderException) {
                // Batch failures and failures raised while obtaining the connection: the
                // connection is left alone.
                ex = cause;
            } else {
                ex = e;
                this.destroyConnection();
            }
            if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
                // The pool is gone; stay silent.
            } else if (ex instanceof IOException
                    || ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException)
                    || ex instanceof ConnectionDestroyedException) {
                // Connectivity problem: back off briefly and let the caller retry rather than
                // stopping the processor.
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            } else {
                if (!(ex instanceof CancelException)) {
                    logger.fatal((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ex);
                }
                this.processor.setIsStopped(true);
            }
        }
        return ack;
    }

    /**
     * Sends one batch of events to the remote site and records batch statistics on success.
     *
     * <p>Failure handling for a {@link GatewaySenderException} depends on its cause:
     * <ul>
     *   <li>if the sender's pool is missing or destroyed, the failure is ignored;</li>
     *   <li>{@link IOException}, {@link ServerConnectivityException},
     *       {@link ConnectionDestroyedException}, {@link MessageTooLargeException} and
     *       {@link IllegalStateException} are treated as retryable: the processor is notified via
     *       {@code handleException()} and this thread pauses for 100 ms;</li>
     *   <li>any other cause is logged at fatal level and stops the processor.</li>
     * </ul>
     * A {@link CancelException} stops the processor and is rethrown; any other exception stops the
     * processor and is logged at fatal level.
     *
     * @param events  the events making up the batch
     * @param isRetry whether this batch is being re-sent
     * @return {@code true} if the batch was dispatched, {@code false} otherwise
     */
    public boolean dispatchBatch(List events, boolean isRetry) {
        GatewaySenderStats statistics = this.sender.getStatistics();
        boolean success = false;
        try {
            long start = statistics.startTime();
            success = this._dispatchBatch(events, isRetry);
            if (success) {
                statistics.endBatch(start, events.size());
            }
        }
        catch (GatewaySenderException ge) {
            Throwable t = ge.getCause();
            if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
                // The pool is gone; stay silent.
            } else if (t instanceof IOException || t instanceof ServerConnectivityException || t instanceof ConnectionDestroyedException || t instanceof MessageTooLargeException || t instanceof IllegalStateException) {
                // Retryable failure: do not stop the processor, otherwise e.g. the batch-size
                // reduction done for MessageTooLargeException could never be retried.
                this.processor.handleException();
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Because of IOException, failed to dispatch a batch with id : {}", (Object)this.processor.getBatchId());
                }
            } else {
                logger.fatal((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), (Throwable)ge);
                this.processor.setIsStopped(true);
            }
        }
        catch (CancelException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Stopping the processor because cancellation occurred while processing a batch");
            }
            this.processor.setIsStopped(true);
            throw e;
        }
        catch (Exception e) {
            this.processor.setIsStopped(true);
            logger.fatal((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), (Throwable)e);
        }
        return success;
    }

    /**
     * Performs the actual send of a batch over the current connection.
     *
     * <p>Returns {@code false} without sending when obtaining the connection changed the
     * processor's batch id or the processor reports a connection reset. Otherwise the batch is
     * written while holding the read side of {@code connectionLifeCycleLock}. Every failure is
     * rethrown wrapped in a {@link GatewaySenderException}; the connection is destroyed first
     * except for {@link BatchException70}, {@link MessageTooLargeException} and
     * {@link IllegalStateException} failures. A {@link MessageTooLargeException} additionally
     * halves the processor's batch size.
     *
     * @param events  the events to send
     * @param isRetry whether the batch is being re-sent
     * @return {@code true} if the batch was written, {@code false} if it was skipped
     * @throws GatewaySenderException if sending fails
     */
    private boolean _dispatchBatch(List events, boolean isRetry) {
        final int currentBatchId = this.processor.getBatchId();
        this.connection = this.getConnection(true);
        final int batchIdAfterConnect = this.processor.getBatchId();
        final GatewaySenderStats stats = this.sender.getStatistics();
        if (currentBatchId != batchIdAfterConnect || this.processor.isConnectionReset()) {
            return false;
        }
        try {
            if (this.processor.isConnectionReset()) {
                isRetry = true;
            }
            SenderProxy proxy = new SenderProxy((InternalPool)this.sender.getProxy());
            this.connectionLifeCycleLock.readLock().lock();
            try {
                if (this.connection == null) {
                    throw new ConnectionDestroyedException();
                }
                proxy.dispatchBatch_NewWAN(this.connection, events, currentBatchId, isRetry);
                if (logger.isDebugEnabled()) {
                    logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}", (Object)this.processor.getSender(), (Object)currentBatchId, (Object)events.size(), (Object)this.processor.getQueue().size(), (Object)this.connection);
                }
            }
            finally {
                this.connectionLifeCycleLock.readLock().unlock();
            }
            return true;
        }
        catch (ServerOperationException e) {
            Throwable reported;
            if (e.getCause() instanceof BatchException70) {
                reported = e.getCause();
            } else {
                reported = e;
                this.destroyConnection();
            }
            throw new GatewaySenderException(this.batchFailureMessage(currentBatchId), reported);
        }
        catch (GemFireIOException e) {
            Throwable reported;
            if (e.getCause() instanceof MessageTooLargeException) {
                reported = e.getCause();
                int newBatchSize = Math.min(events.size(), this.processor.getBatchSize()) / 2;
                logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, (Object[])new Object[]{events.size(), newBatchSize}), (Throwable)e);
                this.processor.setBatchSize(newBatchSize);
                stats.incBatchesResized();
            } else {
                reported = e;
                this.destroyConnection();
            }
            throw new GatewaySenderException(this.batchFailureMessage(currentBatchId), reported);
        }
        catch (IllegalStateException e) {
            this.processor.setException((GemFireException)new GatewaySenderException((Throwable)e));
            throw new GatewaySenderException(this.batchFailureMessage(currentBatchId), (Throwable)e);
        }
        catch (Exception e) {
            Throwable reported = e.getCause() instanceof IOException ? e.getCause() : e;
            this.destroyConnection();
            throw new GatewaySenderException(this.batchFailureMessage(currentBatchId), reported);
        }
    }

    /**
     * Builds the localized "exception during processing batch" message from this dispatcher, the
     * batch id and the connection field as it is at the time of the call.
     */
    private String batchFailureMessage(int batchId) {
        return LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(new Object[]{this, batchId, this.connection});
    }

    /**
     * Returns the connection to the remote site, (re)initializing it when needed.
     *
     * <p>For a serial sender the connection is re-created when it is absent, destroyed, or points
     * at a server other than the sender's recorded server location; for a parallel sender only
     * when it is absent or destroyed. Afterwards, if the cache is open, the sender is primary and
     * a connection exists, an {@code AckReaderThread} is started unless one is already running.
     *
     * @param startAckReaderThread not consulted by this method
     * @return the current connection, or {@code null} if the processor is stopped
     * @throws GatewaySenderException if a connection cannot be established
     */
    public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException {
        if (this.processor.isStopped()) {
            return null;
        }
        if (this.sender.isParallel()) {
            if (this.connection == null || this.connection.isDestroyed()) {
                this.initializeConnection();
            }
        } else if (this.connection == null || this.connection.isDestroyed() || !this.connection.getServer().equals((Object)this.sender.getServerLocation())) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}", (Object)(this.connection == null ? "null" : this.connection.getServer()), (Object)this.sender.getServerLocation());
            }
            this.initializeConnection();
        }
        Cache cache = this.sender.getCache();
        if (cache != null && !cache.isClosed()) {
            if (this.sender.isPrimary() && this.connection != null) {
                if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) {
                    this.ackReaderThread = new AckReaderThread((GatewaySender)this.sender, this.processor);
                    this.ackReaderThread.start();
                    this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
                }
            }
        }
        return this.connection;
    }

    /**
     * Destroys the current connection, if any, while holding the write side of
     * {@code connectionLifeCycleLock}.
     *
     * <p>A connection that is not yet destroyed is destroyed and returned to the sender's pool.
     * In either case the connection field and the sender's server location are cleared.
     */
    public void destroyConnection() {
        this.connectionLifeCycleLock.writeLock().lock();
        try {
            final Connection current = this.connection;
            if (current == null) {
                return;
            }
            if (!current.isDestroyed()) {
                current.destroy();
                this.sender.getProxy().returnConnection(current);
            }
            this.connection = null;
            this.sender.setServerLocation(null);
        }
        finally {
            this.connectionLifeCycleLock.writeLock().unlock();
        }
    }

    /**
     * Acquires a new connection from the sender's pool and stores it in {@code connection},
     * holding the write side of {@code connectionLifeCycleLock} throughout.
     *
     * <p>The pool is initialized if it is missing or destroyed; otherwise the processor's batch id
     * is reset. A parallel sender acquires any connection and records its server. A serial sender,
     * under the sender's concurrent-dispatcher lock, connects to the recorded server location if
     * there is one, or else acquires any connection and, when primary, records the server (if still
     * unset) and distributes the updated attributes.
     *
     * @throws GatewaySenderException if no connection could be acquired; the cause is the
     *         {@link GemFireSecurityException} for security failures, an {@link IOException}
     *         describing the known servers for other connectivity failures, or the
     *         {@link ConnectionDestroyedException} that was raised
     */
    private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
        this.connectionLifeCycleLock.writeLock().lock();
        try {
            if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
                this.sender.initProxy();
            } else {
                this.processor.resetBatchId();
            }
            Connection con;
            try {
                if (this.sender.isParallel()) {
                    con = this.sender.getProxy().acquireConnection();
                    this.sender.setServerLocation(con.getServer());
                } else {
                    synchronized (this.sender.getLockForConcurrentDispatcher()) {
                        if (this.sender.getServerLocation() == null) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("ServerLocation is null. Creating new connection. ");
                            }
                            con = this.sender.getProxy().acquireConnection();
                            if (this.sender.isPrimary()) {
                                if (this.sender.getServerLocation() == null) {
                                    this.sender.setServerLocation(con.getServer());
                                }
                                new UpdateAttributesProcessor((DistributionAdvisee)this.sender).distribute(false);
                            }
                        } else {
                            if (logger.isDebugEnabled()) {
                                logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", (Object)this.sender.getServerLocation());
                            }
                            con = this.sender.getProxy().acquireConnection(this.sender.getServerLocation());
                        }
                    }
                }
            }
            catch (ServerConnectivityException e) {
                ++this.failedConnectCount;
                Throwable cause = e.getCause();
                if (cause instanceof GemFireSecurityException) {
                    if (this.logConnectionFailure()) {
                        logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1, (Object[])new Object[]{this.processor.getSender().getId(), cause.getMessage()}));
                    }
                    throw new GatewaySenderException(cause);
                }
                // Describe which servers, if any, the pool currently knows about.
                List servers = this.sender.getProxy().getCurrentServers();
                String ioMsg;
                if (servers.isEmpty()) {
                    ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS.toLocalizedString();
                } else {
                    StringBuilder names = new StringBuilder();
                    for (Object server : servers) {
                        if (names.length() > 0) {
                            names.append(", ");
                        }
                        names.append(String.valueOf(server));
                    }
                    ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0.toLocalizedString(new Object[]{names.toString()});
                }
                Throwable ioFailure = new IOException(ioMsg);
                this.sender.setServerLocation(null);
                // Warn only on the first failure of a run of failed attempts.
                if (this.failedConnectCount == 1) {
                    logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT, (Object)this.processor.getSender().getId()));
                }
                throw new GatewaySenderException(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(new Object[]{this.processor.getSender().getId()}), ioFailure);
            }
            if (this.failedConnectCount > 0) {
                Object[] logArgs = new Object[]{this.processor.getSender().getId(), con, this.failedConnectCount};
                logger.info((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS, (Object[])logArgs));
                this.failedConnectCount = 0;
            } else {
                Object[] logArgs = new Object[]{this.processor.getSender().getId(), con};
                logger.info((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, (Object[])logArgs));
            }
            this.connection = con;
            this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
        }
        catch (ConnectionDestroyedException e) {
            throw new GatewaySenderException(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(new Object[]{this.processor.getSender().getId()}), (Throwable)e);
        }
        finally {
            this.connectionLifeCycleLock.writeLock().unlock();
        }
    }

    /**
     * Decides whether the current connection failure should be logged, based on
     * {@code failedConnectCount}.
     *
     * <p>Always logs when debug logging is enabled or the count is zero. Otherwise logs at counts
     * of 30 and 300, and at every multiple of 3000 from 3000 onwards.
     *
     * @return {@code true} if the failure should be logged
     */
    protected boolean logConnectionFailure() {
        final int failures = this.failedConnectCount;
        if (logger.isDebugEnabled() || failures == 0) {
            return true;
        }
        return failures >= 3000
                ? failures % 3000 == 0
                : failures == 30 || failures == 300;
    }

    /**
     * Asks the ack reader thread to shut down, if one has been created.
     */
    public void stopAckReaderThread() {
        final AckReaderThread reader = this.ackReaderThread;
        if (reader == null) {
            return;
        }
        reader.shutdown();
    }

    /**
     * Identifies this dispatcher as a remote dispatcher.
     *
     * @return always {@code true}
     */
    public boolean isRemoteDispatcher() {
        return true;
    }

    /**
     * Reports whether this dispatcher currently holds a connection reference.
     *
     * <p>Only checks that the {@code connection} field is non-null; it does not check whether
     * that connection has been destroyed.
     *
     * @return {@code true} if the {@code connection} field is non-null
     */
    public boolean isConnectedToRemote() {
        return this.connection != null;
    }

    /**
     * Stops the ack reader thread and, only if the processor reports that it is stopped,
     * destroys the connection.
     */
    public void stop() {
        this.stopAckReaderThread();
        // The connection is left in place while the processor is still running.
        if (this.processor.isStopped()) {
            this.destroyConnection();
        }
    }

    class AckReaderThread
    extends Thread {
        private Object runningStateLock;
        private volatile boolean shutdown;
        private final GemFireCacheImpl cache;
        private volatile boolean ackReaderThreadRunning;

        /**
         * Creates a daemon thread named after the given processor.
         *
         * @param sender    the gateway sender; must be an {@link AbstractGatewaySender}, whose
         *                  cache is used for cancellation checks
         * @param processor the event processor whose name is used in the thread name
         */
        public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
            super("AckReaderThread for : " + processor.getName());
            this.setDaemon(true);
            this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache();
            this.runningStateLock = new Object();
            this.ackReaderThreadRunning = false;
            this.shutdown = false;
        }

        /**
         * Blocks the caller until this thread has flagged itself as running.
         *
         * <p>If the caller is interrupted while waiting, its interrupt status is restored and the
         * method returns without waiting further.
         */
        public void waitForRunningAckReaderThreadRunningState() {
            synchronized (this.runningStateLock) {
                try {
                    while (!this.ackReaderThreadRunning) {
                        this.runningStateLock.wait();
                    }
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Tells whether this thread should stop: either {@code shutdown} has been set, or the
         * cache's cancel criterion reports a cancellation in progress.
         */
        private boolean checkCancelled() {
            return this.shutdown || this.cache.getCancelCriterion().isCancelInProgress();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block23: {
                if (logger.isDebugEnabled()) {
                    logger.debug("AckReaderThread started.. ");
                }
                Object object = this.runningStateLock;
                synchronized (object) {
                    this.ackReaderThreadRunning = true;
                    this.runningStateLock.notifyAll();
                }
                block13: while (true) {
                    try {
                        while (true) {
                            if (this.checkCancelled()) {
                                break block23;
                            }
                            GatewayAck ack = GatewaySenderEventRemoteDispatcher.this.readAcknowledgement();
                            if (ack != null) {
                                boolean gotBatchException = ack.getBatchException() != null;
                                int batchId = ack.getBatchId();
                                int numEvents = ack.getNumEvents();
                                if (gotBatchException) {
                                    logger.info((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION, (Object[])new Object[]{GatewaySenderEventRemoteDispatcher.this.processor.getSender(), ack.getBatchId()}, (Throwable)ack.getBatchException()));
                                    GatewaySenderStats statistics = GatewaySenderEventRemoteDispatcher.this.sender.getStatistics();
                                    statistics.incBatchesRedistributed();
                                    this.logBatchExceptions(ack.getBatchException());
                                    GatewaySenderEventRemoteDispatcher.this.processor.handleSuccessBatchAck(batchId);
                                    continue;
                                }
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events", (Object)GatewaySenderEventRemoteDispatcher.this.processor.getSender(), (Object)ack.getBatchId(), (Object)ack.getNumEvents());
                                }
                                GatewaySenderEventRemoteDispatcher.this.processor.handleSuccessBatchAck(batchId);
                                continue;
                            }
                            if (logger.isDebugEnabled()) {
                                logger.debug("{}: Received null ack from remote site.", (Object)GatewaySenderEventRemoteDispatcher.this.processor.getSender());
                            }
                            GatewaySenderEventRemoteDispatcher.this.processor.handleException();
                            try {
                                Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL);
                                continue block13;
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                continue;
                            }
                            break;
                        }
                    }
                    catch (Exception e) {
                        if (!this.checkCancelled()) {
                            logger.fatal((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), (Throwable)e);
                        }
                        GatewaySenderEventRemoteDispatcher.this.sender.getLifeCycleLock().writeLock().lock();
                        try {
                            GatewaySenderEventRemoteDispatcher.this.processor.stopProcessing();
                            GatewaySenderEventRemoteDispatcher.this.sender.clearTempEventsAfterSenderStopped();
                            break block23;
                        }
                        finally {
                            GatewaySenderEventRemoteDispatcher.this.sender.getLifeCycleLock().writeLock().unlock();
                        }
                    }
                }
                finally {
                    if (logger.isDebugEnabled()) {
                        logger.debug("AckReaderThread exiting. ");
                    }
                    this.ackReaderThreadRunning = false;
                }
            }
        }

        /**
         * Logs each exception contained in a batch exception received from the remote site.
         *
         * <ul>
         *   <li>{@link RegionDestroyedException}: warnings are logged only the first time a given
         *       region path is seen; the paths are remembered in {@code notFoundRegions}.</li>
         *   <li>{@link IllegalStateException} whose message contains "Unknown pdx type": a warning
         *       is logged and, if PDX events are recorded for the batch, each has its
         *       {@code isAcked} flag cleared and the event at the failing index is logged.</li>
         *   <li>Anything else: a warning is logged, followed by the event at the failing index if
         *       events are recorded for the batch.</li>
         * </ul>
         *
         * @param exception the batch exception whose nested exceptions are logged
         */
        private void logBatchExceptions(BatchException70 exception) {
            for (BatchException70 be : exception.getExceptions()) {
                Throwable cause = be.getCause();
                boolean logWarning = true;
                if (cause instanceof RegionDestroyedException) {
                    String regionPath = ((RegionDestroyedException)cause).getRegionFullPath();
                    synchronized (GatewaySenderEventRemoteDispatcher.this.notFoundRegionsSync) {
                        // add() returns false when the path was already recorded, i.e. the
                        // warning has been logged before.
                        logWarning = GatewaySenderEventRemoteDispatcher.this.notFoundRegions.add(regionPath);
                    }
                } else if (cause instanceof IllegalStateException) {
                    // The message may be null; that must not abort ack processing.
                    String message = cause.getMessage();
                    if (message != null && message.contains("Unknown pdx type")) {
                        List pdxEvents = (List)GatewaySenderEventRemoteDispatcher.this.processor.getBatchIdToPDXEventsMap().get(be.getBatchId());
                        logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0, (Object)be.getIndex()), (Throwable)be);
                        if (pdxEvents == null) {
                            continue;
                        }
                        for (Object pdxEvent : pdxEvents) {
                            ((GatewaySenderEventImpl)pdxEvent).isAcked = false;
                        }
                        GatewaySenderEventImpl failedPdxEvent = (GatewaySenderEventImpl)pdxEvents.get(be.getIndex());
                        logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, (Object)failedPdxEvent));
                        continue;
                    }
                }
                if (logWarning) {
                    logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0, (Object)be.getIndex()), (Throwable)be);
                }
                List[] eventsArr = (List[])GatewaySenderEventRemoteDispatcher.this.processor.getBatchIdToEventsMap().get(be.getBatchId());
                if (eventsArr == null) {
                    continue;
                }
                List filteredEvents = eventsArr[1];
                GatewaySenderEventImpl gsEvent = (GatewaySenderEventImpl)filteredEvents.get(be.getIndex());
                if (logWarning) {
                    logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, (Object)gsEvent));
                }
            }
        }

        /**
         * Returns the running flag, which {@code run()} sets on entry and clears on exit.
         */
        boolean isRunning() {
            return this.ackReaderThreadRunning;
        }

        /**
         * Shuts this reader thread down.
         *
         * <p>If the dispatcher holds a connection, its input stream is closed and, unless already
         * destroyed, the connection is destroyed and returned to the sender's pool. The shutdown
         * flag is then set and the caller waits up to 15 seconds for this thread to finish,
         * preserving the caller's interrupt status. A warning is logged if the thread is still
         * alive afterwards.
         */
        public void shutdown() {
            // Read the volatile field once: another thread may null it between a check and a
            // later re-read.
            Connection conn = GatewaySenderEventRemoteDispatcher.this.connection;
            if (conn != null) {
                this.shutDownAckReaderConnection();
                if (!conn.isDestroyed()) {
                    conn.destroy();
                    GatewaySenderEventRemoteDispatcher.this.sender.getProxy().returnConnection(conn);
                }
            }
            this.shutdown = true;
            // Clear (and remember) any pending interrupt so join() can actually wait.
            boolean interrupted = Thread.interrupted();
            try {
                this.join(15000L);
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (this.isAlive()) {
                logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
            }
        }

        /**
         * Closes the input stream of the dispatcher's current connection, if there is one.
         *
         * <p>Failures are logged and not propagated: an {@link IOException} as a warning
         * (including the exception), an already-destroyed connection at info level.
         */
        private void shutDownAckReaderConnection() {
            Connection conn = GatewaySenderEventRemoteDispatcher.this.connection;
            try {
                if (conn != null && conn.getInputStream() != null) {
                    conn.getInputStream().close();
                }
            }
            catch (IOException e) {
                // Keep the cause in the log so the failure can be diagnosed.
                logger.warn("Unable to shutdown AckReaderThread Connection", (Throwable)e);
            }
            catch (ConnectionDestroyedException e) {
                logger.info("AckReader shutting down and connection already destroyed");
            }
        }
    }

    /**
     * Immutable acknowledgement for a batch: either a success carrying the number of events, or
     * a failure carrying the {@link BatchException70} that occurred.
     */
    public static class GatewayAck {
        private final int batchId;
        private final int numEvents;
        private final BatchException70 be;

        /**
         * Creates an acknowledgement that carries a batch exception; the event count is 0.
         *
         * @param be  the batch exception
         * @param bId the id of the batch
         */
        public GatewayAck(BatchException70 be, int bId) {
            this.batchId = bId;
            this.numEvents = 0;
            this.be = be;
        }

        /**
         * Creates an acknowledgement without a batch exception.
         *
         * @param batchId   the id of the batch
         * @param numEvents the number of events in the batch
         */
        public GatewayAck(int batchId, int numEvents) {
            this.batchId = batchId;
            this.numEvents = numEvents;
            this.be = null;
        }

        /** Returns the number of events, or 0 for an acknowledgement carrying an exception. */
        public int getNumEvents() {
            return this.numEvents;
        }

        /** Returns the id of the acknowledged batch. */
        public int getBatchId() {
            return this.batchId;
        }

        /** Returns the batch exception, or {@code null} if none was supplied. */
        public BatchException70 getBatchException() {
            return this.be;
        }
    }
}

