package com.gemstone.gemfire.internal.cache.partitioned;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.cache.BucketDump;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.InitialImageOperation;
import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gnu.trove.TObjectIntProcedure;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:com/gemstone/gemfire/internal/cache/partitioned/FetchEntriesMessage.class */
public final class FetchEntriesMessage extends PartitionMessage {
    /** Id of the bucket whose entries are requested; written by {@code toData} and restored by {@code fromData}. */
    private int bucketId;

    /* loaded from: input_file:com/gemstone/gemfire/internal/cache/partitioned/FetchEntriesMessage$FetchEntriesReplyMessage.class */
    public static final class FetchEntriesReplyMessage extends ReplyMessage {
        /** Bucket id; assigned by the private constructor and carried over the wire by {@code toData}/{@code fromData}. */
        int bucketId;
        /** Index of the series this chunk belongs to (the sender in this class always passes 0). */
        int seriesNum;
        /** Position of this chunk within its series (the sender in this class counts up from 0). */
        int msgNum;
        /** Total number of series (the sender in this class always passes 1). */
        int numSeries;
        /** True when this chunk is the last one of its series. */
        boolean lastInSeries;
        /** Raw chunk bytes, populated by {@code fromData}; never assigned on the sending side. */
        transient byte[] chunk;
        /** Stream holding the chunk to transmit; assigned by the private constructor and written out by {@code toData}. */
        transient HeapDataOutputStream chunkStream;

        /**
         * No-argument constructor that leaves every field at its default value.
         * NOTE(review): presumably required so the message can be instantiated before {@code fromData} — confirm.
         */
        public FetchEntriesReplyMessage() {
        }

        /**
         * Builds a single chunk reply addressed to one member.
         *
         * @param internalDistributedMember member the reply is addressed to
         * @param i processor id the reply is routed to
         * @param i2 bucket id
         * @param heapDataOutputStream stream holding the serialized chunk
         * @param i3 series number
         * @param i4 message number within the series
         * @param i5 number of series
         * @param z whether this chunk is the last of its series
         */
        private FetchEntriesReplyMessage(InternalDistributedMember internalDistributedMember, int i, int i2, HeapDataOutputStream heapDataOutputStream, int i3, int i4, int i5, boolean z) {
            // Chunk payload and its bookkeeping.
            this.chunkStream = heapDataOutputStream;
            this.bucketId = i2;
            this.seriesNum = i3;
            this.msgNum = i4;
            this.numSeries = i5;
            this.lastInSeries = z;

            // Routing information inherited from the reply message base class.
            setRecipient(internalDistributedMember);
            setProcessorId(i);
        }

        /**
         * Sends the entries of a bucket back to the requesting member as a sequence of chunk replies.
         * <p>
         * When the bucket has a version vector, a clone of it is sent first in a plain
         * {@link ReplyMessage}. The entries then follow, split into chunks by {@code chunkMap}; each
         * chunk is transmitted through {@code sendChunk} with series number 0, a message number that
         * counts up from 0, and a series count of 1.
         *
         * @param internalDistributedMember member that receives the replies; asserted to be non-null
         * @param i processor id the replies are addressed to
         * @param dm distribution manager used for logging and for sending
         * @param i2 id of the bucket being transferred
         * @param bucketRegion bucket whose entries are sent; it is dereferenced without a null check
         * @throws ForceReattemptException if chunking fails with an {@link IOException}
         */
        public static void send(final InternalDistributedMember internalDistributedMember, final int i, final DM dm, final int i2, BucketRegion bucketRegion) throws ForceReattemptException {
            Assert.assertTrue(internalDistributedMember != null, "FetchEntriesReplyMessage NULL reply message");
            LogWriterI18n loggerI18n = dm.getLoggerI18n();
            RegionVersionVector versionVector = bucketRegion.getVersionVector();
            if (versionVector != null) {
                // The version vector travels ahead of the entry chunks in its own reply.
                ReplyMessage.send(internalDistributedMember, i, versionVector.getCloneForTransmission(false), dm);
            }
            if (loggerI18n.fineEnabled()) {
                loggerI18n.fine("Starting PR entries chunking for " + bucketRegion.size() + " entries");
            }
            try {
                boolean finished = chunkMap(bucketRegion, InitialImageOperation.CHUNK_SIZE_IN_BYTES, false, new TObjectIntProcedure() {
                    /** Number of the next chunk to send. */
                    int msgNum = 0;
                    /** Whether the chunk currently being sent was flagged as the last one. */
                    boolean last = false;

                    @Override // com.gemstone.gnu.trove.TObjectIntProcedure
                    public boolean execute(Object obj, int i3) {
                        HeapDataOutputStream chunk = (HeapDataOutputStream) obj;
                        // chunkMap passes a positive value only for the final chunk.
                        this.last = i3 > 0;
                        try {
                            // The recipient is the captured final parameter of send(); there is no
                            // enclosing InternalDistributedMember instance to qualify "this" with.
                            return FetchEntriesReplyMessage.sendChunk(internalDistributedMember, i, i2, dm, chunk, 0, this.msgNum++, 1, this.last);
                        } catch (CancelException e) {
                            // Cancellation stops the chunking; chunkMap then reports "not completed".
                            return false;
                        }
                    }
                }, loggerI18n);
                if (loggerI18n.fineEnabled()) {
                    loggerI18n.fine((finished ? "Finished" : "DID NOT complete") + " PR entries chunking");
                }
            } catch (IOException e) {
                throw new ForceReattemptException(LocalizedStrings.FetchEntriesMessage_UNABLE_TO_SEND_RESPONSE_TO_FETCHENTRIES_REQUEST.toLocalizedString(), e);
            }
        }

