/*
 * Decompiled with CFR 0.152.
 */
package convex.peer;

import convex.core.cpos.Belief;
import convex.core.cpos.BeliefMerge;
import convex.core.cpos.Block;
import convex.core.cpos.Order;
import convex.core.crypto.AKeyPair;
import convex.core.cvm.State;
import convex.core.data.ABlobLike;
import convex.core.data.ACell;
import convex.core.data.AccountKey;
import convex.core.data.Blob;
import convex.core.data.Cells;
import convex.core.data.Format;
import convex.core.data.Index;
import convex.core.data.Ref;
import convex.core.data.SignedData;
import convex.core.data.Vectors;
import convex.core.exceptions.BadFormatException;
import convex.core.exceptions.InvalidDataException;
import convex.core.exceptions.MissingDataException;
import convex.core.message.Message;
import convex.core.message.MessageType;
import convex.core.util.LoadMonitor;
import convex.core.util.Utils;
import convex.peer.AThreadedComponent;
import convex.peer.Server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Threaded component that collects incoming Belief messages, merges the Orders they
 * contain into this peer's current {@link Belief}, proposes any newly generated Blocks,
 * and broadcasts the resulting Belief via the server's connection manager.
 *
 * <p>Apart from {@link #queueBelief(Message)} and the observer setters, the methods of
 * this class read and write {@link #belief} without synchronisation.
 * NOTE(review): this presumably relies on them only being invoked from the component's
 * own thread via {@link #loop()} - confirm against {@code AThreadedComponent}.
 */
public class BeliefPropagator extends AThreadedComponent {
    /** Maximum time, in milliseconds, to wait for a first queued Belief message per loop. */
    private static final long AWAIT_BELIEFS_PAUSE = 30L;

    /**
     * Delay after which the Belief is broadcast again even when it was not updated.
     * Compared against {@link Utils#getCurrentTimestamp()} values (presumably milliseconds).
     */
    public static final int BELIEF_REBROADCAST_DELAY = 300;
    public static final int BELIEF_FULL_BROADCAST_DELAY = 500;
    public static final int BELIEF_BROADCAST_DELAY = 10;
    public static final int BELIEF_BROADCAST_POLL_TIME = 1000;

    /** Capacity of the incoming Belief message queue. */
    private static final int BELIEF_QUEUE_CAPACITY = 200;

    /**
     * Message data count at or above which an outgoing Belief message is logged as long.
     * NOTE(review): looks like 95% of a 50,000,000 maximum message length - confirm.
     */
    private static final long LONG_MESSAGE_WARNING_THRESHOLD = 47_500_000L;

    /** Bounded queue of Belief messages awaiting merge; offers are rejected when it is full. */
    private final ArrayBlockingQueue<Message> beliefQueue = new ArrayBlockingQueue<>(BELIEF_QUEUE_CAPACITY);
    static final Logger log = LoggerFactory.getLogger(BeliefPropagator.class.getName());

    /** Number of Belief messages passed to {@link #mergeBeliefMessage(HashMap, Message)}. */
    long beliefReceivedCount = 0L;
    /** Timestamp of the most recent broadcast attempt. */
    long lastBroadcastTime = 0L;
    /** Timestamp of the most recent successfully created full update message. */
    long lastFullBroadcastTime = 0L;
    /** Number of Belief messages handed to the connection manager for broadcast. */
    private long beliefBroadcastCount = 0L;
    /** The current Belief maintained by this propagator. */
    Belief belief = null;
    private Consumer<SignedData<Order>> orderUpdateObserver;
    private Consumer<Belief> beliefUpdateObserver;
    /** The Belief announced by the most recent full update message. */
    private Belief lastFullBroadcastBelief;

    /**
     * Creates a propagator attached to the given server.
     *
     * @param server the server this component belongs to
     */
    public BeliefPropagator(Server server) {
        super(server);
    }

    /**
     * @return the number of Belief messages broadcast so far
     */
    public long getBeliefBroadcastCount() {
        return this.beliefBroadcastCount;
    }

    /**
     * Queues a received Belief message for later merging, without blocking.
     *
     * @param beliefMessage the message to queue
     * @return {@code true} if the message was accepted, {@code false} if the queue was full
     */
    public synchronized boolean queueBelief(Message beliefMessage) {
        // Guard kept so the message hash is only computed when trace logging is on.
        if (log.isTraceEnabled()) {
            log.trace("Belief queued {} : {}", this.server.getPort(), beliefMessage.getHash());
        }
        return this.beliefQueue.offer(beliefMessage);
    }

    /**
     * One iteration of the propagator: await incoming Beliefs, merge them (and any new
     * Blocks) into the current Belief, possibly broadcast, persist the Belief and hand
     * it to the server.
     *
     * @throws InterruptedException if the thread is interrupted while waiting or broadcasting
     */
    @Override
    protected void loop() throws InterruptedException {
        Belief incomingBelief = this.awaitBelief();
        // incomingBelief may be null; the merge step is still executed in that case.
        boolean updated = this.maybeUpdateBelief(incomingBelief);
        if (updated && log.isDebugEnabled()) {
            String consensusPoints = String.valueOf(
                    Vectors.createLongs(this.belief.getOrder(this.server.getPeerKey()).getConsensusPoints()));
            log.debug("Belief updated cps={}", consensusPoints);
        }
        try {
            this.maybeBroadcast(updated);
            this.belief = (Belief) Cells.persist(this.belief);
        } catch (IOException e) {
            // An IO failure while the server is no longer live is ignored; otherwise it is rethrown.
            if (!this.server.isLive()) {
                return;
            }
            throw (RuntimeException) Utils.sneakyThrow(e);
        }
        this.server.updateBelief(this.belief);
    }

    /**
     * Broadcasts the current Belief if it was just updated, or if more than
     * {@link #BELIEF_REBROADCAST_DELAY} has elapsed since the last broadcast attempt.
     *
     * @param updated whether the Belief changed during this loop iteration
     * @return {@code true} if a message was handed to the connection manager
     * @throws InterruptedException if the broadcast attempt was interrupted
     */
    protected boolean maybeBroadcast(boolean updated) throws InterruptedException {
        long ts = Utils.getCurrentTimestamp();
        boolean broadcastDue = updated || ts > this.lastBroadcastTime + BELIEF_REBROADCAST_DELAY;
        if (!broadcastDue) {
            return false;
        }
        this.lastBroadcastTime = ts;
        try {
            Message msg = this.createFullUpdateMessage();
            this.lastFullBroadcastTime = ts;
            if (msg != null) {
                this.server.manager.broadcast(msg);
                ++this.beliefBroadcastCount;
                return true;
            }
            log.warn("Failed to create broadcast message in BeliefPropagator!");
        } catch (Exception e) {
            // Never swallow an interrupt: propagate it so the component loop can terminate.
            if (e instanceof InterruptedException) {
                throw (InterruptedException) e;
            }
            // Failures while the server is not live are expected and not logged.
            if (this.server.isLive()) {
                log.warn("Error attempting to create broadcast message", e);
            }
        }
        return false;
    }

    /**
     * Initialises the current Belief from the server, then delegates to the superclass.
     */
    @Override
    public void start() {
        this.belief = this.server.getBelief();
        super.start();
    }

    /**
     * Merges the given Belief into the current one and proposes any Blocks produced by
     * the transaction handler. The belief update observer is notified if either step
     * changed the Belief.
     *
     * @param newBelief a Belief built from received Orders; may be {@code null}
     * @return {@code true} if the merge changed this peer's Order or new Blocks were proposed
     */
    protected boolean maybeUpdateBelief(Belief newBelief) {
        boolean updated = this.maybeMergeBeliefs(newBelief);
        boolean published = false;
        SignedData<Block>[] signedBlocks = this.server.transactionHandler.maybeGenerateBlocks();
        if (signedBlocks != null) {
            this.belief = this.belief.proposeBlock(this.server.getKeyPair(), signedBlocks);
            published = true;
            if (log.isDebugEnabled()) {
                String blockHashes = String.valueOf(Vectors.of((Object[]) signedBlocks).map(sb -> sb.getHash()));
                log.debug("Blocks proposed: {}", blockHashes);
            }
        }
        if (updated || published) {
            this.observeBeliefUpdate(this.belief);
            return true;
        }
        return false;
    }

    /** Passes the Belief to the belief update observer, if one is set. */
    private void observeBeliefUpdate(Belief b) {
        Consumer<Belief> obs = this.beliefUpdateObserver;
        if (obs != null) {
            obs.accept(b);
        }
    }

    /**
     * Merges the given Beliefs into the current Belief using a {@link BeliefMerge} created
     * with this peer's key pair, the current timestamp and the peer's consensus state.
     * The current Belief is always replaced by the merge result.
     *
     * @param newBeliefs Beliefs to merge; a {@code null} or empty array is a no-op
     * @return {@code true} if this peer's own Order differs (by {@code consensusEquals},
     *         or by presence) between the old and the merged Belief
     * @throws Error if the merge reports missing or invalid data
     */
    protected boolean maybeMergeBeliefs(Belief... newBeliefs) {
        if (newBeliefs == null || newBeliefs.length == 0) {
            return false;
        }
        try {
            long ts = Utils.getCurrentTimestamp();
            AKeyPair kp = this.server.getKeyPair();
            BeliefMerge mc = BeliefMerge.create(this.belief, kp, ts, this.server.getPeer().getConsensusState());
            Belief newBelief = mc.merge(newBeliefs);
            AccountKey key = mc.getAccountKey();
            Order oldOrder = this.belief.getOrder(key);
            Order newOrder = newBelief.getOrder(key);
            boolean beliefChanged;
            if (oldOrder == null) {
                beliefChanged = newOrder != null;
            } else if (newOrder == null) {
                beliefChanged = true;
            } else {
                beliefChanged = !newOrder.consensusEquals(oldOrder);
            }
            this.belief = newBelief;
            return beliefChanged;
        } catch (MissingDataException e) {
            throw new Error("Missing data in belief merge: " + e.getMissingHash().toHexString(), e);
        } catch (InvalidDataException e) {
            throw new Error("Invalid data in belief merge!", e);
        }
    }

    /**
     * Waits up to {@link #AWAIT_BELIEFS_PAUSE} milliseconds for a queued Belief message,
     * drains any further queued messages, and merges their Orders into a copy of the
     * current Belief's Orders.
     *
     * @return a new Belief built from the merged Orders, or {@code null} if no message
     *         arrived or no Order was changed
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    private Belief awaitBelief() throws InterruptedException {
        ArrayList<Message> beliefMessages = new ArrayList<>();
        Message firstEvent;
        LoadMonitor.down();
        try {
            firstEvent = this.beliefQueue.poll(AWAIT_BELIEFS_PAUSE, TimeUnit.MILLISECONDS);
        } finally {
            // Always pair down() with up(), even if the wait is interrupted.
            LoadMonitor.up();
        }
        if (firstEvent == null) {
            return null;
        }
        beliefMessages.add(firstEvent);
        this.beliefQueue.drainTo(beliefMessages);
        if (log.isDebugEnabled()) {
            log.debug("Belief Messages received: {}", beliefMessages.size());
        }
        HashMap<AccountKey, SignedData<Order>> newOrders = this.belief.getOrdersHashMap();
        boolean anyOrderChanged = false;
        for (Message m : beliefMessages) {
            // Every message must be processed; do not short-circuit once a change is seen.
            if (this.mergeBeliefMessage(newOrders, m)) {
                anyOrderChanged = true;
            }
        }
        if (!anyOrderChanged) {
            return null;
        }
        return Belief.create(newOrders);
    }

    /**
     * Merges the Orders carried by a Belief message into the given map.
     *
     * <p>For each Order extracted from the payload: Orders signed by this peer's own key
     * are skipped; an Order for a key already in the map is skipped unless
     * {@link BeliefMerge#compareOrders} says it should replace the existing one; the
     * signature is then checked, the Order persisted, the order update observer
     * notified, and the map updated. A bad signature or an {@link IOException} stops
     * processing of the message immediately.
     *
     * @param orders map of Orders by peer key, updated in place
     * @param m      the received Belief message
     * @return {@code true} if at least one Order in the map was added or replaced
     */
    protected boolean mergeBeliefMessage(HashMap<AccountKey, SignedData<Order>> orders, Message m) {
        boolean changed = false;
        AccountKey myKey = this.server.getPeerKey();
        try {
            ++this.beliefReceivedCount;
            try {
                ACell payload = m.getPayload();
                Collection<SignedData<Order>> receivedOrders = Belief.extractOrders(payload);
                for (SignedData<Order> so : receivedOrders) {
                    AccountKey key = so.getAccountKey();
                    try {
                        if (Cells.equals(myKey, key)) {
                            continue;
                        }
                        if (orders.containsKey(key)) {
                            Order newOrder = so.getValue();
                            Order oldOrder = orders.get(key).getValue();
                            boolean replace = BeliefMerge.compareOrders(oldOrder, newOrder);
                            if (!replace) {
                                continue;
                            }
                        }
                        // Signature is only checked once the Order is known to be a candidate.
                        if (!so.checkSignature()) {
                            log.warn("Bad Order signature");
                            this.server.getConnectionManager().alertBadMessage(m, "Bad Order Signature!!");
                            return changed;
                        }
                        SignedData<Order> persisted = Cells.persist(so);
                        this.observeOrderUpdate(persisted);
                        orders.put(key, persisted);
                        changed = true;
                    } catch (MissingDataException e) {
                        // Missing data for one Order: report it and carry on with the next Order.
                        this.server.getConnectionManager().alertMissing(m, e, key);
                    } catch (IOException e) {
                        log.warn("IO exception trying to merge Order", e);
                        return changed;
                    }
                }
            } catch (MissingDataException e) {
                // Missing data outside the handling of an individual Order.
                log.debug("Missing data in Belief message {}", m.getHash());
                this.server.getConnectionManager().alertMissing(m, e, null);
            }
        } catch (BadFormatException | ClassCastException e) {
            this.server.getConnectionManager().alertBadMessage(m, Utils.getClassName(e) + " merging Belief!!");
        }
        return changed;
    }

    /** Passes the signed Order to the order update observer, if one is set. */
    private void observeOrderUpdate(SignedData<Order> so) {
        Consumer<SignedData<Order>> obs = this.orderUpdateObserver;
        if (obs != null) {
            obs.accept(so);
        }
    }

    /**
     * Announces the whole current Belief, collecting novel cells, and builds a Belief
     * message containing them. The announced Belief replaces the current Belief and is
     * recorded as the last full broadcast Belief.
     *
     * @return the Belief message to broadcast
     * @throws IOException if announcing the Belief fails
     */
    protected Message createFullUpdateMessage() throws IOException {
        ArrayList<ACell> novelty = new ArrayList<>();
        this.belief = (Belief) Cells.announce(this.belief, noveltyCollector(novelty));
        this.lastFullBroadcastBelief = this.belief;
        Message msg = BeliefPropagator.createPartialBelief(this.belief, novelty);
        warnIfLongMessage(msg);
        return msg;
    }

    /**
     * Announces only this peer's own signed Order, collecting novel cells, and builds a
     * Belief message whose payload is that Order. The current Belief is updated to hold
     * the announced Order.
     *
     * @return the message to broadcast, or {@code null} if the current Belief has no
     *         Order for this peer
     * @throws IOException if announcing the Order fails
     */
    protected Message createQuickUpdateMessage() throws IOException {
        ArrayList<ACell> novelty = new ArrayList<>();
        AccountKey key = this.server.getPeerKey();
        Index<AccountKey, SignedData<Order>> orders = this.belief.getOrders();
        SignedData<Order> order = orders.get(key);
        if (order == null) {
            return null;
        }
        order = Cells.announce(order, noveltyCollector(novelty));
        orders = orders.assoc(key, order);
        this.belief = this.belief.withOrders(orders);
        Message msg = BeliefPropagator.createPartialBelief(order, novelty);
        warnIfLongMessage(msg);
        return msg;
    }

    /** Returns a novelty handler that appends the value of each reported Ref to the list. */
    private static Consumer<Ref<ACell>> noveltyCollector(List<ACell> novelty) {
        return r -> novelty.add(r.getValue());
    }

    /** Logs a warning when the message data count reaches the long-message threshold. */
    private static void warnIfLongMessage(Message msg) {
        long messageSize = msg.getMessageData().count();
        if (messageSize >= LONG_MESSAGE_WARNING_THRESHOLD) {
            log.warn("Long Belief Delta message: {}", messageSize);
        }
    }

    /**
     * Builds a Belief message from a payload and the novel cells collected for it. The
     * payload is appended to the novelty list unless it already equals the last element.
     *
     * @param payload the message payload
     * @param novelty novel cells to encode; may be modified by this method
     * @return a message of type {@code BELIEF} carrying the delta-encoded cells
     */
    private static Message createPartialBelief(ACell payload, List<ACell> novelty) {
        int n = novelty.size();
        boolean payloadIsLast = n > 0 && payload.equals(novelty.get(n - 1));
        if (!payloadIsLast) {
            novelty.add(payload);
        }
        Blob data = Format.encodeDelta(novelty);
        return Message.create(MessageType.BELIEF, payload, data);
    }

    /**
     * @return the Belief announced by the most recent full update message, or
     *         {@code null} if none has been created yet
     */
    public Belief getLastBroadcastBelief() {
        return this.lastFullBroadcastBelief;
    }

    @Override
    protected String getThreadName() {
        return "Belief propagator thread on port " + this.server.getPort();
    }

    /**
     * Sets the observer notified with each signed Order accepted during message merging.
     *
     * @param orderUpdateObserver the observer, or {@code null} to disable notification
     */
    public void setOrderUpdateObserver(Consumer<SignedData<Order>> orderUpdateObserver) {
        this.orderUpdateObserver = orderUpdateObserver;
    }

    /**
     * Sets the observer notified with the current Belief whenever it is updated.
     *
     * @param observer the observer, or {@code null} to disable notification
     */
    public void setBeliefUpdateObserver(Consumer<Belief> observer) {
        this.beliefUpdateObserver = observer;
    }
}

