/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.raft.service;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import io.atomix.cluster.NodeId;
import io.atomix.primitive.PrimitiveId;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.operation.OperationType;
import io.atomix.primitive.operation.PrimitiveOperation;
import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.PrimitiveService;
import io.atomix.primitive.service.ServiceContext;
import io.atomix.primitive.service.impl.DefaultCommit;
import io.atomix.primitive.session.Session;
import io.atomix.primitive.session.SessionId;
import io.atomix.primitive.session.SessionListener;
import io.atomix.primitive.session.Sessions;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.impl.OperationResult;
import io.atomix.protocols.raft.impl.RaftContext;
import io.atomix.protocols.raft.service.RaftSessions;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.protocols.raft.utils.LoadMonitor;
import io.atomix.storage.buffer.BufferInput;
import io.atomix.storage.buffer.BufferOutput;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import io.atomix.utils.time.LogicalClock;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.WallClock;
import io.atomix.utils.time.WallClockTimestamp;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BiConsumer;
import org.slf4j.Logger;

/**
 * Raft-side {@link ServiceContext} for a single primitive service instance.
 *
 * <p>Wraps a {@link PrimitiveService} and applies session lifecycle operations, commands, queries
 * and snapshots to it. Every entry point that touches service state submits its work to
 * {@link #executor()} and reports the outcome through a {@link CompletableFuture}.
 */
public class RaftServiceContext implements ServiceContext {
    /** Window size handed to the {@link LoadMonitor}. */
    private static final int LOAD_WINDOW_SIZE = 5;
    /** High-load threshold handed to the {@link LoadMonitor}. */
    private static final int HIGH_LOAD_THRESHOLD = 50;

    private final Logger log;
    private final PrimitiveId primitiveId;
    private final String serviceName;
    private final PrimitiveType primitiveType;
    private final PrimitiveService service;
    private final RaftContext raft;
    private final RaftSessions sessions;
    /** Context on which all service state is read and mutated. */
    private final ThreadContext serviceExecutor;
    /** Context on which temporary snapshots are persisted. */
    private final ThreadContext snapshotExecutor;
    private final ThreadContextFactory threadContextFactory;
    private final LoadMonitor loadMonitor;
    /** Snapshots that have been taken but not yet completed, keyed by snapshot index. */
    private final Map<Long, PendingSnapshot> pendingSnapshots = new ConcurrentSkipListMap<>();
    /** Index of the last snapshot that was installed or completed by this context. */
    private long snapshotIndex;
    private long currentIndex;
    private Session currentSession;
    private long currentTimestamp;
    private OperationType currentOperation;

    /** Logical clock whose time is the index most recently applied to this context. */
    private final LogicalClock logicalClock = new LogicalClock() {
        @Override
        public LogicalTimestamp getTime() {
            return new LogicalTimestamp(RaftServiceContext.this.currentIndex);
        }
    };

    /** Wall clock whose time is the highest timestamp applied to this context so far. */
    private final WallClock wallClock = new WallClock() {
        @Override
        public WallClockTimestamp getTime() {
            return new WallClockTimestamp(RaftServiceContext.this.currentTimestamp);
        }
    };

    /**
     * Creates the context, allocates its service and snapshot thread contexts and initializes
     * the wrapped service.
     *
     * @param primitiveId the primitive identifier, must not be null
     * @param serviceName the service name, must not be null
     * @param primitiveType the primitive type, must not be null
     * @param service the service to drive, must not be null
     * @param raft the owning Raft context, must not be null
     * @param threadContextFactory factory for the executors, must not be null
     * @throws NullPointerException if any argument is null
     */
    public RaftServiceContext(PrimitiveId primitiveId, String serviceName, PrimitiveType primitiveType, PrimitiveService service, RaftContext raft, ThreadContextFactory threadContextFactory) {
        this.primitiveId = Preconditions.checkNotNull(primitiveId);
        this.serviceName = Preconditions.checkNotNull(serviceName);
        this.primitiveType = Preconditions.checkNotNull(primitiveType);
        this.service = Preconditions.checkNotNull(service);
        this.raft = Preconditions.checkNotNull(raft);
        this.threadContextFactory = Preconditions.checkNotNull(threadContextFactory);
        this.sessions = new RaftSessions(primitiveId, raft.getSessions());
        this.serviceExecutor = threadContextFactory.createContext();
        this.snapshotExecutor = threadContextFactory.createContext();
        this.loadMonitor = new LoadMonitor(LOAD_WINDOW_SIZE, HIGH_LOAD_THRESHOLD, this.serviceExecutor);
        this.log = ContextualLoggerFactory.getLogger(getClass(), (LoggerContext) LoggerContext.builder(PrimitiveService.class)
                .addValue(primitiveId)
                .add("type", primitiveType)
                .add("name", serviceName)
                .build());
        init();
    }

