/*
 * Decompiled with CFR 0.152.
 */
package de.ruedigermoeller.fastcast.packeting;

import de.ruedigermoeller.fastcast.packeting.DataPacket;
import de.ruedigermoeller.fastcast.packeting.MsgReceiver;
import de.ruedigermoeller.fastcast.packeting.RetransPacket;
import de.ruedigermoeller.fastcast.packeting.SimpleByteArrayReceiver;
import de.ruedigermoeller.fastcast.packeting.TopicEntry;
import de.ruedigermoeller.fastcast.packeting.TopicStats;
import de.ruedigermoeller.fastcast.remoting.DecodeInTransportThread;
import de.ruedigermoeller.fastcast.remoting.FCReceiveContext;
import de.ruedigermoeller.fastcast.remoting.FCRemotingListener;
import de.ruedigermoeller.fastcast.remoting.FastCast;
import de.ruedigermoeller.fastcast.remoting.PerSenderThread;
import de.ruedigermoeller.fastcast.util.FCLog;
import de.ruedigermoeller.fastcast.util.FCUtils;
import de.ruedigermoeller.heapoff.bytez.Bytez;
import de.ruedigermoeller.heapoff.bytez.BytezAllocator;
import de.ruedigermoeller.heapoff.bytez.malloc.MallocBytezAllocator;
import de.ruedigermoeller.heapoff.structs.FSTStruct;
import de.ruedigermoeller.heapoff.structs.FSTTypedStructAllocator;
import de.ruedigermoeller.heapoff.structs.structtypes.StructArray;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PacketReceiveBuffer {
    final int topic;
    final int payMaxLen;
    final FSTTypedStructAllocator<DataPacket> packetAllocator;
    final StructArray<DataPacket> readBuffer;
    AtomicLong maxOrderedSeq = new AtomicLong(0L);
    AtomicLong maxDeliveredSeq = new AtomicLong(0L);
    long highestSeq = 0L;
    String receivesFrom;
    MsgReceiver receiver;
    RetransPacket retrans;
    SimpleByteArrayReceiver decoder = new SimpleByteArrayReceiver(){

        @Override
        public void msgDone(long seq, Bytez b, int off, int len) {
            if (PacketReceiveBuffer.this.receiver != null) {
                PacketReceiveBuffer.this.receiver.messageReceived(PacketReceiveBuffer.this.receivesFrom, seq, b, off, len);
            }
        }
    };
    Executor deliveryThread;
    Executor topicWideDeliveryThread;
    private boolean isUnordered = false;
    private boolean isUnreliable = false;
    boolean decodeInTransportThread = false;
    TopicStats stats;
    int lastOrderedSendPause;
    private boolean terminated = false;
    int dGramSize;
    static int recMatchCount;
    int retransCount = 0;
    long firstGapDetected = 0L;
    long maxDelayNextRetrans = 50L;
    long maxDelayRetrans = 10L;
    boolean inInitialSync = true;
    TopicEntry topicEntry;
    long startTime = 0L;
    long logBremse;
    FSTStruct currentPacketBytePointer = new FSTStruct();
    long debugPrevSeq = 0L;
    long lastPacket = 0L;
    int packCount = 0;
    FSTStruct tmpStruct = new FSTStruct();
    DataPacket tmpPacket;

    public PacketReceiveBuffer(int dataGramSizeBytes, String clusterName, String nodeId, int historySize, String receivesFrom, TopicEntry entry, MsgReceiver receiver, Executor topicReceiverThread) {
        this.topicWideDeliveryThread = topicReceiverThread;
        this.topicEntry = entry;
        this.dGramSize = dataGramSizeBytes;
        int decodeQSize = entry.getConf().getDecodeQSize();
        Class<?> serv = null;
        try {
            serv = Class.forName(this.topicEntry.getServiceClazz());
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        this.topic = entry.getTopic();
        this.decodeInTransportThread = this.topicEntry.getConf().isDecodeInTransportThread();
        if (serv.getAnnotation(DecodeInTransportThread.class) != null) {
            this.decodeInTransportThread = serv.getAnnotation(DecodeInTransportThread.class).value();
        }
        boolean perSenderThread = this.topicEntry.getConf().isPerSenderThread();
        if (serv.getAnnotation(PerSenderThread.class) != null) {
            perSenderThread = serv.getAnnotation(PerSenderThread.class).value();
        }
        if (perSenderThread) {
            if (this.deliveryThread == null) {
                this.deliveryThread = FCUtils.createIncomingMessageThreadExecutor("delivery " + receivesFrom + " " + this.topic, decodeQSize);
            }
        } else {
            this.deliveryThread = this.topicWideDeliveryThread;
        }
        this.receiver = receiver;
        DataPacket template = DataPacket.getTemplate(dataGramSizeBytes);
        this.payMaxLen = template.data.length;
        template.getCluster().setString(clusterName);
        template.getSender().setString(nodeId);
        template.setTopic(this.topic);
        RetransPacket retransTemplate = new RetransPacket();
        retransTemplate.getCluster().setString(clusterName);
        retransTemplate.getSender().setString(nodeId);
        retransTemplate.getReceiver().setString(receivesFrom);
        retransTemplate.setTopic(this.topic);
        retransTemplate.setSeqNo(-1L);
        this.packetAllocator = new FSTTypedStructAllocator((FSTStruct)template, 10, (BytezAllocator)new MallocBytezAllocator());
        this.readBuffer = this.packetAllocator.newArray(historySize);
        if (this.readBuffer.getByteSize() > 0x500000) {
            FCLog.log("allocating read buffer for topic '" + this.topicEntry.getName() + "' of " + this.readBuffer.getByteSize() / 1024 / 1024 + " MByte");
        } else {
            FCLog.log("allocating read buffer for topic '" + this.topicEntry.getName() + "' of " + this.readBuffer.getByteSize() / 1024 + " KByte");
        }
        this.retrans = (RetransPacket)this.packetAllocator.newStruct((FSTStruct)retransTemplate);
        this.receivesFrom = receivesFrom;
        this.stats = this.topicEntry.getStats();
        this.isUnordered = this.topicEntry.isUnordered();
        this.isUnreliable = this.topicEntry.isUnreliable();
        this.maxDelayRetrans = this.topicEntry.getConf().getMaxDelayRetransMS();
        this.maxDelayNextRetrans = this.topicEntry.getConf().getMaxDelayNextRetransMS();
    }

    public TopicEntry getTopicEntry() {
        return this.topicEntry;
    }

    DataPacket getPacketVolatile(long seqNo) {
        return (DataPacket)this.readBuffer.get((int)(seqNo % (long)this.readBuffer.size()));
    }

    public long getMaxDelayNextRetrans() {
        return this.maxDelayNextRetrans;
    }

    public void setMaxDelayNextRetrans(long maxDelayNextRetrans) {
        this.maxDelayNextRetrans = maxDelayNextRetrans;
    }

    public long getMaxDelayRetrans() {
        return this.maxDelayRetrans;
    }

    public void setMaxDelayRetrans(long maxDelayRetrans) {
        this.maxDelayRetrans = maxDelayRetrans;
    }

    public RetransPacket receivePacket(DataPacket packet) {
        this.stats.dataPacketReceived(packet.getDGramSize());
        if (this.maxOrderedSeq.get() == 0L && this.startTime == 0L) {
            this.startTime = System.currentTimeMillis();
        }
        if (this.isUnreliable) {
            this.receivePacketUnreliable(packet);
            return null;
        }
        if (this.isUnordered) {
            RetransPacket retransPacket = this.receivePacketUnOrdered(packet);
            if (retransPacket != null) {
                this.stats.retransRQSent(retransPacket.computeNumPackets());
            }
            return retransPacket;
        }
        RetransPacket retransPacket = this.receivePacketOrdered(packet);
        if (retransPacket != null) {
            this.stats.retransRQSent(retransPacket.computeNumPackets());
        }
        return retransPacket;
    }

    public void receivePacketUnreliable(DataPacket packet) {
        DataPacket previousPacket;
        long seqNo = packet.getSeqNo();
        int index = (int)(seqNo % (long)this.readBuffer.size());
        this.highestSeq = Math.max(seqNo, this.highestSeq);
        long now = System.currentTimeMillis();
        if (this.maxOrderedSeq.get() == 0L) {
            this.handleInitialSync(seqNo);
        }
        if (!(previousPacket = this.getPacketVolatile(seqNo)).isDecoded() && previousPacket.getSeqNo() > 0L) {
            return;
        }
        this.readBuffer.set(index, (FSTStruct)packet);
        DataPacket toDecode = (DataPacket)this.readBuffer.get(index);
        this.decodePacket(toDecode);
    }

    public RetransPacket receivePacketUnOrdered(DataPacket packet) {
        DataPacket previousPacket;
        RetransPacket toReturn = null;
        long seqNo = packet.getSeqNo();
        int index = (int)(seqNo % (long)this.readBuffer.size());
        this.highestSeq = Math.max(seqNo, this.highestSeq);
        long now = System.currentTimeMillis();
        if (seqNo != this.maxOrderedSeq.get() + 1L && this.firstGapDetected > 0L && now - this.firstGapDetected > this.maxDelayRetrans) {
            toReturn = this.computeRetransPacket(now);
        }
        if (this.maxOrderedSeq.get() == 0L) {
            this.handleInitialSync(seqNo);
        }
        if (!(previousPacket = this.getPacketVolatile(seqNo)).isDecoded() && previousPacket.getSeqNo() > 0L) {
            return toReturn;
        }
        if (seqNo == this.maxOrderedSeq.get() + 1L) {
            this.readBuffer.set(index, (FSTStruct)packet);
            this.maxOrderedSeq.set(seqNo);
            DataPacket toDecode = (DataPacket)this.readBuffer.get(index);
            int sendPauseSender = toDecode.getSendPauseSender();
            if (sendPauseSender > 0) {
                this.lastOrderedSendPause = sendPauseSender;
            }
            this.decodePacket(toDecode);
            if (!this.inSync()) {
                DataPacket pack = this.getPacketVolatile(seqNo + 1L);
                while (pack.getSeqNo() == seqNo + 1L) {
                    if (!pack.isDecoded()) {
                        this.decodePacket(pack);
                    }
                    this.maxOrderedSeq.set(++seqNo);
                    pack = this.getPacketVolatile(seqNo + 1L);
                }
                this.highestSeq = Math.max(this.maxOrderedSeq.get(), this.highestSeq);
                if (this.inSync()) {
                    this.firstGapDetected = 0L;
                    this.retransCount = 0;
                    return toReturn;
                }
                return toReturn;
            }
            return toReturn;
        }
        if (this.firstGapDetected == 0L) {
            this.firstGapDetected = now;
        }
        if (((DataPacket)this.readBuffer.get(index)).isDecoded()) {
            this.readBuffer.set(index, (FSTStruct)packet);
            this.decodePacket((DataPacket)this.readBuffer.get(index));
        }
        return toReturn;
    }

    public RetransPacket receivePacketOrdered(DataPacket packet) {
        long prevSeq;
        long now;
        if (this.retransCount > 1 && (now = System.currentTimeMillis()) - this.logBremse > 1000L) {
            System.out.println("wait for retrans, received " + packet.getSeqNo() + " " + this.getTopicEntry().getConf().getName() + " waiting for " + (this.maxOrderedSeq.get() + 1L));
            if (packet.getSeqNo() < this.maxOrderedSeq.get()) {
                System.out.println("   sent by " + packet.getSender());
            }
            this.logBremse = now;
        }
        RetransPacket toReturn = null;
        long seqNo = packet.getSeqNo();
        int index = (int)(seqNo % (long)this.readBuffer.size());
        this.highestSeq = Math.max(seqNo, this.highestSeq);
        long now2 = System.currentTimeMillis();
        if (seqNo <= this.maxOrderedSeq.get()) {
            return null;
        }
        if (this.maxOrderedSeq.get() == 0L) {
            this.handleInitialSync(seqNo);
        }
        if (seqNo != this.maxOrderedSeq.get() + 1L && this.firstGapDetected > 0L && now2 - this.firstGapDetected >= this.maxDelayRetrans) {
            toReturn = this.computeRetransPacket(now2);
        }
        if (this.maxOrderedSeq.get() - this.maxDeliveredSeq.get() > (long)(this.readBuffer.size() - 3)) {
            return toReturn;
        }
        if (seqNo == this.maxOrderedSeq.get() + 1L) {
            this.readBuffer.set(index, (FSTStruct)packet);
            this.maxOrderedSeq.set(seqNo);
            DataPacket toDecode = (DataPacket)this.readBuffer.get(index);
            int sendPauseSender = toDecode.getSendPauseSender();
            if (sendPauseSender > 0) {
                this.lastOrderedSendPause = sendPauseSender;
            }
            this.decodePacket(toDecode);
            this.retransCount = 0;
            if (!this.inSync()) {
                boolean onePack = true;
                while (onePack) {
                    onePack = false;
                    DataPacket pack = this.getPacketVolatile(seqNo + 1L);
                    while (pack.getSeqNo() == seqNo + 1L) {
                        this.decodePacket(pack);
                        this.maxOrderedSeq.set(++seqNo);
                        pack = this.getPacketVolatile(seqNo + 1L);
                        onePack = true;
                    }
                }
                this.highestSeq = Math.max(this.maxOrderedSeq.get(), this.highestSeq);
                if (this.inSync()) {
                    this.firstGapDetected = 0L;
                    return null;
                }
                if (toReturn != null) {
                    // empty if block
                }
                return toReturn;
            }
            return null;
        }
        if (this.firstGapDetected == 0L) {
            this.firstGapDetected = now2;
        }
        if ((prevSeq = ((DataPacket)this.readBuffer.get(index)).getSeqNo()) < this.maxDeliveredSeq.get()) {
            this.readBuffer.set(index, (FSTStruct)packet);
        }
        if (toReturn != null) {
            // empty if block
        }
        return toReturn;
    }

    private void handleInitialSync(long seqNo) {
        this.maxOrderedSeq.set(seqNo - 1L);
        this.maxDeliveredSeq.set(seqNo - 1L);
        this.inInitialSync = true;
        FCLog.get().cluster("for sender " + this.receivesFrom + " bootstrap sequence " + this.getTopicEntry().getConf().getName() + " no " + seqNo);
        FCRemotingListener remotingListener = FastCast.getRemoting().getRemotingListener();
        if (remotingListener != null) {
            remotingListener.senderBootstrapped(this.topicEntry.getTopic(), this.topicEntry.getConf().getName(), this.receivesFrom, seqNo);
        }
    }

    private RetransPacket computeRetransPacket(long now) {
        RetransPacket toReturn = this.retrans;
        toReturn.clear();
        toReturn.setSent(System.nanoTime());
        long curSeq = this.maxOrderedSeq.get() + 1L;
        while (curSeq < this.highestSeq && !toReturn.isFull()) {
            if (this.getPacketVolatile(curSeq).getSeqNo() != curSeq) {
                toReturn.current().setFrom(curSeq);
                ++curSeq;
                while (curSeq < this.highestSeq && !toReturn.isFull() && this.getPacketVolatile(curSeq).getSeqNo() != curSeq) {
                    ++curSeq;
                }
                toReturn.current().setTo(curSeq);
                toReturn.nextEntry();
                continue;
            }
            ++curSeq;
        }
        ++this.retransCount;
        if (this.retransCount > 10) {
            FCLog.get().warn("retransmission retrial at " + this.maxOrderedSeq + " count " + this.retransCount + " highest " + this.highestSeq + " stream " + this.getTopicEntry().getConf().getName() + " retrans:" + (Object)((Object)this.retrans));
        }
        this.firstGapDetected = this.maxDelayNextRetrans * (long)Math.max(this.retransCount, 100) + now;
        toReturn.setSendPauseSender(this.lastOrderedSendPause);
        return toReturn;
    }

    private boolean isUnordered() {
        return this.isUnordered;
    }

    public boolean inSync() {
        return this.highestSeq == this.maxOrderedSeq.get();
    }

    void decodePacket(DataPacket packet) {
        final long packetSeqNo = packet.getSeqNo();
        if (this.receiver == null) {
            return;
        }
        if (this.tmpPacket == null) {
            this.tmpPacket = (DataPacket)FSTTypedStructAllocator.newPointer(DataPacket.class);
        }
        packet.dataPointer(this.tmpStruct);
        final Bytez dataPacketBase = this.tmpStruct.getBase();
        final int dataindex = (int)this.tmpStruct.getOffset();
        final int packIndex = (int)packet.getOffset();
        if (!this.decodeInTransportThread) {
            Runnable decode = new Runnable(){

                @Override
                public void run() {
                    PacketReceiveBuffer.this.decodeMsgBytes(packetSeqNo, dataPacketBase, dataindex, packIndex);
                }
            };
            this.deliveryThread.execute(decode);
        } else {
            this.decodeMsgBytes(packetSeqNo, dataPacketBase, dataindex, packIndex);
        }
    }

    private void decodeMsgBytes(long packetSeqNo, Bytez dataPacketBase, int dataindex, int packIndex) {
        FCReceiveContext.get().setSender(this.receivesFrom);
        long now = System.currentTimeMillis();
        ++this.packCount;
        if (now - this.lastPacket > 1000L) {
            int persec = (int)((long)this.packCount * 1000L / (now - this.lastPacket));
            this.lastPacket = now;
            this.packCount = 0;
        }
        if (!this.isUnordered() && !this.isUnreliable() && this.debugPrevSeq != 0L && this.debugPrevSeq != packetSeqNo - 1L) {
            FCLog.get().fatal("FATAL ERROR " + packetSeqNo);
            System.exit(1);
        }
        this.debugPrevSeq = packetSeqNo;
        this.currentPacketBytePointer.baseOn(dataPacketBase, dataindex);
        while (true) {
            short code;
            if ((code = this.currentPacketBytePointer.getShort()) > 3 || code < 0) {
                FCLog.get().warn("foreign traffic or error assume delivered: " + this.maxDeliveredSeq.get() + " maxOrdered " + this.maxOrderedSeq.get() + " packseq " + packetSeqNo + " highest " + this.highestSeq);
                System.exit(1);
            }
            this.currentPacketBytePointer.next(2);
            if (code == 3) {
                if (this.isUnordered() || this.isUnreliable()) {
                    this.tmpPacket.baseOn(dataPacketBase, packIndex);
                    this.tmpPacket.setDecoded(true);
                }
                this.maxDeliveredSeq.set(Math.max(this.maxDeliveredSeq.get(), packetSeqNo));
                return;
            }
            short len = this.currentPacketBytePointer.getShort();
            this.currentPacketBytePointer.next(2);
            if (this.inInitialSync) {
                if (code == 1) {
                    this.inInitialSync = false;
                }
            } else {
                if ((packetSeqNo & 0x7FFL) == 0L && this.topicEntry.hadHeartbeat(this.receivesFrom)) {
                    this.topicEntry.registerHeartBeat(this.receivesFrom, System.currentTimeMillis());
                }
                this.decoder.receiveChunk(packetSeqNo, this.currentPacketBytePointer.getBase(), (int)this.currentPacketBytePointer.getOffset(), len, code == 1);
                this.stats.msgReceived();
            }
            this.currentPacketBytePointer.next((int)len);
        }
    }

    public void setUnreliable(boolean unreliable) {
        this.isUnreliable = unreliable;
    }

    public boolean isUnreliable() {
        return this.isUnreliable;
    }

    public void terminate() {
        if (this.deliveryThread != this.topicWideDeliveryThread) {
            ((ThreadPoolExecutor)this.deliveryThread).shutdown();
            new Thread("term receiver "){

                @Override
                public void run() {
                    try {
                        ((ThreadPoolExecutor)PacketReceiveBuffer.this.deliveryThread).awaitTermination(10000L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    PacketReceiveBuffer.this.freeImmediate();
                }
            }.start();
        } else {
            this.freeImmediate();
        }
        this.terminated = true;
    }

    private void freeImmediate() {
        long alloced = MallocBytezAllocator.alloced.get();
        this.packetAllocator.free();
        long curr = MallocBytezAllocator.alloced.get();
        FCLog.log("freed " + (alloced - curr) / 1024L / 1024L + "MB to " + curr / 1024L / 1024L + " MB");
    }
}

