/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.network.cluster;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.network.NetworkContext;
import net.openhft.chronicle.network.cluster.AbstractSubHandler;
import net.openhft.chronicle.network.cluster.ClusterContext;
import net.openhft.chronicle.network.cluster.HeartbeatEventHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;

/**
 * Sub-handler that exchanges heartbeat messages over a connection.
 *
 * <p>Once {@link #onInitialize(WireOut)} has run, a periodic task publishes a
 * heartbeat document every {@code heartbeatIntervalMs} milliseconds whenever the
 * outbound publisher is otherwise empty. An instance created from a wire (the
 * {@link WireIn} constructor) additionally runs a recurring timeout check every
 * {@code heartbeatTimeoutMs} milliseconds and {@link #close() closes} the handler
 * when no message has been reported via {@link #onMessageReceived()} within that
 * window.
 *
 * <p>All periodic work runs on the shared, single-threaded
 * {@link #HEARTBEAT_EXECUTOR}; the mutable state of this class is therefore held
 * in volatile/atomic fields.
 *
 * @param <T> the network context type of the connection this handler is bound to
 */
public class HeartbeatHandler<T extends NetworkContext>
extends AbstractSubHandler<T>
implements Demarshallable,
WriteMarshallable,
HeartbeatEventHandler {
    /** Shared scheduler used by every handler instance for sending and checking heartbeats. */
    public static final ScheduledExecutorService HEARTBEAT_EXECUTOR = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("Heartbeat"));
    private final long heartbeatIntervalMs;
    /**
     * Wall-clock time (ms) of the last received message. {@code Long.MAX_VALUE} means
     * "never time out": it is the value before the first message and after close.
     */
    private volatile long lastTimeMessageReceived;
    private final long heartbeatTimeoutMs;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicBoolean hasHeartbeat = new AtomicBoolean();
    /** Handle of the periodic heartbeat-sending task, kept so that {@link #close()} can cancel it. */
    private final AtomicReference<ScheduledFuture<?>> heartbeatSender = new AtomicReference<>();

    /**
     * Creates a handler from its wire representation and starts the timeout check.
     *
     * @param w wire positioned at the {@code heartbeatTimeoutMs} / {@code heartbeatIntervalMs} fields
     */
    @UsedViaReflection
    protected HeartbeatHandler(@NotNull WireIn w) {
        this.heartbeatTimeoutMs = w.read(() -> "heartbeatTimeoutMs").int64();
        this.heartbeatIntervalMs = w.read(() -> "heartbeatIntervalMs").int64();
        // Same sanity checks as the sending-side constructor.
        assert (this.heartbeatTimeoutMs > this.heartbeatIntervalMs) : "heartbeatIntervalMs=" + this.heartbeatIntervalMs + ", " + "heartbeatTimeoutMs=" + this.heartbeatTimeoutMs;
        assert (this.heartbeatTimeoutMs >= 1000L) : "heartbeatTimeoutMs=" + this.heartbeatTimeoutMs + ", this is too small";
        assert (this.heartbeatIntervalMs >= 500L) : "heartbeatIntervalMs=" + this.heartbeatIntervalMs + ", this is too small";
        // NOTE(review): this publishes 'this' to the executor from inside the constructor;
        // all fields of this class are already initialised here, but subclass state is not.
        this.startHeartbeatCheck();
    }

    /**
     * Creates an instance that only carries the two settings so it can be written to a wire;
     * no timeout check is started for it.
     */
    private HeartbeatHandler(long heartbeatTimeoutMs, long heartbeatIntervalMs) {
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        assert (heartbeatTimeoutMs > heartbeatIntervalMs) : "heartbeatIntervalMs=" + heartbeatIntervalMs + ", " + "heartbeatTimeoutMs=" + heartbeatTimeoutMs;
        assert (heartbeatTimeoutMs >= 1000L) : "heartbeatTimeoutMs=" + heartbeatTimeoutMs + ", this is too small";
        assert (heartbeatIntervalMs >= 500L) : "heartbeatIntervalMs=" + heartbeatIntervalMs + ", this is too small";
    }

    /**
     * Announces this handler to the remote side (acceptor only) and starts the periodic
     * heartbeat sender.
     *
     * <p>The sender is cancelled again by {@link #close()}; calling this method more than once
     * replaces (and cancels) the previously scheduled sender instead of stacking another one.
     *
     * @param outWire wire the handler description is written to when this side is the acceptor
     */
    @Override
    public void onInitialize(WireOut outWire) {
        if (this.nc().isAcceptor()) {
            HeartbeatHandler.heartbeatHandler(this.heartbeatTimeoutMs, this.heartbeatIntervalMs, this.cid()).writeMarshallable(outWire);
        }
        WriteMarshallable heartbeatMessage = w -> {
            w.writeDocument(true, d -> d.write((WireKey)CoreFields.cid).int64(this.cid()));
            w.writeDocument(false, d -> d.write(() -> "heartbeat").text((CharSequence)""));
        };
        Runnable task = () -> {
            if (this.closed.get()) {
                // Close raced with a run that was already queued; do not publish any more.
                return;
            }
            // Only send a heartbeat when nothing else is waiting to go out.
            if (this.nc().wireOutPublisher().isEmpty()) {
                this.nc().wireOutPublisher().publish(heartbeatMessage);
            }
        };
        ScheduledFuture<?> scheduled = HEARTBEAT_EXECUTOR.scheduleAtFixedRate(task, this.heartbeatIntervalMs, this.heartbeatIntervalMs, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = this.heartbeatSender.getAndSet(scheduled);
        if (previous != null) {
            previous.cancel(false);
        }
        if (this.closed.get()) {
            // close() may have run before the future was stored; make sure it does not leak.
            this.cancelHeartbeatSender();
        }
    }

    /**
     * Builds the meta-data document that tells the remote side to install a heartbeat handler
     * with the given settings under the given channel id.
     */
    private static WriteMarshallable heartbeatHandler(long heartbeatTimeoutMs, long heartbeatIntervalMs, long cid) {
        return w -> w.writeDocument(true, d -> d.writeEventName((WireKey)CoreFields.csp).text((CharSequence)"/").writeEventName((WireKey)CoreFields.cid).int64(cid).writeEventName((WireKey)CoreFields.handler).typedMarshallable(new HeartbeatHandler(heartbeatTimeoutMs, heartbeatIntervalMs)));
    }

    /**
     * Writes the two settings in the order the {@link WireIn} constructor reads them.
     *
     * @param w wire to write to
     */
    @Override
    public void writeMarshallable(@NotNull WireOut w) {
        w.write(() -> "heartbeatTimeoutMs").int64(this.heartbeatTimeoutMs);
        assert (this.heartbeatIntervalMs > 0L);
        w.write(() -> "heartbeatIntervalMs").int64(this.heartbeatIntervalMs);
    }

    /**
     * Consumes an incoming heartbeat message; the payload itself is ignored.
     *
     * @param inWire  wire holding the {@code heartbeat} field
     * @param outWire unused
     */
    @Override
    public void processData(@NotNull WireIn inWire, @NotNull WireOut outWire) {
        inWire.read(() -> "heartbeat").text();
    }

    /**
     * Closes this handler once: stops the heartbeat sender, disables the timeout check and
     * quietly closes whatever {@code closable()} returns. Subsequent calls are no-ops.
     */
    @Override
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.cancelHeartbeatSender();
        this.lastTimeMessageReceived = Long.MAX_VALUE;
        Closeable.closeQuietly((Object)this.closable());
    }

    /** Records the current wall-clock time as the moment the last message was received. */
    @Override
    public void onMessageReceived() {
        this.lastTimeMessageReceived = System.currentTimeMillis();
    }

    /** Cancels the periodic heartbeat sender, if one is currently scheduled. */
    private void cancelHeartbeatSender() {
        ScheduledFuture<?> sender = this.heartbeatSender.getAndSet(null);
        if (sender != null) {
            // No need to interrupt: the task is short and checks the closed flag itself.
            sender.cancel(false);
        }
    }

    /** Schedules the first timeout check; only the first call has any effect. */
    private void startHeartbeatCheck() {
        if (this.hasHeartbeat.getAndSet(true)) {
            return;
        }
        // Until the first message arrives the connection must not be considered timed out.
        this.lastTimeMessageReceived = Long.MAX_VALUE;
        this.scheduleTimeoutCheck();
    }

    /** Queues one run of {@link #checkHeartbeatTimeout()} after {@code heartbeatTimeoutMs}. */
    private void scheduleTimeoutCheck() {
        Runnable check = this::checkHeartbeatTimeout;
        HEARTBEAT_EXECUTOR.schedule(check, this.heartbeatTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * One round of the timeout check: closes the handler if nothing was received within the
     * timeout window, otherwise re-arms itself. Stops silently once the handler is closed.
     */
    private void checkHeartbeatTimeout() {
        if (this.closed.get()) {
            return;
        }
        if (this.hasReceivedHeartbeat()) {
            this.scheduleTimeoutCheck();
        } else {
            this.close();
        }
    }

    /** @return {@code true} if a message was received less than {@code heartbeatTimeoutMs} ago */
    private boolean hasReceivedHeartbeat() {
        return this.lastTimeMessageReceived > System.currentTimeMillis() - this.heartbeatTimeoutMs;
    }

    /**
     * Creates the handler-installation message for a cluster from the heartbeat settings of a
     * {@link ClusterContext}.
     */
    public static class Factory
    implements Function<ClusterContext, WriteMarshallable>,
    Demarshallable {
        /** Reflection-only constructor; the factory has no state to read from the wire. */
        @UsedViaReflection
        private Factory(WireIn w) {
        }

        /**
         * @param clusterContext source of the heartbeat timeout and interval
         * @return a marshallable that writes the heartbeat handler description
         */
        @Override
        public WriteMarshallable apply(ClusterContext clusterContext) {
            long heartbeatTimeoutMs = clusterContext.heartbeatTimeoutMs();
            long heartbeatIntervalMs = clusterContext.heartbeatIntervalMs();
            // NOTE(review): Class#hashCode is presumably an identity hash and so may differ
            // between JVMs - confirm the cid only has to be consistent within one connection.
            return HeartbeatHandler.heartbeatHandler(heartbeatTimeoutMs, heartbeatIntervalMs, HeartbeatHandler.class.hashCode());
        }
    }
}

