package com.gemstone.gemfire.internal.cache.wan.serial;

import com.gemstone.gemfire.GemFireException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.cache.util.Gateway;
import com.gemstone.gemfire.internal.LogWriterImpl;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/* loaded from: input_file:com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.class */
/**
 * Event processor that fans events of a serial gateway sender out to several
 * {@link SerialGatewaySenderEventProcessor} children, one per dispatcher thread
 * configured on the sender.
 *
 * <p>This thread starts the children, waits until each of them has either left the
 * stopped state or reported a start-up exception, publishes the resulting state under
 * {@code runningStateLock}, and then joins the children. Lifecycle calls (stop, pause,
 * resume, listener removal) are forwarded to every child.
 */
public class ConcurrentSerialGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
    /** Child processors; the list index is the dispatcher index used when routing events. */
    private final List<SerialGatewaySenderEventProcessor> processors;

    /** The sender whose events are being dispatched. */
    private final SerialGatewaySenderImpl sender;

    /** Start-up failure captured in {@link #run()}, or {@code null} if none occurred. */
    private GemFireException ex;

    /** Queues of the child processors, collected once in the constructor. */
    private final Set<RegionQueue> queues;

    /**
     * Creates the processor, its child processors and the set of their queues, and marks
     * this thread as a daemon.
     *
     * @param serialGatewaySenderImpl the sender this processor dispatches for
     */
    public ConcurrentSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl serialGatewaySenderImpl) {
        super(LogWriterImpl.createThreadGroup("Event Processor for GatewaySender_" + serialGatewaySenderImpl.getId(), serialGatewaySenderImpl.getLogger()), "Event Processor for GatewaySender_" + serialGatewaySenderImpl.getId(), serialGatewaySenderImpl);
        this.processors = new ArrayList<SerialGatewaySenderEventProcessor>();
        this.ex = null;
        this.sender = serialGatewaySenderImpl;
        // Populates this.processors, so it must run before the queues are collected.
        initializeMessageQueue(serialGatewaySenderImpl.getId());
        this.queues = new HashSet<RegionQueue>();
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            this.queues.add(processor.getQueue());
        }
        setDaemon(true);
    }

    /**
     * Creates one child processor per dispatcher thread of the sender. Each child is
     * given the id {@code str + "." + index}.
     *
     * @param str base id used to derive the children's ids
     */
    @Override
    protected void initializeMessageQueue(String str) {
        for (int i = 0; i < this.sender.getDispatcherThreads(); i++) {
            this.processors.add(new SerialGatewaySenderEventProcessor(this.sender, str + "." + i));
            if (this.sender.getLogger().fineEnabled()) {
                this.sender.getLogger().fine("Created the SerialGatewayEventProcessor_" + i + ".... " + this.processors.get(i));
            }
        }
    }

    /**
     * Routes an event to a child processor. Events on the PDX type region are enqueued on
     * every child; all other events go to the single child selected by the hash computed
     * in {@link #getHashCode(EntryEventImpl)}.
     *
     * @param enumListenerEvent the operation that produced the event
     * @param entryEvent the event to enqueue; must be an {@link EntryEventImpl}
     * @throws IOException if a child processor fails to enqueue the event
     * @throws CacheException if a child processor fails to enqueue the event
     */
    @Override
    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent) throws IOException, CacheException {
        EntryEventImpl event = (EntryEventImpl) entryEvent;
        if (!event.isOnPdxTypeRegion()) {
            // The remainder lies strictly between -size and +size, so Math.abs cannot overflow.
            int index = Math.abs(getHashCode(event) % this.processors.size());
            enqueueEvent(enumListenerEvent, entryEvent, index);
            return;
        }
        for (int i = 0; i < this.processors.size(); i++) {
            enqueueEvent(enumListenerEvent, entryEvent, i);
        }
    }

    /**
     * Enqueues an event on the child processor at the given index.
     *
     * <p>Under the {@code KEY} and {@code PARTITION} order policies the event is copied
     * and the copy receives a new {@link EventID} whose thread id is produced by
     * {@link ThreadIdentifier#createFakeThreadIDForParallelGateway} from the index and the
     * original thread id; membership id and sequence id are carried over unchanged.
     *
     * @param enumListenerEvent the operation that produced the event
     * @param entryEvent the event to enqueue; must be an {@link EntryEventImpl}
     * @param i index of the target child processor
     * @throws CacheException if the child processor fails to enqueue the event
     * @throws IOException if the child processor fails to enqueue the event
     */
    public void enqueueEvent(EnumListenerEvent enumListenerEvent, EntryEvent entryEvent, int i) throws CacheException, IOException {
        SerialGatewaySenderEventProcessor target = this.processors.get(i);
        EntryEventImpl eventToEnqueue = (EntryEventImpl) entryEvent;
        if (this.sender.getOrderPolicy() == Gateway.OrderPolicy.KEY || this.sender.getOrderPolicy() == Gateway.OrderPolicy.PARTITION) {
            // Work on a copy so the caller's event keeps its original event id.
            eventToEnqueue = new EntryEventImpl((EntryEventImpl) entryEvent);
            EventID originalId = eventToEnqueue.getEventId();
            if (getLogger().fineEnabled()) {
                getLogger().fine("The original EventId is " + originalId);
            }
            long newThreadId = ThreadIdentifier.createFakeThreadIDForParallelGateway(i, originalId.getThreadID(), 0);
            EventID newId = new EventID(originalId.getMembershipID(), newThreadId, originalId.getSequenceID());
            if (getLogger().fineEnabled()) {
                getLogger().fine(this + ": Generated event id for event with key=" + entryEvent.getKey() + ", index=" + i + ", original event id=" + originalId + ", threadId=" + originalId.getThreadID() + ", new event id=" + newId + ", newThreadId=" + newThreadId);
            }
            eventToEnqueue.setEventId(newId);
        }
        target.enqueueEvent(enumListenerEvent, eventToEnqueue);
    }

    /**
     * Starts every child processor, waits for their start-up outcome, publishes this
     * processor's stopped/running state to threads waiting on {@code runningStateLock},
     * and then joins the children.
     *
     * <p>If this thread is interrupted while joining, the interrupt status is restored
     * and the method returns without joining the remaining children.
     */
    @Override
    public void run() {
        for (int i = 0; i < this.processors.size(); i++) {
            if (this.sender.getLogger().fineEnabled()) {
                this.sender.getLogger().fine("Starting the serialProcessor " + i);
            }
            this.processors.get(i).start();
        }
        try {
            waitForRunningStatus();
        } catch (GatewaySenderException e) {
            this.ex = e;
        }
        synchronized (this.runningStateLock) {
            if (this.ex != null) {
                setGemFireException(this.ex);
                setIsStopped(true);
            } else {
                setIsStopped(false);
            }
            this.runningStateLock.notifyAll();
        }
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            try {
                processor.join();
            } catch (InterruptedException e) {
                if (this.sender.getLogger().fineEnabled()) {
                    this.sender.getLogger().fine("Got InterruptedException while waiting for child threads to finish.");
                }
                // Restore the interrupt regardless of the log level; every further join()
                // would fail immediately with the flag set, so stop joining here.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Blocks until every child processor has either left the stopped state or recorded a
     * start-up exception.
     *
     * <p>An interrupt received while waiting does not abort the wait; it is remembered and
     * the thread's interrupt status is restored before this method returns or throws.
     *
     * @throws GatewaySenderException if a child processor recorded an exception
     */
    private void waitForRunningStatus() {
        boolean interrupted = false;
        try {
            for (SerialGatewaySenderEventProcessor processor : this.processors) {
                synchronized (processor.runningStateLock) {
                    while (processor.getGemFireException() == null && processor.isStopped()) {
                        try {
                            processor.runningStateLock.wait();
                        } catch (InterruptedException e) {
                            // Re-interrupting here would make the next wait() throw at once
                            // and turn this loop into a busy spin; defer it to the finally.
                            interrupted = true;
                        }
                    }
                    GemFireException failure = processor.getGemFireException();
                    if (failure != null) {
                        // Report the sender's id rather than this thread's numeric id.
                        // NOTE(review): the cause chained here is failure.getCause(), not the
                        // failure itself; kept as-is because callers may inspect the cause.
                        throw new GatewaySenderException(LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1.toLocalizedString(this.sender.getId(), failure.getMessage()), failure.getCause());
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Computes the hash used to pick a child processor, based on the sender's order policy:
     * the key's hash for {@code KEY}; a combination of membership id and thread id for
     * {@code THREAD}; for {@code PARTITION} the value of
     * {@link PartitionedRegionHelper#getHashKey} when the event's region is partitioned,
     * otherwise the key's hash. Any other policy yields 0.
     *
     * @param entryEventImpl the event being routed
     * @return the (possibly negative) hash value
     */
    private int getHashCode(EntryEventImpl entryEventImpl) {
        int hash = 0;
        switch (this.sender.getOrderPolicy()) {
            case KEY:
                hash = entryEventImpl.getKey().hashCode();
                break;
            case THREAD:
                EventID eventId = entryEventImpl.getEventId();
                byte[] membershipID = eventId.getMembershipID();
                long threadID = eventId.getThreadID();
                // Same folding of the long into an int as Long.hashCode performs.
                hash = Arrays.hashCode(membershipID) + ((int) (threadID ^ (threadID >>> 32)));
                if (getLogger().fineEnabled()) {
                    // Arrays.toString prints the bytes instead of the array's identity hash.
                    getLogger().fine(this + ": Generated hashcode for event with key=" + entryEventImpl.getKey() + ", memberId=" + Arrays.toString(membershipID) + ", threadId=" + threadID + ": " + hash);
                }
                break;
            case PARTITION:
                hash = PartitionRegionHelper.isPartitionedRegion(entryEventImpl.getRegion()) ? PartitionedRegionHelper.getHashKey(entryEventImpl) : entryEventImpl.getKey().hashCode();
                if (getLogger().fineEnabled()) {
                    getLogger().fine(this + ": Generated partition hashcode for event with key=" + entryEventImpl.getKey() + ": " + hash);
                }
                break;
        }
        return hash;
    }

    /**
     * Stops every child processor and marks this processor as stopped. Does nothing if
     * this thread is not alive.
     */
    @Override
    public void stopProcessing() {
        if (!isAlive()) {
            return;
        }
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            processor.stopProcessing();
        }
        setIsStopped(true);
        if (getLogger().fineEnabled()) {
            getLogger().fine("ConcurrentSerialGatewaySenderEventProcessor: Stopped dispatching: " + this);
        }
    }

    /** Pauses dispatching on every child processor and then on this processor. */
    @Override
    public void pauseDispatching() {
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            processor.pauseDispatching();
        }
        super.pauseDispatching();
        if (getLogger().fineEnabled()) {
            getLogger().fine("ConcurrentSerialGatewaySenderEventProcessor: Paused dispatching: " + this);
        }
    }

    /** Resumes dispatching on every child processor and then on this processor. */
    @Override
    public void resumeDispatching() {
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            processor.resumeDispatching();
        }
        super.resumeDispatching();
        if (getLogger().fineEnabled()) {
            getLogger().fine("ConcurrentSerialGatewaySenderEventProcessor: Resumed dispatching: " + this);
        }
    }

    /**
     * Returns the queues of the child processors.
     *
     * @return the set built in the constructor (the internal instance, not a copy)
     */
    public Set<RegionQueue> getQueues() {
        return this.queues;
    }

    /** Forwards the cache-listener removal to every child processor. */
    @Override
    public void removeCacheListener() {
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            processor.removeCacheListener();
        }
    }
}