    /** Registers the service as a session listener and hands it this context. */
    private void init() {
        sessions.addListener((SessionListener) service);
        service.init(this);
    }

    /** Returns the identifier of the primitive served by this context. */
    public PrimitiveId serviceId() {
        return primitiveId;
    }

    /** Returns the service name. */
    public String serviceName() {
        return serviceName;
    }

    /** Returns the primitive type of the service. */
    public PrimitiveType serviceType() {
        return primitiveType;
    }

    /** Returns the index most recently applied to this context. */
    public long currentIndex() {
        return currentIndex;
    }

    /** Returns the session whose operation is being applied, or {@code null} outside of an apply call. */
    public Session currentSession() {
        return currentSession;
    }

    /** Returns the type of the operation most recently set on this context. */
    public OperationType currentOperation() {
        return currentOperation;
    }

    /** Returns the index-based logical clock. */
    public LogicalClock logicalClock() {
        return logicalClock;
    }

    /** Returns the timestamp-based wall clock. */
    public WallClock wallClock() {
        return wallClock;
    }

    /** Returns the sessions registered with this service. */
    public Sessions sessions() {
        return sessions;
    }

    /** Returns whether the load monitor currently reports high load. */
    public boolean isUnderHighLoad() {
        return loadMonitor.isUnderHighLoad();
    }

    /** Returns the thread context on which service state is applied. */
    public ThreadContext executor() {
        return serviceExecutor;
    }

    private void setOperation(OperationType operation) {
        this.currentOperation = operation;
    }

    /**
     * Advances the context to {@code index}, moves the wall clock forward (never backward),
     * marks the current operation as a command and ticks the service.
     */
    private void tick(long index, long timestamp) {
        currentIndex = index;
        currentTimestamp = Math.max(currentTimestamp, timestamp);
        setOperation(OperationType.COMMAND);
        service.tick(WallClockTimestamp.from(timestamp));
    }

    /** Expires every session that reports itself timed out at {@code timestamp}. */
    private void expireSessions(long timestamp) {
        for (RaftSession session : sessions.getSessions()) {
            if (session.isTimedOut(timestamp)) {
                log.debug("Session expired in {} milliseconds: {}", timestamp - session.getLastUpdated(), session);
                sessions.expireSession(session);
            }
        }
    }

    /**
     * Completes every pending snapshot that has been persisted and whose index has been reached
     * by the lowest {@code lastCompleted} value across all sessions (capped at {@code index}).
     */
    private void maybeCompleteSnapshot(long index) {
        if (pendingSnapshots.isEmpty()) {
            return;
        }

        long lastCompleted = index;
        for (RaftSession session : sessions.getSessions()) {
            lastCompleted = Math.min(lastCompleted, session.getLastCompleted());
        }

        for (PendingSnapshot pendingSnapshot : pendingSnapshots.values()) {
            Snapshot snapshot = pendingSnapshot.snapshot;
            if (snapshot.isPersisted() && lastCompleted >= snapshot.index()) {
                log.debug("Completing snapshot {}", snapshot.index());
                snapshot.complete();
                snapshotIndex = snapshot.index();
                // Completing the future also triggers its removal from pendingSnapshots
                // through the callback registered in takeSnapshot.
                pendingSnapshot.future.complete(snapshotIndex);
            }
        }
    }