        /**
         * Wraps one chunk in a {@code FetchEntriesReplyMessage} and hands it to the distribution manager.
         *
         * @param internalDistributedMember member the chunk is sent to
         * @param i processor id the reply is addressed to
         * @param i2 bucket id
         * @param dm distribution manager that sends the message
         * @param heapDataOutputStream stream holding the serialized chunk
         * @param i3 series number
         * @param i4 message number within the series
         * @param i5 number of series
         * @param z whether this is the last chunk of the series
         * @return {@code true} when {@code putOutgoing} returns {@code null} or an empty set
         */
        static boolean sendChunk(InternalDistributedMember internalDistributedMember, int i, int i2, DM dm, HeapDataOutputStream heapDataOutputStream, int i3, int i4, int i5, boolean z) {
            FetchEntriesReplyMessage reply = new FetchEntriesReplyMessage(internalDistributedMember, i, i2, heapDataOutputStream, i3, i4, i5, z);
            Set failures = dm.putOutgoing(reply);
            if (failures == null) {
                return true;
            }
            return failures.size() == 0;
        }

        /**
         * Serializes the entries of a bucket into chunks and passes each chunk to a callback.
         * <p>
         * Every non-removed entry is written as key, value (invalid tokens become {@code null}) and
         * version tag; each chunk is terminated by a {@code null} marker. At least one chunk is always
         * produced, even for an empty bucket. One output stream is reset and reused for every chunk.
         * <p>
         * The chunk size limit is always {@code InitialImageOperation.CHUNK_SIZE_IN_BYTES}; the
         * parameters {@code i}, {@code z} and {@code logWriterI18n} are not read by this method.
         *
         * @param bucketRegion bucket whose entries are serialized
         * @param i not used by the body
         * @param z not used by the body
         * @param tObjectIntProcedure callback invoked with the chunk stream and 1 for the final chunk
         *        (0 otherwise); returning {@code false} stops the chunking
         * @param logWriterI18n not used by the body
         * @return {@code true} only if the final chunk was handed over and the callback accepted it
         * @throws IOException if serializing an entry fails
         */
        static boolean chunkMap(BucketRegion bucketRegion, int i, boolean z, TObjectIntProcedure tObjectIntProcedure, LogWriterI18n logWriterI18n) throws IOException {
            final Iterator entries = bucketRegion.entrySet().iterator();
            final HeapDataOutputStream out = new HeapDataOutputStream(InitialImageOperation.CHUNK_SIZE_IN_BYTES + 2048);
            boolean sentLastChunk;
            do {
                out.reset();
                int averageEntrySize = 0;
                int entryCount = 0;
                // Keep adding entries while one more average-sized entry still fits in the chunk.
                while (out.size() + averageEntrySize < InitialImageOperation.CHUNK_SIZE_IN_BYTES && entries.hasNext()) {
                    final LocalRegion.NonTXEntry entry = (LocalRegion.NonTXEntry) entries.next();
                    final RegionEntry regionEntry = entry.getRegionEntry();
                    synchronized (regionEntry) {
                        Object value = regionEntry._getValue();
                        if (value == null) {
                            value = regionEntry.getSerializedValueOnDisk((LocalRegion) entry.getRegion());
                        }
                        if (Token.isRemoved(value)) {
                            // Removed entries are not transmitted.
                            continue;
                        }
                        DataSerializer.writeObject(regionEntry.getKey(), out);
                        if (Token.isInvalid(value)) {
                            value = null;
                        }
                        final VersionStamp stamp = regionEntry.getVersionStamp();
                        VersionTag tag = null;
                        if (stamp != null) {
                            tag = stamp.asVersionTag();
                        }
                        if (tag != null) {
                            tag.replaceNullIDs(bucketRegion.getVersionMember());
                        }
                        DataSerializer.writeObject(value, out);
                        DataSerializer.writeObject(tag, out);
                        entryCount++;
                        averageEntrySize = out.size() / entryCount;
                    }
                }
                // A null key marks the end of the chunk.
                DataSerializer.writeObject((Object) null, out);
                final boolean exhausted = !entries.hasNext();
                final boolean keepGoing = tObjectIntProcedure.execute(out, exhausted ? 1 : 0);
                sentLastChunk = exhausted && keepGoing;
                if (!keepGoing) {
                    break;
                }
            } while (entries.hasNext());
            return sentLastChunk;
        }

