/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.cache.query.internal.cq;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.ServerCQProxyImpl;
import org.apache.geode.cache.client.internal.ServerProxy;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesMutator;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.CqStatusListener;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.cq.ClientCQ;
import org.apache.geode.cache.query.internal.cq.CqEventImpl;
import org.apache.geode.cache.query.internal.cq.CqQueryImpl;
import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
import org.apache.geode.i18n.StringId;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.security.GemFireSecurityException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

public class ClientCQImpl
extends CqQueryImpl
implements ClientCQ {
    private static final Logger logger = LogService.getLogger();

    /** Attributes (including the listeners) supplied when this CQ was created. */
    private CqAttributes cqAttributes;

    /** Proxy used to send create/stop/close requests for this CQ; may be created lazily on execute. */
    private volatile ServerCQProxyImpl cqProxy;

    /** Optional per-user cache; when set, its user attributes are installed around server calls. */
    private ProxyCache proxyCache;

    /**
     * Events held back while {@link #executeWithInitialResults()} is in flight. A {@code null}
     * value means no such execution is currently in progress.
     */
    private volatile ConcurrentLinkedQueue<CqEventImpl> queuedEvents;

    /** Monitor used to wait for, and signal, {@link #queuedEvents} becoming {@code null}. */
    final Object queuedEventsSynchObject = new Object();

    /** Set to {@code true} once the CQ has been executed from a non-server member. */
    private boolean connected;

    /**
     * Creates a client-side CQ.
     *
     * @param cqService the owning CQ service
     * @param cqName the name of the CQ
     * @param queryString the query text
     * @param cqAttributes the CQ attributes holding the listeners
     * @param serverProxy the proxy used to talk to the server; may be {@code null}, in which case
     *        it is created on first execution
     * @param isDurable durability flag handed to the superclass
     */
    public ClientCQImpl(CqServiceImpl cqService, String cqName, String queryString, CqAttributes cqAttributes, ServerCQProxyImpl serverProxy, boolean isDurable) {
        super(cqService, cqName, queryString, isDurable);
        this.cqProxy = serverProxy;
        this.cqAttributes = cqAttributes;
    }

    /**
     * Returns the name under which this CQ is known on the server; on the client this is simply
     * the CQ name.
     */
    @Override
    public String getServerCqName() {
        final String serverSideName = this.cqName;
        return serverSideName;
    }

    /** Returns the proxy currently used for server requests, or {@code null} if none is set yet. */
    ServerCQProxyImpl getCQProxy() {
        final ServerCQProxyImpl currentProxy = this.cqProxy;
        return currentProxy;
    }

    /**
     * Resolves the base region of this CQ and builds the server proxy from the region's pool.
     *
     * <p>The proxy is only published to {@code cqProxy} after every validation has passed. If it
     * were assigned earlier, a failed initialization would leave a proxy behind and a later
     * {@code execute()} would skip this method (and its subscription check) entirely.
     *
     * @throws RegionNotFoundException if the region named by the query does not exist locally
     * @throws CqException if the region has no server proxy, or the pool's subscription flag is
     *         disabled
     */
    private void initConnectionProxy() throws CqException, RegionNotFoundException {
        this.cqBaseRegion = (LocalRegion)this.cqService.getCache().getRegion(this.regionName);
        if (this.cqBaseRegion == null) {
            throw new RegionNotFoundException(LocalizedStrings.CqQueryImpl_REGION_ON_WHICH_QUERY_IS_SPECIFIED_NOT_FOUND_LOCALLY_REGIONNAME_0.toLocalizedString(new Object[]{this.regionName}));
        }
        ServerRegionProxy srp = this.cqBaseRegion.getServerProxy();
        if (srp == null) {
            throw new CqException("Unable to get the connection pool. The Region does not have a pool configured.");
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Found server region proxy on region. RegionName: {}", (Object)this.regionName);
        }
        if (!srp.getPool().getSubscriptionEnabled()) {
            throw new CqException("The 'queueEnabled' flag on Pool installed on Region " + this.regionName + " is set to false.");
        }
        // All checks passed: only now make the proxy visible to the rest of the class.
        this.cqProxy = new ServerCQProxyImpl((ServerProxy)srp);
    }

    /**
     * Closes this CQ and also asks the server to close it; shorthand for {@code close(true)}.
     *
     * @throws CqClosedException declared for API compatibility
     * @throws CqException if the close request to the server fails
     */
    public void close() throws CqClosedException, CqException {
        final boolean notifyServer = true;
        close(notifyServer);
    }

    /**
     * Closes this CQ, optionally sending the close request to the server, and then invokes
     * {@code close()} on every registered {@link CqListener}.
     *
     * <p>The state transition and statistics updates happen while holding the {@code cqState}
     * monitor; the listener callbacks run after the monitor has been released. If the CQ is
     * already closed the method returns without doing anything. If the server request fails while
     * a shutdown is in progress the method returns silently.
     *
     * @param sendRequestToServer whether the server should be asked to close the CQ
     * @throws CqClosedException declared for API compatibility
     * @throws CqException if the server request was attempted and did not succeed
     */
    public void close(boolean sendRequestToServer) throws CqClosedException, CqException {
        final boolean debugOn = logger.isDebugEnabled();
        if (debugOn) {
            logger.debug("Started closing CQ CqName: {} SendRequestToServer: {}", (Object)this.cqName, (Object)sendRequestToServer);
        }
        synchronized (this.cqState) {
            if (this.isClosed()) {
                if (debugOn) {
                    logger.debug("CQ is already closed, CqName: {}", (Object)this.cqName);
                }
                return;
            }
            final int priorState = this.cqState.getState();
            // 3 is an intermediate state held while the close is in flight (presumably CLOSING).
            this.cqState.setState(3);
            boolean closedOnServer = false;
            Exception serverFailure = null;
            if (this.cqProxy != null && sendRequestToServer) {
                try {
                    if (this.proxyCache != null) {
                        if (this.proxyCache.isClosed()) {
                            throw this.proxyCache.getCacheClosedException("Cache is closed for this user.");
                        }
                        UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
                    }
                    this.cqProxy.close(this);
                    closedOnServer = true;
                }
                catch (CancelException cancelled) {
                    throw cancelled;
                }
                catch (Exception ex) {
                    if (this.shutdownInProgress()) {
                        return;
                    }
                    serverFailure = ex;
                }
                finally {
                    UserAttributes.userAttributes.set(null);
                }
            }
            this.removeFromCqMap();

            // The close only counts as failed when a server request was attempted and did not complete.
            final boolean serverCloseFailed = this.cqProxy != null && sendRequestToServer && !closedOnServer;
            if (serverCloseFailed) {
                if (this.shutdownInProgress()) {
                    return;
                }
                if (serverFailure != null) {
                    throw new CqException(LocalizedStrings.CqQueryImpl_FAILED_TO_CLOSE_THE_CQ_CQNAME_0_ERROR_FROM_LAST_ENDPOINT_1.toLocalizedString(new Object[]{this.cqName, serverFailure.getLocalizedMessage()}), serverFailure.getCause());
                }
                throw new CqException(LocalizedStrings.CqQueryImpl_FAILED_TO_CLOSE_THE_CQ_CQNAME_0_THE_SERVER_ENDPOINTS_ON_WHICH_THIS_CQ_WAS_REGISTERED_WERE_NOT_FOUND.toLocalizedString(new Object[]{this.cqName}));
            }

            // Undo the statistic that matched the state the CQ was in before closing.
            if (priorState == 1) {
                this.cqService.stats().decCqsActive();
            } else if (priorState == 0) {
                this.cqService.stats().decCqsStopped();
            }
            // 2 is the terminal state (presumably CLOSED).
            this.cqState.setState(2);
            this.cqService.stats().incCqsClosed();
            this.cqService.stats().decCqsOnClient();
            if (this.stats != null) {
                this.stats.close();
            }
        }

        final CqListener[] listeners = this.cqAttributes == null ? null : this.getCqAttributes().getCqListeners();
        if (listeners != null) {
            if (debugOn) {
                logger.debug("Invoking CqListeners close() api for the CQ, CqName: {} Number of CqListeners: {}", (Object)this.cqName, (Object)listeners.length);
            }
            for (CqListener listener : listeners) {
                // A failing listener must not prevent the remaining listeners from being closed.
                try {
                    listener.close();
                }
                catch (Exception ex) {
                    logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.CqQueryImpl_EXCEPTION_OCCOURED_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR_1, (Object[])new Object[]{this.cqName, ex.getLocalizedMessage()}));
                    if (debugOn) {
                        logger.debug(ex.getMessage(), (Throwable)ex);
                    }
                }
                catch (VirtualMachineError err) {
                    SystemFailure.initiateFailure((Error)err);
                    throw err;
                }
                catch (Throwable t) {
                    SystemFailure.checkFailure();
                    logger.warn((Message)LocalizedMessage.create((StringId)LocalizedStrings.CqQueryImpl_RUNTIMEEXCEPTION_OCCOURED_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR_1, (Object[])new Object[]{this.cqName, t.getLocalizedMessage()}));
                    if (debugOn) {
                        logger.debug(t.getMessage(), t);
                    }
                }
            }
        }
        if (debugOn) {
            logger.debug("Successfully closed the CQ. {}", (Object)this.cqName);
        }
    }

    /**
     * Removes this CQ's entry from the service's base-region-to-CQ-name map.
     *
     * @throws CqException declared by the overridden method
     */
    @Override
    protected void cleanup() throws CqException {
        final String serverCqName = this.getServerCqName();
        this.cqService.removeFromBaseRegionToCqNameMap(this.regionName, serverCqName);
    }

    /** Returns the attributes this CQ was created with; may be {@code null}. */
    public CqAttributes getCqAttributes() {
        final CqAttributes attributes = this.cqAttributes;
        return attributes;
    }

    /**
     * Returns the listeners registered in this CQ's attributes.
     *
     * <p>Reads the {@code cqAttributes} field directly, so it fails with a
     * {@link NullPointerException} when no attributes were supplied.
     */
    public CqListener[] getCqListeners() {
        final CqAttributes attributes = this.cqAttributes;
        return attributes.getCqListeners();
    }

    /**
     * Executes this CQ without requesting initial results.
     *
     * @throws CqClosedException if the CQ is already closed
     * @throws RegionNotFoundException if the base region cannot be found locally
     * @throws CqException if the CQ could not be created on the server
     */
    public void execute() throws CqClosedException, RegionNotFoundException, CqException {
        final boolean withInitialResults = false;
        // The helper's return value is only meaningful when initial results are requested.
        executeCqOnRedundantsAndPrimary(withInitialResults);
    }

    /**
     * Executes this CQ and returns the initial result set.
     *
     * <p>While the execution is in flight, {@code queuedEvents} is non-null. Once the initial
     * results are available, any events found in that queue are delivered to the listeners on a
     * daemon thread, which resets {@code queuedEvents} to {@code null} when done. Only one such
     * execution may be active at a time: a concurrent caller blocks on
     * {@code queuedEventsSynchObject} until {@code queuedEvents} is {@code null} again.
     *
     * <p>The wait is uninterruptible; an interrupt received while waiting is remembered and the
     * thread's interrupt status is restored once the wait is over.
     *
     * @param <E> element type of the returned results
     * @return the initial results reported by the server
     * @throws CqClosedException if the CQ is already closed
     * @throws RegionNotFoundException if the base region cannot be found locally
     * @throws CqException if the CQ could not be executed on the server
     */
    @SuppressWarnings("unchecked") // the helper returns Object; the server supplies the CqResults
    public <E> CqResults<E> executeWithInitialResults() throws CqClosedException, RegionNotFoundException, CqException {
        final CqResults<E> initialResults;
        synchronized (this.queuedEventsSynchObject) {
            // Re-asserting the interrupt inside the loop would make wait() throw again at once and
            // turn the loop into a busy spin, so the flag is restored only after the wait is over.
            boolean interrupted = false;
            try {
                while (this.queuedEvents != null) {
                    try {
                        this.queuedEventsSynchObject.wait();
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            this.queuedEvents = new ConcurrentLinkedQueue<CqEventImpl>();
        }
        if (CqQueryImpl.testHook != null) {
            CqQueryImpl.testHook.pauseUntilReady();
        }
        try {
            initialResults = (CqResults<E>)this.executeCqOnRedundantsAndPrimary(true);
        }
        catch (RuntimeException | CqException | RegionNotFoundException e) {
            // Release the "execution in flight" marker under the monitor and wake any caller
            // blocked above; otherwise it would wait forever for a notification that never comes.
            synchronized (this.queuedEventsSynchObject) {
                this.queuedEvents = null;
                this.queuedEventsSynchObject.notifyAll();
            }
            throw e;
        }
        synchronized (this.queuedEventsSynchObject) {
            try {
                if (this.queuedEvents.isEmpty()) {
                    this.queuedEvents = null;
                } else {
                    try {
                        Runnable drainQueuedEvents = new Runnable(){

                            @Override
                            public void run() {
                                if (CqQueryImpl.testHook != null) {
                                    CqQueryImpl.testHook.setEventCount(ClientCQImpl.this.queuedEvents.size());
                                }
                                synchronized (ClientCQImpl.this.queuedEventsSynchObject) {
                                    try {
                                        Object[] pendingEvents = ClientCQImpl.this.queuedEvents.toArray();
                                        for (Object pendingEvent : pendingEvents) {
                                            ClientCQImpl.this.cqService.invokeListeners(ClientCQImpl.this.cqName, ClientCQImpl.this, (CqEventImpl)pendingEvent);
                                            ClientCQImpl.this.stats.decQueuedCqListenerEvents();
                                        }
                                    }
                                    finally {
                                        // Always reset the marker, even if a listener failed,
                                        // so that later executions are not blocked.
                                        ClientCQImpl.this.queuedEvents.clear();
                                        ClientCQImpl.this.queuedEvents = null;
                                        ClientCQImpl.this.queuedEventsSynchObject.notifyAll();
                                    }
                                }
                            }
                        };
                        LoggingThreadGroup group = LoggingThreadGroup.createThreadGroup((String)"CQEventHandler", (Logger)logger);
                        Thread thread = new Thread((ThreadGroup)group, drainQueuedEvents, "CQEventHandler For " + this.cqName);
                        thread.setDaemon(true);
                        thread.start();
                    }
                    catch (Exception ex) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Exception while invoking the CQ Listener with queued events.", (Throwable)ex);
                        }
                    }
                }
            }
            finally {
                // notifyAll rather than notify: several callers may be waiting, and each re-checks
                // the queuedEvents condition after waking.
                this.queuedEventsSynchObject.notifyAll();
            }
        }
        return initialResults;
    }

    /**
     * Creates this CQ on the server (when this member is not itself a server), marks it as
     * running and notifies the status listeners.
     *
     * <p>The server interaction and the state change happen while holding the {@code cqState}
     * monitor. The listener notification and the statistics updates run after the monitor has been
     * released.
     *
     * @param executeWithInitialResults whether the server should also return the initial results
     * @return the initial results when requested on a non-server member, otherwise {@code null}
     * @throws CqClosedException if the CQ is already closed
     * @throws IllegalStateException if the CQ is already running
     * @throws RegionNotFoundException if the connection proxy has to be initialized and the base
     *         region is not found locally
     * @throws CqException if the server request fails, returns no initial results when they were
     *         requested, or a shutdown is in progress
     */
    private Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults) throws CqClosedException, RegionNotFoundException, CqException {
        SelectResults initialResults = null;
        CqStateImpl cqStateImpl = this.cqState;
        synchronized (cqStateImpl) {
            if (this.isClosed()) {
                throw new CqClosedException(LocalizedStrings.CqQueryImpl_CQ_IS_CLOSED_CQNAME_0.toLocalizedString(new Object[]{this.cqName}));
            }
            if (this.isRunning()) {
                throw new IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_IN_RUNNING_STATE_CQNAME_0.toLocalizedString(new Object[]{this.cqName}));
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Performing Execute {} request for CQ. CqName: {}", (Object)(executeWithInitialResults ? "WithInitialResult" : ""), (Object)this.cqName);
            }
            this.cqBaseRegion = (LocalRegion)this.cqService.getCache().getRegion(this.regionName);
            if (!this.cqService.isServer()) {
                // Lazily build the proxy when none was supplied to the constructor.
                if (this.cqProxy == null) {
                    this.initConnectionProxy();
                }
                boolean success = false;
                try {
                    if (this.proxyCache != null) {
                        if (this.proxyCache.isClosed()) {
                            throw this.proxyCache.getCacheClosedException("Cache is closed for this user.");
                        }
                        // Install the user's attributes for the duration of the server call;
                        // they are cleared again in the finally block below.
                        UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
                    }
                    if (executeWithInitialResults) {
                        initialResults = this.cqProxy.createWithIR(this);
                        if (initialResults == null) {
                            String errMsg = "Failed to execute the CQ.  CqName: " + this.cqName + ", Query String is: " + this.queryString;
                            throw new CqException(errMsg);
                        }
                    } else {
                        this.cqProxy.create(this);
                    }
                    success = true;
                }
                catch (Exception ex) {
                    // Translation order matters: shutdown first, then security failures,
                    // then CqExceptions as-is, and finally everything else wrapped.
                    if (this.shutdownInProgress()) {
                        throw new CqException("System shutdown in progress.");
                    }
                    if (ex.getCause() instanceof GemFireSecurityException) {
                        if (this.securityLogWriter.warningEnabled()) {
                            this.securityLogWriter.warning(LocalizedStrings.CqQueryImpl_EXCEPTION_WHILE_EXECUTING_CQ_EXCEPTION_0, (Object)ex, null);
                        }
                        throw new CqException(ex.getCause().getMessage(), ex.getCause());
                    }
                    if (ex instanceof CqException) {
                        throw (CqException)((Object)ex);
                    }
                    String errMsg = LocalizedStrings.CqQueryImpl_FAILED_TO_EXECUTE_THE_CQ_CQNAME_0_QUERY_STRING_IS_1_ERROR_FROM_LAST_SERVER_2.toLocalizedString(new Object[]{this.cqName, this.queryString, ex.getLocalizedMessage()});
                    if (logger.isDebugEnabled()) {
                        logger.debug(errMsg, (Throwable)ex);
                    }
                    throw new CqException(errMsg, (Throwable)ex);
                }
                finally {
                    // Best-effort cleanup of a CQ that failed to be created; a failure here
                    // is only logged so that the original exception keeps propagating.
                    if (!success && !this.shutdownInProgress()) {
                        try {
                            this.cqProxy.close(this);
                        }
                        catch (Exception e) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Exception cleaning up failed cq", (Throwable)e);
                            }
                            UserAttributes.userAttributes.set(null);
                        }
                    }
                    UserAttributes.userAttributes.set(null);
                }
            }
            // State 1 is the value isRunning()-style callers treat as active
            // (presumably RUNNING; close() decrements the "active" stat for it).
            this.cqState.setState(1);
        }
        if (!this.cqService.isServer()) {
            this.connected = true;
            // Only listeners that also implement CqStatusListener get the connected callback.
            CqListener[] cqListeners = this.getCqAttributes().getCqListeners();
            for (int lCnt = 0; lCnt < cqListeners.length; ++lCnt) {
                if (cqListeners[lCnt] == null || !(cqListeners[lCnt] instanceof CqStatusListener)) continue;
                CqStatusListener listener = (CqStatusListener)cqListeners[lCnt];
                listener.onCqConnected();
            }
        }
        this.cqService.stats().incCqsActive();
        this.cqService.stats().decCqsStopped();
        return initialResults;
    }

    /**
     * Reports whether the cache or the proxy's pool is shutting down.
     *
     * @return {@code true} if the service has no cache, the cache is closed, or the pool's cancel
     *         criterion reports a cancellation in progress
     */
    private boolean shutdownInProgress() {
        final InternalCache internalCache = this.cqService.getInternalCache();
        if (internalCache != null && !internalCache.isClosed()) {
            // A non-null reason from the cancel criterion means a cancellation is under way.
            return this.cqProxy.getPool().getCancelCriterion().cancelInProgress() != null;
        }
        return true;
    }

    /**
     * Stops this running CQ, asking the server to stop it when a proxy is available.
     *
     * <p>When there is no proxy the CQ is simply marked as stopped locally. This case is handled
     * with an explicit null check rather than by catching the {@link NullPointerException} that
     * dereferencing a missing proxy would raise.
     *
     * @throws CqClosedException if the CQ is already closed
     * @throws IllegalStateException if the CQ is not running
     * @throws CqException if the stop request to the server fails
     */
    public void stop() throws CqClosedException, CqException {
        synchronized (this.cqState) {
            if (this.isClosed()) {
                throw new CqClosedException(LocalizedStrings.CqQueryImpl_CQ_IS_CLOSED_CQNAME_0.toLocalizedString(new Object[]{this.cqName}));
            }
            if (!this.isRunning()) {
                throw new IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0.toLocalizedString(new Object[]{this.cqName}));
            }
            final ServerCQProxyImpl proxy = this.cqProxy;
            if (proxy != null) {
                try {
                    if (this.proxyCache != null) {
                        if (this.proxyCache.isClosed()) {
                            throw this.proxyCache.getCacheClosedException("Cache is closed for this user.");
                        }
                        UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
                    }
                    proxy.stop(this);
                }
                catch (Exception e) {
                    // Any failure of the server request leaves the CQ state untouched.
                    throw new CqException(LocalizedStrings.CqQueryImpl_FAILED_TO_STOP_THE_CQ_CQNAME_0_ERROR_FROM_LAST_SERVER_1.toLocalizedString(new Object[]{this.cqName, e.getLocalizedMessage()}), e.getCause());
                }
                finally {
                    UserAttributes.userAttributes.set(null);
                }
            }
            // Reached only when there was no proxy or the server acknowledged the stop.
            this.cqState.setState(0);
            this.cqService.stats().incCqsStopped();
            this.cqService.stats().decCqsActive();
            if (logger.isDebugEnabled()) {
                logger.debug("Successfully stopped the CQ. {}", (Object)this.cqName);
            }
        }
    }

    /**
     * Returns this CQ's attributes viewed as a mutator.
     *
     * <p>This is a plain cast of the stored attributes, so it fails with a
     * {@link ClassCastException} if they do not implement {@link CqAttributesMutator}.
     */
    public CqAttributesMutator getCqAttributesMutator() {
        final CqAttributesMutator mutator = (CqAttributesMutator)this.cqAttributes;
        return mutator;
    }

    /**
     * Returns the queue of events held back during an execute-with-initial-results call, or
     * {@code null} when no such call is in progress.
     */
    ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
        final ConcurrentLinkedQueue<CqEventImpl> pending = this.queuedEvents;
        return pending;
    }

    /**
     * Sets the per-user cache whose user attributes are installed around server requests.
     *
     * @param proxyCache the cache to use; may be {@code null}
     */
    public void setProxyCache(final ProxyCache proxyCache) {
        this.proxyCache = proxyCache;
    }

    /** Returns the current value of the connected flag. */
    boolean isConnected() {
        final boolean currentlyConnected = this.connected;
        return currentlyConnected;
    }

    /**
     * Updates the connected flag.
     *
     * @param connected the new value of the flag
     */
    void setConnected(final boolean connected) {
        this.connected = connected;
    }

    /**
     * Creates this CQ on the server reached through the given connection, passing along the CQ's
     * current state and the data policy ordinal of its base region.
     *
     * @param conn the connection to create the CQ on
     * @param isDurable durability flag forwarded to the proxy
     */
    public void createOn(Connection conn, boolean isDurable) {
        final byte dataPolicyOrdinal;
        if (this.getCqBaseRegion() == null) {
            // No base region resolved yet: fall back to ordinal 0.
            dataPolicyOrdinal = (byte)0;
        } else {
            dataPolicyOrdinal = this.getCqBaseRegion().getAttributes().getDataPolicy().ordinal;
        }
        final int currentState = this.cqState.getState();
        this.cqProxy.createOn(this.getName(), conn, this.getQueryString(), currentState, isDurable, dataPolicyOrdinal);
    }
}