    /**
     * Installs the stored snapshot for this primitive when it is newer than the last installed
     * snapshot and older than {@code index}. Sessions recorded in the snapshot are re-created
     * before the service state is restored.
     */
    private void maybeInstallSnapshot(long index) {
        Snapshot snapshot = raft.getSnapshotStore().getSnapshotById(primitiveId);
        if (snapshot == null || snapshot.index() <= snapshotIndex || snapshot.index() >= index) {
            return;
        }

        log.debug("Installing snapshot {}", snapshot.index());
        try (SnapshotReader reader = snapshot.openReader()) {
            // Skip the primitive id, which takeSnapshot writes as a leading long.
            reader.skip(8);
            PrimitiveType snapshotType = raft.getPrimitiveTypes().get(reader.readString());
            String snapshotServiceName = reader.readString();
            int sessionCount = reader.readInt();
            for (int i = 0; i < sessionCount; ++i) {
                SessionId sessionId = SessionId.from(reader.readLong());
                NodeId node = NodeId.from(reader.readString());
                ReadConsistency readConsistency = ReadConsistency.valueOf(reader.readString());
                long minTimeout = reader.readLong();
                long maxTimeout = reader.readLong();
                long sessionTimestamp = reader.readLong();
                RaftSession session = raft.getSessions().addSession(new RaftSession(sessionId, node, snapshotServiceName, snapshotType, readConsistency, minTimeout, maxTimeout, sessionTimestamp, this, raft, threadContextFactory));
                session.setRequestSequence(reader.readLong());
                session.setCommandSequence(reader.readLong());
                session.setEventIndex(reader.readLong());
                session.setLastCompleted(reader.readLong());
                session.setLastApplied(snapshot.index());
                session.setLastUpdated(sessionTimestamp);
                sessions.openSession(session);
            }
            currentIndex = snapshot.index();
            currentTimestamp = snapshot.timestamp().unixTimestamp();
            service.restore((BufferInput) reader);
        } catch (Exception e) {
            // Pass the message for the placeholder and the exception itself so that SLF4J
            // renders both the text and the stack trace.
            log.error("Snapshot installation failed: {}", e.getMessage(), e);
        }
        // Recorded even after a failure so the same snapshot is not re-installed on every operation.
        snapshotIndex = snapshot.index();
    }

    /**
     * Takes a snapshot of the service state on the service executor and persists it on the
     * snapshot executor.
     *
     * <p>The snapshot is taken at {@code max(index, currentIndex)}. No snapshot is written when
     * nothing has been applied yet, when a snapshot already exists at that index, or when the
     * current stored snapshot is at or beyond {@code index}; in those cases the future completes
     * immediately. When a snapshot at the same index is already pending, the returned future
     * mirrors that pending snapshot's future.
     *
     * @param index the minimum index to snapshot
     * @return a future completed with the snapshot index once the snapshot has been persisted,
     *     or completed exceptionally if writing or persisting the snapshot fails
     */
    public CompletableFuture<Long> takeSnapshot(long index) {
        ComposableFuture<Long> future = new ComposableFuture<>();
        serviceExecutor.execute(() -> {
            // Nothing has been applied to the service yet, so there is no state to snapshot.
            if (currentIndex == 0L) {
                future.complete(currentIndex);
                return;
            }

            long targetIndex = Math.max(index, currentIndex);
            Snapshot existingSnapshot = raft.getSnapshotStore().getSnapshot(primitiveId, targetIndex);
            if (existingSnapshot != null) {
                future.complete(targetIndex);
                return;
            }

            Snapshot currentSnapshot = raft.getSnapshotStore().getSnapshotById(primitiveId);
            if (currentSnapshot != null && currentSnapshot.index() >= index) {
                future.complete(targetIndex);
                return;
            }

            Snapshot snapshot = raft.getSnapshotStore().newTemporarySnapshot(primitiveId, serviceName, targetIndex, WallClockTimestamp.from(currentTimestamp));
            PendingSnapshot pendingSnapshot = new PendingSnapshot(snapshot);
            PendingSnapshot existingPendingSnapshot = pendingSnapshots.putIfAbsent(targetIndex, pendingSnapshot);
            if (existingPendingSnapshot != null) {
                existingPendingSnapshot.future.whenComplete(future);
                return;
            }
            pendingSnapshot.future.whenComplete((result, error) -> pendingSnapshots.remove(targetIndex));

            log.debug("Taking snapshot {}", targetIndex);
            try (SnapshotWriter writer = snapshot.openWriter()) {
                writer.writeLong((Long) primitiveId.id());
                writer.writeString(primitiveType.id());
                writer.writeString(serviceName);
                writer.writeInt(sessions.getSessions().size());
                for (RaftSession session : sessions.getSessions()) {
                    writer.writeLong((Long) session.sessionId().id());
                    writer.writeString((String) ((Object) session.nodeId().id()));
                    writer.writeString(session.readConsistency().name());
                    writer.writeLong(session.minTimeout());
                    writer.writeLong(session.maxTimeout());
                    writer.writeLong(session.getLastUpdated());
                    writer.writeLong(session.getRequestSequence());
                    writer.writeLong(session.getCommandSequence());
                    writer.writeLong(session.getEventIndex());
                    writer.writeLong(session.getLastCompleted());
                }
                service.backup((BufferOutput) writer);
            } catch (Exception writeFailure) {
                log.error("Snapshot failed: {}", writeFailure.getMessage(), writeFailure);
                // A partially written snapshot must not be persisted or reported as taken.
                // Failing the pending future also drops it from pendingSnapshots.
                pendingSnapshot.future.completeExceptionally(writeFailure);
                future.completeExceptionally(writeFailure);
                return;
            }

            snapshotExecutor.execute(() -> {
                try {
                    pendingSnapshot.persist();
                    future.complete(targetIndex);
                } catch (Exception persistFailure) {
                    // Without this the returned future would never complete.
                    log.error("Failed to persist snapshot {}", targetIndex, persistFailure);
                    pendingSnapshot.future.completeExceptionally(persistFailure);
                    future.completeExceptionally(persistFailure);
                }
            });
        });
        return future;
    }