        /**
         * Hands this chunk to the waiting {@code FetchEntriesResponse} and records the reply time.
         * If no processor is supplied, nothing is processed and no statistic is updated.
         *
         * @param dm distribution manager providing the logger and statistics
         * @param replyProcessor21 processor waiting for this reply; cast to {@code FetchEntriesResponse}
         */
        @Override // com.gemstone.gemfire.distributed.internal.ReplyMessage
        public void process(DM dm, ReplyProcessor21 replyProcessor21) {
            final long startTime = getTimestamp();
            final LogWriterI18n log = dm.getLoggerI18n();
            final FetchEntriesResponse response = (FetchEntriesResponse) replyProcessor21;

            if (response == null) {
                if (DistributionManager.VERBOSE) {
                    log.fine("FetchEntriesReplyMessage processor not found");
                }
                return;
            }

            response.processChunk(this);
            if (DistributionManager.VERBOSE) {
                log.fine(response + " processed " + this);
            }
            final long elapsed = DistributionStats.getStatTime() - startTime;
            dm.getStats().incReplyMessageTime(elapsed);
        }

        /**
         * Serializes this reply: the superclass data first, then the chunk bookkeeping fields, then
         * the chunk stream as a byte array. {@code fromData} reads the fields back in the same order.
         *
         * @param dataOutput destination of the serialized form
         * @throws IOException if writing fails
         */
        @Override // com.gemstone.gemfire.distributed.internal.ReplyMessage, com.gemstone.gemfire.distributed.internal.DistributionMessage, com.gemstone.gemfire.internal.DataSerializableFixedID
        public void toData(DataOutput dataOutput) throws IOException {
            super.toData(dataOutput);
            dataOutput.writeInt(this.bucketId);
            dataOutput.writeInt(this.seriesNum);
            dataOutput.writeInt(this.msgNum);
            dataOutput.writeInt(this.numSeries);
            dataOutput.writeBoolean(this.lastInSeries);
            // The sender holds the chunk as a stream; the receiver reads it back into the chunk byte array.
            DataSerializer.writeObjectAsByteArray(this.chunkStream, dataOutput);
        }

        /**
         * Returns the fixed id of this message class.
         * NOTE(review): presumably the DataSerializableFixedID constant registered for this reply — confirm.
         *
         * @return the constant 65
         */
        @Override // com.gemstone.gemfire.distributed.internal.ReplyMessage, com.gemstone.gemfire.internal.DataSerializableFixedID
        public int getDSFID() {
            return 65;
        }

        /**
         * Restores this reply from its serialized form, reading the fields in the order
         * {@code toData} wrote them. The chunk is kept as a raw byte array in {@code chunk}.
         *
         * @param dataInput source of the serialized form
         * @throws IOException if reading fails
         * @throws ClassNotFoundException declared for the superclass read
         */
        @Override // com.gemstone.gemfire.distributed.internal.ReplyMessage, com.gemstone.gemfire.distributed.internal.DistributionMessage, com.gemstone.gemfire.internal.DataSerializableFixedID
        public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
            super.fromData(dataInput);
            this.bucketId = dataInput.readInt();
            this.seriesNum = dataInput.readInt();
            this.msgNum = dataInput.readInt();
            this.numSeries = dataInput.readInt();
            this.lastInSeries = dataInput.readBoolean();
            this.chunk = DataSerializer.readByteArray(dataInput);
        }

        /**
         * Describes this reply for logging: processor id, bucket id, optional sender, chunk
         * bookkeeping, the chunk size (from the byte array if present, otherwise from the stream)
         * and the exception, if any.
         *
         * @return the textual description
         */
        @Override // com.gemstone.gemfire.distributed.internal.ReplyMessage, com.gemstone.gemfire.distributed.internal.DistributionMessage
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("FetchEntriesReplyMessage ");
            text.append("processorid=").append(this.processorId);
            text.append(",bucketId=").append(this.bucketId);

            final InternalDistributedMember sender = getSender();
            if (sender != null) {
                text.append(",sender=").append(sender);
            }

            text.append(",seriesNum=").append(this.seriesNum);
            text.append(",msgNum=").append(this.msgNum);
            text.append(",numSeries=").append(this.numSeries);
            text.append(",lastInSeries=").append(this.lastInSeries);

            // Received messages carry the byte array, outgoing ones the stream.
            if (this.chunk != null) {
                text.append(",size=").append(this.chunk.length);
            } else if (this.chunkStream != null) {
                text.append(",size=").append(this.chunkStream.size());
            }