    /**
     * Attempts to complete the pending snapshot registered at {@code index}.
     *
     * @param index the index of the pending snapshot
     * @return a future completed when the pending snapshot's future completes, or an already
     *     completed future when no snapshot is pending at {@code index}
     */
    public CompletableFuture<Void> completeSnapshot(long index) {
        PendingSnapshot pendingSnapshot = pendingSnapshots.get(index);
        if (pendingSnapshot == null) {
            return CompletableFuture.completedFuture(null);
        }
        serviceExecutor.execute(() -> maybeCompleteSnapshot(index));
        return pendingSnapshot.future.thenApply(v -> null);
    }

    /**
     * Registers {@code session} with the service.
     *
     * @param index the index of the open-session entry
     * @param timestamp the timestamp of the entry
     * @param session the session to open
     * @return a future completed with the session identifier
     */
    public CompletableFuture<Long> openSession(long index, long timestamp, RaftSession session) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> {
            log.debug("Opening session {}", session.sessionId());
            // Install any outstanding snapshot first and tick exactly once afterwards, as the
            // other entry points do, so the service is never ticked against pre-restore state.
            maybeInstallSnapshot(index);
            tick(index, timestamp);
            expireSessions(currentTimestamp);
            sessions.openSession(session);
            commit();
            maybeCompleteSnapshot(index);
            future.complete((Long) session.sessionId().id());
        });
        return future;
    }

    /**
     * Applies a keep-alive for {@code session}.
     *
     * @param index the index of the keep-alive entry
     * @param timestamp the timestamp of the entry
     * @param session the session being kept alive
     * @param commandSequence the command sequence acknowledged by the client
     * @param eventIndex the event index acknowledged by the client
     * @return a future completed with {@code true} when the session was refreshed, or
     *     {@code false} when the session is already closed
     */
    public CompletableFuture<Boolean> keepAlive(long index, long timestamp, RaftSession session, long commandSequence, long eventIndex) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> {
            maybeInstallSnapshot(index);
            tick(index, timestamp);
            if (session.getState() == Session.State.CLOSED) {
                future.complete(false);
                return;
            }
            session.setLastUpdated(timestamp);
            session.clearResults(commandSequence);
            session.resendEvents(eventIndex);
            session.resetRequestSequence(commandSequence);
            session.setCommandSequence(commandSequence);
            future.complete(true);
        });
        return future;
    }

    /**
     * Finishes a keep-alive round: ticks the service, expires timed-out sessions, commits the
     * index to all sessions and completes eligible snapshots.
     *
     * @param index the index of the keep-alive entry
     * @param timestamp the timestamp of the entry
     * @return a future completed once the round has been applied
     */
    public CompletableFuture<Void> completeKeepAlive(long index, long timestamp) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> {
            tick(index, timestamp);
            expireSessions(currentTimestamp);
            commit();
            maybeCompleteSnapshot(index);
            future.complete(null);
        });
        return future;
    }

    /**
     * Resets the last-updated time of every session to {@code timestamp} without ticking the
     * service.
     *
     * @param index the index to advance the context to
     * @param timestamp the timestamp to stamp on every session
     * @return a future completed once all session timeouts have been reset
     */
    public CompletableFuture<Void> keepAliveSessions(long index, long timestamp) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> {
            log.debug("Resetting session timeouts");
            currentIndex = index;
            currentTimestamp = Math.max(currentTimestamp, timestamp);
            for (RaftSession session : sessions.getSessions()) {
                session.setLastUpdated(timestamp);
            }
            // The future must be completed here; otherwise callers waiting on it hang forever.
            future.complete(null);
        });
        return future;
    }

    /**
     * Closes or expires {@code session}.
     *
     * @param index the index of the close-session entry
     * @param timestamp the timestamp of the entry
     * @param session the session to close
     * @param expired whether the session is being expired rather than closed
     * @return a future completed once the session has been removed
     */
    public CompletableFuture<Void> closeSession(long index, long timestamp, RaftSession session, boolean expired) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> {
            log.debug("Closing session {}", session.sessionId());
            // Refresh the session before the expiry sweep below runs.
            session.setLastUpdated(timestamp);
            maybeInstallSnapshot(index);
            tick(index, timestamp);
            expireSessions(currentTimestamp);
            if (expired) {
                sessions.expireSession(session);
            } else {
                sessions.closeSession(session);
            }
            commit();
            maybeCompleteSnapshot(index);
            future.complete(null);
        });
        return future;
    }

    /**
     * Applies a command to the service on the service executor.
     *
     * @param index the index of the command entry
     * @param sequence the command sequence number
     * @param timestamp the timestamp of the entry
     * @param session the session that submitted the command
     * @param operation the operation to apply
     * @return a future completed with the operation result, or completed exceptionally with
     *     {@link RaftException.UnknownSession} when the session is not active
     */
    public CompletableFuture<OperationResult> executeCommand(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation) {
        CompletableFuture<OperationResult> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> executeCommand(index, sequence, timestamp, session, operation, future));
        return future;
    }

    /** Runs on the service executor: validates the session and applies or replays the command. */
    private void executeCommand(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation, CompletableFuture<OperationResult> future) {
        session.setLastUpdated(timestamp);
        maybeInstallSnapshot(index);
        tick(index, timestamp);

        if (!session.getState().active()) {
            log.warn("Session not open: {}", session);
            future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
            return;
        }

        if (sequence > 0L && sequence < session.nextCommandSequence()) {
            // The command was already applied; answer from the session's result cache.
            log.trace("Returning cached result for command with sequence number {} < {}", sequence, session.nextCommandSequence());
            sequenceCommand(index, sequence, session, future);
        } else {
            applyCommand(index, sequence, timestamp, operation, session, future);
            session.setCommandSequence(sequence);
        }
    }

    /** Completes {@code future} with the cached result for {@code sequence}, which may be {@code null}. */
    private void sequenceCommand(long index, long sequence, RaftSession session, CompletableFuture<OperationResult> future) {
        OperationResult result = session.getResult(sequence);
        if (result == null) {
            log.debug("Missing command result at index {}", index);
        }
        future.complete(result);
    }

    /**
     * Applies the command to the service, commits the index to all sessions, caches the result
     * on the session and completes {@code future}. An exception thrown by the service is
     * captured as a failed {@link OperationResult} rather than propagated.
     */
    private void applyCommand(long index, long sequence, long timestamp, PrimitiveOperation operation, RaftSession session, CompletableFuture<OperationResult> future) {
        Commit<byte[]> commit = new DefaultCommit<>(index, operation.id(), operation.value(), session, timestamp);
        // Captured before applying so the result carries the event index prior to this command.
        long eventIndex = session.getEventIndex();

        OperationResult result;
        currentSession = session;
        try {
            byte[] output = service.apply(commit);
            result = OperationResult.succeeded(index, eventIndex, output);
        } catch (Exception e) {
            result = OperationResult.failed(index, eventIndex, e);
        } finally {
            currentSession = null;
        }

        commit();
        session.registerResult(sequence, result);
        future.complete(result);
    }

    /**
     * Applies a query to the service on the service executor. The query is deferred until the
     * session has reached {@code sequence} and the context has reached {@code index}.
     *
     * @param index the minimum index the query must observe
     * @param sequence the command sequence the query must follow
     * @param timestamp the timestamp of the query
     * @param session the session that submitted the query
     * @param operation the operation to apply
     * @return a future completed with the operation result, or completed exceptionally with
     *     {@link RaftException.UnknownSession} when the session is not active
     */
    public CompletableFuture<OperationResult> executeQuery(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation) {
        CompletableFuture<OperationResult> future = new CompletableFuture<>();
        serviceExecutor.execute(() -> executeQuery(index, sequence, timestamp, session, operation, future));
        return future;
    }

    /** Runs on the service executor: rejects queries from inactive sessions, then sequences the query. */
    private void executeQuery(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation, CompletableFuture<OperationResult> future) {
        if (!session.getState().active()) {
            log.warn("Inactive session: {}", session.sessionId());
            future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
            return;
        }
        sequenceQuery(index, sequence, timestamp, session, operation, future);
    }

    /** Defers the query until the session's command sequence reaches {@code sequence}. */
    private void sequenceQuery(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation, CompletableFuture<OperationResult> future) {
        long commandSequence = session.getCommandSequence();
        if (sequence > commandSequence) {
            log.trace("Registering query with sequence number {} > {}", sequence, commandSequence);
            session.registerSequenceQuery(sequence, () -> indexQuery(index, timestamp, session, operation, future));
        } else {
            indexQuery(index, timestamp, session, operation, future);
        }
    }

    /** Defers the query until the context's current index reaches {@code index}. */
    private void indexQuery(long index, long timestamp, RaftSession session, PrimitiveOperation operation, CompletableFuture<OperationResult> future) {
        if (index > currentIndex) {
            log.trace("Registering query with index {} > {}", index, currentIndex);
            session.registerIndexQuery(index, () -> applyQuery(timestamp, session, operation, future));
        } else {
            applyQuery(timestamp, session, operation, future);
        }
    }

    /**
     * Applies the query to the service and completes {@code future}. The session state is
     * re-checked because the query may have been deferred. An exception thrown by the service
     * is captured as a failed {@link OperationResult} rather than propagated.
     */
    private void applyQuery(long timestamp, RaftSession session, PrimitiveOperation operation, CompletableFuture<OperationResult> future) {
        if (!session.getState().active()) {
            log.warn("Inactive session: {}", session.sessionId());
            future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
            return;
        }

        setOperation(OperationType.QUERY);
        Commit<byte[]> commit = new DefaultCommit<>(session.getLastApplied(), operation.id(), operation.value(), session, timestamp);
        long eventIndex = session.getEventIndex();

        OperationResult result;
        currentSession = session;
        try {
            result = OperationResult.succeeded(currentIndex, eventIndex, service.apply(commit));
        } catch (Exception e) {
            result = OperationResult.failed(currentIndex, eventIndex, e);
        } finally {
            currentSession = null;
        }
        future.complete(result);
    }

    /** Commits the current index to every session of this service. */
    private void commit() {
        long index = currentIndex;
        for (RaftSession session : sessions.getSessions()) {
            session.commit(index);
        }
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("server", raft.getName())
                .add("type", primitiveType)
                .add("name", serviceName)
                .add("id", primitiveId)
                .toString();
    }

    /** A snapshot that has been taken but not yet completed, paired with its completion future. */
    private static class PendingSnapshot {
        /** Volatile because it is replaced on the snapshot executor and read on the service executor. */
        private volatile Snapshot snapshot;
        private final CompletableFuture<Long> future = new CompletableFuture<>();

        PendingSnapshot(Snapshot snapshot) {
            this.snapshot = snapshot;
        }

        /** Persists the snapshot and keeps the instance returned by {@code persist()}. */
        void persist() {
            this.snapshot = this.snapshot.persist();
        }
    }
}