            if (getException() != null) {
                text.append(",exception=").append(getException());
            }
            return text.toString();
        }
    }

    /* loaded from: input_file:com/gemstone/gemfire/internal/cache/partitioned/FetchEntriesMessage$FetchEntriesResponse.class */
    public static class FetchEntriesResponse extends ReplyProcessor21 {
        /** Region consulted by {@code processChunk} for key region-context handling. */
        private final PartitionedRegion pr;
        /** Version vector captured by {@code process} from a reply whose return value is a {@code RegionVersionVector}. */
        private volatile RegionVersionVector returnRVV;
        /** Key-to-value map filled by {@code processChunk}; updates are made while synchronized on this map. */
        private final HashMap<Object, Object> returnValue;
        /** Key-to-version-tag map filled by {@code processChunk} under the same lock as {@code returnValue}. */
        private final HashMap<Object, VersionTag> returnVersions;
        /** Lock guarding the chunk bookkeeping below in {@code processChunk}. */
        private final Object endLock;
        /** Number of chunks processed so far. */
        private volatile int chunksProcessed;
        /** Number of chunks expected; set once the final chunk has been seen. */
        private volatile int chunksExpected;
        /** Whether the final chunk has been received. */
        private volatile boolean lastChunkReceived;
        /** Bucket id passed on to the resulting {@code BucketDump}. */
        private int bucketId;
        /** Member the request was sent to; passed on to the resulting {@code BucketDump}. */
        private InternalDistributedMember recipient;

        /**
         * Creates a response processor that waits for replies from a single member.
         *
         * @param internalDistributedSystem distributed system passed to the superclass
         * @param partitionedRegion region the entries belong to
         * @param internalDistributedMember the only member replies are expected from
         * @param i id of the bucket being fetched
         */
        public FetchEntriesResponse(InternalDistributedSystem internalDistributedSystem, PartitionedRegion partitionedRegion, final InternalDistributedMember internalDistributedMember, final int i) {
            super(internalDistributedSystem, Collections.singleton(internalDistributedMember));
            this.pr = partitionedRegion;
            this.recipient = internalDistributedMember;
            this.bucketId = i;
            this.endLock = new Object();
            this.returnVersions = new HashMap<Object, VersionTag>();
            // The result map prefixes its textual form with the bucket and member it came from.
            this.returnValue = new HashMap<Object, Object>() { // from class: com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage.FetchEntriesResponse.1
                private static final long serialVersionUID = 0L;

                @Override // java.util.AbstractMap
                public String toString() {
                    final StringBuilder text = new StringBuilder();
                    text.append("Bucket id = ").append(i);
                    text.append(" from member = ").append(internalDistributedMember);
                    text.append(": ").append(super.toString());
                    return text.toString();
                }
            };
        }

        /**
         * Intercepts the reply that carries the region version vector; every other message goes to
         * the superclass. A version-vector reply is stored and not passed on.
         *
         * @param distributionMessage the incoming message
         */
        @Override // com.gemstone.gemfire.distributed.internal.ReplyProcessor21
        public void process(DistributionMessage distributionMessage) {
            final Object payload = (distributionMessage instanceof ReplyMessage)
                    ? ((ReplyMessage) distributionMessage).getReturnValue()
                    : null;
            if (payload instanceof RegionVersionVector) {
                this.returnRVV = (RegionVersionVector) payload;
                return;
            }
            super.process(distributionMessage);
        }

        /**
         * Consumes one chunk reply: deserializes its key/value/version-tag triples into the result
         * maps and updates the chunk bookkeeping.
         * <p>
         * A reply carrying an exception goes straight to {@code process}. Otherwise the reply is
         * passed to {@code process} only once the final chunk has been seen and all expected chunks
         * have been processed. A deserialization failure is reported through
         * {@code processException}, with a message that depends on whether a key or a value was
         * being read, and is followed by {@code checkIfDone()}.
         *
         * @param fetchEntriesReplyMessage the chunk reply to consume
         */
        void processChunk(FetchEntriesReplyMessage fetchEntriesReplyMessage) {
            if (fetchEntriesReplyMessage.getException() != null) {
                process(fetchEntriesReplyMessage);
                return;
            }

            boolean done = false;
            // Tracks which part of an entry was being read, so the right error can be reported.
            boolean readingKey = true;
            try {
                final DataInputStream in = new DataInputStream(new ByteArrayInputStream(fetchEntriesReplyMessage.chunk));
                final boolean needsRegionContext = this.pr.keyRequiresRegionContext();
                while (in.available() > 0) {
                    readingKey = true;
                    final Object key = DataSerializer.readObject(in);
                    if (key == null) {
                        // A null key terminates the chunk, so nothing may follow it.
                        Assert.assertTrue(in.available() == 0);
                        continue;
                    }
                    if (needsRegionContext) {
                        ((KeyWithRegionContext) key).setRegionContext(this.pr);
                    }
                    readingKey = false;
                    final Object value = DataSerializer.readObject(in);
                    final VersionTag tag = (VersionTag) DataSerializer.readObject(in);
                    synchronized (this.returnValue) {
                        this.returnValue.put(key, value);
                        this.returnVersions.put(key, tag);
                    }
                }

                synchronized (this.endLock) {
                    this.chunksProcessed++;
                    final boolean finalChunk = fetchEntriesReplyMessage.seriesNum + 1 == fetchEntriesReplyMessage.numSeries
                            && fetchEntriesReplyMessage.lastInSeries;
                    if (finalChunk) {
                        this.chunksExpected = fetchEntriesReplyMessage.msgNum + 1;
                        this.lastChunkReceived = true;
                    }
                    done = this.lastChunkReceived && this.chunksExpected == this.chunksProcessed;
                    if (DistributionManager.VERBOSE) {
                        getDistributionManager().getLoggerI18n().fine(String.valueOf(this) + " chunksProcessed=" + this.chunksProcessed + ",lastChunkReceived=" + this.lastChunkReceived + ",chunksExpected=" + this.chunksExpected + ",done=" + done);
                    }
                }
            } catch (Exception e) {
                final String failure = readingKey
                        ? LocalizedStrings.FetchEntriesMessage_ERROR_DESERIALIZING_KEYS.toLocalizedString()
                        : LocalizedStrings.FetchEntriesMessage_ERROR_DESERIALIZING_VALUES.toLocalizedString();
                processException(new ReplyException(failure, e));
                checkIfDone();
            }

            if (done) {
                process(fetchEntriesReplyMessage);
            }
        }

        /**
         * Waits for all replies and returns the collected bucket contents.
         * <p>
         * A {@code ReplyException} caused by a {@code CancelException} or a
         * {@code ForceReattemptException} is turned into a {@code ForceReattemptException}; any other
         * reply exception is passed to {@code handleAsUnexpected()}. Values that are
         * {@code CachedDeserializable} are replaced by their deserialized form before the dump is built.
         *
         * @return a {@code BucketDump} built from the bucket id, member, version vector, values and version tags
         * @throws ForceReattemptException on remote cancellation, when the peer requests a reattempt,
         *         or when the final chunk was never received
         */
        public BucketDump waitForEntries() throws ForceReattemptException {
            try {
                waitForRepliesUninterruptibly();
            } catch (ReplyException replyFailure) {
                final Throwable rootCause = replyFailure.getCause();
                if (rootCause instanceof CancelException) {
                    getDistributionManager().getLoggerI18n().fine("FetchKeysResponse got remote cancellation; forcing reattempt.", rootCause);
                    throw new ForceReattemptException(LocalizedStrings.FetchEntriesMessage_FETCHKEYSRESPONSE_GOT_REMOTE_CANCELLATION_FORCING_REATTEMPT.toLocalizedString(), rootCause);
                } else if (rootCause instanceof ForceReattemptException) {
                    throw new ForceReattemptException(LocalizedStrings.FetchEntriesMessage_PEER_REQUESTS_REATTEMPT.toLocalizedString(), rootCause);
                }
                replyFailure.handleAsUnexpected();
            }

            if (!this.lastChunkReceived) {
                throw new ForceReattemptException(LocalizedStrings.FetchEntriesMessage_NO_REPLIES_RECEIVED.toLocalizedString());
            }

            // Replace wrapped values by their deserialized form, in place.
            final Iterator<Map.Entry<Object, Object>> entries = this.returnValue.entrySet().iterator();
            while (entries.hasNext()) {
                final Map.Entry<Object, Object> entry = entries.next();
                final Object held = entry.getValue();
                if (held instanceof CachedDeserializable) {
                    entry.setValue(((CachedDeserializable) held).getDeserializedValue(null, null));
                }
            }
            return new BucketDump(this.bucketId, this.recipient, this.returnRVV, this.returnValue, this.returnVersions);
        }
    }

    /**
     * No-argument constructor that leaves {@code bucketId} at its default value.
     * NOTE(review): presumably required so the message can be instantiated before {@code fromData} — confirm.
     */
    public FetchEntriesMessage() {
    }

    /**
     * Builds a request for the entries of one bucket.
     *
     * @param internalDistributedMember member the request is sent to (passed to the superclass)
     * @param i value passed to the superclass; {@code send} supplies the partitioned region's PR id
     * @param replyProcessor21 processor that will receive the replies (passed to the superclass)
     * @param i2 id of the bucket whose entries are requested
     */
    private FetchEntriesMessage(InternalDistributedMember internalDistributedMember, int i, ReplyProcessor21 replyProcessor21, int i2) {
        super(internalDistributedMember, i, replyProcessor21);
        this.bucketId = i2;
    }

    /**
     * Sends a request for all entries of a bucket to one member.
     *
     * @param internalDistributedMember member asked for the entries; asserted to be non-null
     * @param partitionedRegion region the bucket belongs to
     * @param i id of the bucket
     * @return the response processor on which the caller can wait for the entries
     * @throws ForceReattemptException if {@code putOutgoing} returns a non-empty set for the request
     */
    public static FetchEntriesResponse send(InternalDistributedMember internalDistributedMember, PartitionedRegion partitionedRegion, int i) throws ForceReattemptException {
        Assert.assertTrue(internalDistributedMember != null, "FetchEntriesMessage NULL reply message");
        final FetchEntriesResponse response = new FetchEntriesResponse(partitionedRegion.getSystem(), partitionedRegion, internalDistributedMember, i);
        final FetchEntriesMessage request = new FetchEntriesMessage(internalDistributedMember, partitionedRegion.getPRId(), response, i);

        final Set failures = partitionedRegion.getDistributionManager().putOutgoing(request);
        if (failures != null && failures.size() > 0) {
            throw new ForceReattemptException(LocalizedStrings.FetchEntriesMessage_FAILED_SENDING_0.toLocalizedString(request));
        }
        return response;
    }

    /**
     * Handles the request on the receiving member: looks up the bucket in the local data store
     * and streams its entries back to the sender through {@code FetchEntriesReplyMessage.send}.
     * <p>
     * NOTE(review): when the region has no data store, a warning is logged and a {@code null}
     * bucket is passed to {@code FetchEntriesReplyMessage.send}, which dereferences it — confirm
     * whether that path is intended.
     *
     * @param distributionManager distribution manager used to send the replies
     * @param partitionedRegion region the request targets
     * @param j start time handed to the partition-message statistics
     * @return always {@code false}
     * @throws CacheException declared by the overridden method
     * @throws ForceReattemptException if sending the replies fails
     */
    @Override // com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage
    protected boolean operateOnPartitionedRegion(DistributionManager distributionManager, PartitionedRegion partitionedRegion, long j) throws CacheException, ForceReattemptException {
        final LogWriterI18n log = partitionedRegion.getCache().getLoggerI18n();
        if (DistributionManager.VERBOSE) {
            log.fine("FetchEntriesMessage operateOnRegion: " + partitionedRegion.getFullPath());
        }

        final PartitionedRegionDataStore store = partitionedRegion.getDataStore();
        final BucketRegion bucket;
        if (store == null) {
            log.warning(LocalizedStrings.FetchEntriesMessage_FETCHKEYSMESSAGE_DATA_STORE_NOT_CONFIGURED_FOR_THIS_MEMBER);
            bucket = null;
        } else {
            bucket = store.handleRemoteGetEntries(this.bucketId);
            if (DistributionManager.VERBOSE) {
                log.fine("FetchKeysMessage send keys back using processorId: " + getProcessorId());
            }
        }

        // Statistics are closed out before the reply chunks are sent.
        partitionedRegion.getPrStats().endPartitionMessagesProcessing(j);
        FetchEntriesReplyMessage.send(getSender(), getProcessorId(), distributionManager, this.bucketId, bucket);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Appends this message's fields to a textual description: the superclass fields, followed by
     * the bucket id and the recipient.
     *
     * @param stringBuffer buffer the description is appended to
     */
    @Override // com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage
    public void appendFields(StringBuffer stringBuffer) {
        super.appendFields(stringBuffer);
        stringBuffer
                .append("; bucketId=").append(this.bucketId)
                .append("; recipient=").append(getRecipient());
    }

    /**
     * Returns the fixed id of this message class.
     * NOTE(review): presumably the DataSerializableFixedID constant registered for this request — confirm.
     *
     * @return the constant 46
     */
    @Override // com.gemstone.gemfire.internal.DataSerializableFixedID
    public int getDSFID() {
        return 46;
    }

    /**
     * Restores this request from its serialized form: the superclass data first, then the bucket id.
     *
     * @param dataInput source of the serialized form
     * @throws IOException if reading fails
     * @throws ClassNotFoundException declared for the superclass read
     */
    @Override // com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage, com.gemstone.gemfire.distributed.internal.DistributionMessage, com.gemstone.gemfire.internal.DataSerializableFixedID
    public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
        super.fromData(dataInput);
        this.bucketId = dataInput.readInt();
    }

    /**
     * Serializes this request: the superclass data first, then the bucket id.
     * {@code fromData} reads them back in the same order.
     *
     * @param dataOutput destination of the serialized form
     * @throws IOException if writing fails
     */
    @Override // com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage, com.gemstone.gemfire.distributed.internal.DistributionMessage, com.gemstone.gemfire.internal.DataSerializableFixedID
    public void toData(DataOutput dataOutput) throws IOException {
        super.toData(dataOutput);
        dataOutput.writeInt(this.bucketId);
    }
}
