/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.raft.impl;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.impl.MetadataResult;
import io.atomix.protocols.raft.impl.OperationResult;
import io.atomix.protocols.raft.impl.RaftContext;
import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.service.ServiceId;
import io.atomix.protocols.raft.service.ServiceType;
import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
import io.atomix.protocols.raft.session.RaftSessionMetadata;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.protocols.raft.session.impl.RaftSessionContext;
import io.atomix.protocols.raft.session.impl.RaftSessionManager;
import io.atomix.protocols.raft.storage.log.RaftLog;
import io.atomix.protocols.raft.storage.log.RaftLogReader;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;

public class RaftServiceManager
implements AutoCloseable {
    /** Delay before each snapshot pass; also the base delay (plus random jitter) before a snapshot is completed. */
    private static final Duration SNAPSHOT_INTERVAL = Duration.ofSeconds(10L);
    /** Base delay before a scheduled log compaction runs; a random jitter of up to the same amount is added. */
    private static final Duration MIN_COMPACT_INTERVAL = Duration.ofSeconds(10L);
    /** Contextual logger created in the constructor and tagged with the server name. */
    private final Logger logger;
    /** Raft server context: source of the log, last-applied index, snapshot store and service registry. */
    private final RaftContext raft;
    /** Executor handed to every service context and session context created by this manager. */
    private final ScheduledExecutorService threadPool;
    /** Context on which snapshot, snapshot-completion and compaction tasks are scheduled. */
    private final ThreadContext threadContext;
    /** The Raft log obtained from {@link #raft}; compacted by {@code compactLogs}. */
    private final RaftLog log;
    /** Log reader opened at index 1 in {@code COMMITS} mode; consumed by {@code apply(long)}. */
    private final RaftLogReader reader;
    /** Registry of sessions opened through this manager or restored from snapshots. */
    private final RaftSessionManager sessionManager = new RaftSessionManager();
    /** Service contexts keyed by service name. */
    private final Map<String, DefaultServiceContext> services = new HashMap<String, DefaultServiceContext>();
    /** Source of the random jitter added to snapshot-completion and compaction delays. */
    private final Random random = new Random();
    /** Highest index for which {@code prepareIndex} found and processed a snapshot. */
    private long lastPrepared;
    /** Last-applied index captured when the most recent snapshot pass started. */
    private long lastCompacted;

    /**
     * Creates a service manager bound to the given Raft context.
     * <p>
     * Opens a log reader starting at index 1 in {@code COMMITS} mode, builds a contextual logger tagged with the
     * server name, and schedules the first snapshot pass on {@code threadContext}.
     *
     * @param raft the Raft server context; must not be null
     * @param threadPool executor passed on to the service and session contexts this manager creates
     * @param threadContext context used to schedule snapshot and compaction tasks
     * @throws NullPointerException if {@code raft} is null
     */
    public RaftServiceManager(RaftContext raft, ScheduledExecutorService threadPool, ThreadContext threadContext) {
        this.raft = Preconditions.checkNotNull(raft, "state cannot be null");
        this.log = raft.getLog();
        this.reader = log.openReader(1L, RaftLogReader.Mode.COMMITS);
        this.threadPool = threadPool;
        this.threadContext = threadContext;
        this.logger = ContextualLoggerFactory.getLogger(
            getClass(),
            LoggerContext.builder(RaftServer.class)
                .addValue(raft.getName())
                .build());
        // The first snapshot pass is scheduled only once every field above is assigned.
        scheduleSnapshots();
    }

    /**
     * Returns the session manager that tracks every session registered through this service manager.
     *
     * @return the session manager owned by this instance
     */
    public RaftSessionManager getSessions() {
        return sessionManager;
    }

    /**
     * Asynchronously applies all entries up to and including {@code index}.
     * <p>
     * Does nothing when {@code index} is not greater than the Raft context's last-applied index; otherwise the
     * work is submitted to the Raft thread context and the resulting future is discarded.
     *
     * @param index the index up to which entries should be applied
     */
    public void applyAll(long index) {
        if (index <= raft.getLastApplied()) {
            return;
        }
        raft.getThreadContext().execute(() -> apply(index));
    }

    /**
     * Applies log entries from the reader until the entry at {@code index} has been applied, and returns the
     * result of applying that entry.
     * <p>
     * For each entry the reader offers:
     * <ul>
     *   <li>if its index is more than one past the last-applied index and is not the reader's first index, or
     *       if it is below the last-applied index, an exceptionally completed future is returned;</li>
     *   <li>if its index is below {@code index}, it is applied and its result is ignored;</li>
     *   <li>if its index equals {@code index}, it is applied and its future is returned;</li>
     *   <li>if its index is past {@code index}, the last-applied index is advanced to it and a future completed
     *       with {@code null} is returned.</li>
     * </ul>
     * A synchronous exception thrown while applying an entry is logged and the loop moves on; the last-applied
     * index is advanced for that entry regardless.
     *
     * @param index the index of the entry whose result is wanted
     * @param <T> the expected result type
     * @return the future produced by applying the entry at {@code index}; a future completed with {@code null}
     *         if the reader is already past {@code index}; or a future failed with
     *         {@link IndexOutOfBoundsException} if the index cannot be applied
     */
    public <T> CompletableFuture<T> apply(long index) {
        while (this.reader.hasNext()) {
            long nextIndex = this.reader.getNextIndex();
            long lastApplied = this.raft.getLastApplied();

            // Entries must be applied sequentially unless the log starts at this entry.
            if (nextIndex > lastApplied + 1L && nextIndex != this.reader.getFirstIndex()) {
                this.logger.error("Cannot apply non-sequential index {} unless it's the first entry in the log: {}", nextIndex, this.reader.getFirstIndex());
                return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot apply non-sequential index unless it's the first entry in the log"));
            }
            if (nextIndex < lastApplied) {
                this.logger.error("Cannot apply duplicate entry at index {}", nextIndex);
                return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot apply duplicate entry at index " + nextIndex));
            }

            if (nextIndex < index) {
                // Intermediate entry: apply it for its side effects and ignore the result.
                Indexed<? extends RaftLogEntry> entry = this.reader.next();
                try {
                    this.apply(entry);
                } catch (Exception e) {
                    this.logger.error("Failed to apply {}: {}", entry, e);
                } finally {
                    // No jump statement here: a jump inside finally would discard a pending return or exception.
                    this.raft.setLastApplied(nextIndex);
                }
            } else if (nextIndex == index) {
                // Target entry: its future is the result of this call.
                Indexed<? extends RaftLogEntry> entry = this.reader.next();
                try {
                    if (entry.index() != index) {
                        throw new IllegalStateException("inconsistent index applying entry " + index + ": " + entry);
                    }
                    return this.apply(entry);
                } catch (Exception e) {
                    this.logger.error("Failed to apply {}: {}", entry, e);
                } finally {
                    this.raft.setLastApplied(nextIndex);
                }
            } else {
                // The reader is already past the requested index; there is nothing to return for it.
                this.raft.setLastApplied(nextIndex);
                return Futures.completedFuture(null);
            }
        }
        this.logger.error("Cannot commit index " + index);
        return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot commit index " + index));
    }

    /**
     * Applies a single log entry by dispatching on its entry type.
     * <p>
     * Query entries are dispatched immediately. For every other entry type {@code prepareIndex} is called with
     * the entry's index before dispatch.
     *
     * @param entry the indexed log entry to apply
     * @param <T> the result type expected by the caller; the handler's future is cast to it unchecked, so the
     *            caller is responsible for requesting the type that matches the entry
     * @return the future returned by the handler for the entry's type, or a future failed with
     *         {@link RaftException.ProtocolException} when the entry type is not recognised
     */
    @SuppressWarnings("unchecked")
    public <T> CompletableFuture<T> apply(Indexed<? extends RaftLogEntry> entry) {
        this.logger.trace("Applying {}", entry);

        // Queries are dispatched without preparing the index first.
        if (entry.type() == QueryEntry.class) {
            return (CompletableFuture<T>) this.applyQuery(entry.cast());
        }

        this.prepareIndex(entry.index());

        // Each handler has its own concrete result type; the cast to the caller's T is unchecked by design.
        if (entry.type() == CommandEntry.class) {
            return (CompletableFuture<T>) this.applyCommand(entry.cast());
        }
        if (entry.type() == OpenSessionEntry.class) {
            return (CompletableFuture<T>) this.applyOpenSession(entry.cast());
        }
        if (entry.type() == KeepAliveEntry.class) {
            return (CompletableFuture<T>) this.applyKeepAlive(entry.cast());
        }
        if (entry.type() == CloseSessionEntry.class) {
            return (CompletableFuture<T>) this.applyCloseSession(entry.cast());
        }
        if (entry.type() == MetadataEntry.class) {
            return (CompletableFuture<T>) this.applyMetadata(entry.cast());
        }
        if (entry.type() == InitializeEntry.class) {
            return (CompletableFuture<T>) this.applyInitialize(entry.cast());
        }
        if (entry.type() == ConfigurationEntry.class) {
            return (CompletableFuture<T>) this.applyConfiguration(entry.cast());
        }
        return Futures.exceptionalFuture(new RaftException.ProtocolException("Unknown entry type"));
    }

    /**
     * Restores session state from the snapshot stored at {@code index}, if there is one.
     * <p>
     * Nothing happens when {@code index} is not greater than {@code lastPrepared} or when the snapshot store has
     * no snapshot for the index. Otherwise the snapshot is read in this order: service id (long), service type
     * (string), service name (string), session count (int), then per session: session id (long), member id
     * (string), read consistency name (string), timeout (long), timestamp (long), request sequence (long),
     * command sequence (long), event index (long) and last completed (long). Each restored session is registered
     * with the session manager.
     * <p>
     * If no service can be obtained for the snapshot's service type, the method returns without restoring
     * anything and without advancing {@code lastPrepared}.
     *
     * @param index the log index about to be applied
     */
    private void prepareIndex(long index) {
        if (index <= lastPrepared) {
            return;
        }
        Snapshot snapshot = raft.getSnapshotStore().getSnapshotByIndex(index);
        if (snapshot == null) {
            return;
        }

        try (SnapshotReader snapshotReader = snapshot.openReader()) {
            ServiceId serviceId = ServiceId.from(snapshotReader.readLong());
            ServiceType serviceType = ServiceType.from(snapshotReader.readString());
            String serviceName = snapshotReader.readString();

            DefaultServiceContext service = getOrInitializeService(serviceId, serviceType, serviceName);
            if (service == null) {
                return;
            }

            logger.debug("Restoring sessions for {}", serviceName);
            int sessionCount = snapshotReader.readInt();
            int restored = 0;
            while (restored < sessionCount) {
                // The read order below mirrors the snapshot layout and must not change.
                SessionId sessionId = SessionId.from(snapshotReader.readLong());
                MemberId node = MemberId.from(snapshotReader.readString());
                ReadConsistency readConsistency = ReadConsistency.valueOf(snapshotReader.readString());
                long sessionTimeout = snapshotReader.readLong();
                long sessionTimestamp = snapshotReader.readLong();

                RaftSessionContext session = new RaftSessionContext(
                    sessionId, node, serviceName, serviceType, readConsistency, sessionTimeout, service, raft, threadPool);
                session.setTimestamp(sessionTimestamp);
                session.setRequestSequence(snapshotReader.readLong());
                session.setCommandSequence(snapshotReader.readLong());
                session.setEventIndex(snapshotReader.readLong());
                session.setLastCompleted(snapshotReader.readLong());
                session.setLastApplied(snapshot.index());
                sessionManager.registerSession(session);
                restored++;
            }
        }
        lastPrepared = index;
    }

    /**
     * Applies an initialize entry by calling {@code keepAliveSessions} on every known service with the entry's
     * index and timestamp.
     *
     * @param entry the initialize entry
     * @return a future already completed with {@code null}
     */
    private CompletableFuture<Void> applyInitialize(Indexed<InitializeEntry> entry) {
        services.values().forEach(
            service -> service.keepAliveSessions(entry.index(), entry.entry().timestamp()));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a configuration entry by calling {@code keepAliveSessions} on every known service with the entry's
     * index and timestamp.
     *
     * @param entry the configuration entry
     * @return a future already completed with {@code null}
     */
    private CompletableFuture<Void> applyConfiguration(Indexed<ConfigurationEntry> entry) {
        services.values().forEach(
            service -> service.keepAliveSessions(entry.index(), entry.entry().timestamp()));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a keep-alive entry covering several sessions.
     * <p>
     * For each session id in the entry that resolves to a known session, the owning service's {@code keepAlive}
     * is invoked with the matching command sequence and event index; ids with no known session are skipped.
     * After all keep-alives have been issued, {@code completeKeepAlive} is called on every known service.
     *
     * @param entry the keep-alive entry
     * @return a future that completes, once every issued keep-alive has completed, with the ids of the sessions
     *         whose keep-alive reported success
     */
    private CompletableFuture<long[]> applyKeepAlive(Indexed<KeepAliveEntry> entry) {
        KeepAliveEntry keepAlive = entry.entry();
        long[] sessionIds = keepAlive.sessionIds();
        long[] commandSequences = keepAlive.commandSequenceNumbers();
        long[] eventIndexes = keepAlive.eventIndexes();

        // Guarded by its own monitor because keep-alive callbacks may add to it.
        List<Long> successfulSessionIds = new ArrayList<>(sessionIds.length);
        List<CompletableFuture<Void>> pending = new ArrayList<>(sessionIds.length);

        for (int position = 0; position < sessionIds.length; position++) {
            long sessionId = sessionIds[position];
            long commandSequence = commandSequences[position];
            long eventIndex = eventIndexes[position];

            RaftSessionContext session = sessionManager.getSession(sessionId);
            if (session != null) {
                CompletableFuture<Void> outcome = session.getService()
                    .keepAlive(entry.index(), entry.entry().timestamp(), session, commandSequence, eventIndex)
                    .thenApply(succeeded -> {
                        if (succeeded) {
                            synchronized (successfulSessionIds) {
                                successfulSessionIds.add(sessionId);
                            }
                        }
                        return null;
                    });
                pending.add(outcome);
            }
        }

        for (DefaultServiceContext service : services.values()) {
            service.completeKeepAlive(entry.index(), entry.entry().timestamp());
        }

        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[pending.size()]))
            .thenApply(ignored -> {
                synchronized (successfulSessionIds) {
                    return Longs.toArray(successfulSessionIds);
                }
            });
    }

    /**
     * Returns the service context registered under {@code serviceName}, creating and registering one if none
     * exists yet.
     *
     * @param serviceId the id to give a newly created service
     * @param serviceType the service type, whose id is used to look up a factory in the service registry
     * @param serviceName the name the service is registered under
     * @return the existing or newly created service context, or {@code null} when no service exists under the
     *         name and the registry has no factory for the type
     */
    private DefaultServiceContext getOrInitializeService(ServiceId serviceId, ServiceType serviceType, String serviceName) {
        DefaultServiceContext existing = services.get(serviceName);
        if (existing != null) {
            return existing;
        }

        Supplier<RaftService> factory = raft.getServiceRegistry().getFactory((String) serviceType.id());
        if (factory == null) {
            return null;
        }

        DefaultServiceContext created = new DefaultServiceContext(
            serviceId, serviceName, serviceType, factory.get(), raft, sessionManager, threadPool);
        services.put(serviceName, created);
        return created;
    }

    /**
     * Applies an open-session entry: resolves (or creates) the target service, registers a new session whose id
     * is the entry's index, and opens it on the service.
     *
     * @param entry the open-session entry
     * @return the future returned by the service's {@code openSession}, or a future failed with
     *         {@link RaftException.UnknownService} when no service can be obtained for the entry's service type
     */
    private CompletableFuture<Long> applyOpenSession(Indexed<OpenSessionEntry> entry) {
        OpenSessionEntry open = entry.entry();

        DefaultServiceContext service = getOrInitializeService(
            ServiceId.from(entry.index()),
            ServiceType.from(open.serviceType()),
            open.serviceName());
        if (service == null) {
            return Futures.exceptionalFuture(
                new RaftException.UnknownService("Unknown service type " + open.serviceType()));
        }

        // The session id is derived from the index of the entry that opened it.
        SessionId sessionId = SessionId.from(entry.index());
        RaftSessionContext session = new RaftSessionContext(
            sessionId,
            MemberId.from(open.memberId()),
            open.serviceName(),
            ServiceType.from(open.serviceType()),
            open.readConsistency(),
            open.timeout(),
            service,
            raft,
            threadPool);
        sessionManager.registerSession(session);
        return service.openSession(entry.index(), open.timestamp(), session);
    }

    /**
     * Applies a close-session entry by closing the referenced session on its owning service.
     *
     * @param entry the close-session entry
     * @return the future returned by the service's {@code closeSession}, or a future failed with
     *         {@link RaftException.UnknownSession} (after logging a warning) when the session is not registered
     */
    private CompletableFuture<Void> applyCloseSession(Indexed<CloseSessionEntry> entry) {
        CloseSessionEntry close = entry.entry();
        RaftSessionContext session = sessionManager.getSession(close.session());
        if (session != null) {
            return session.getService().closeSession(entry.index(), close.timestamp(), session);
        }
        logger.warn("Unknown session: " + close.session());
        return Futures.exceptionalFuture(
            new RaftException.UnknownSession("Unknown session: " + close.session()));
    }

    /**
     * Applies a metadata entry by collecting metadata for registered sessions.
     * <p>
     * When the entry carries a positive session id, that session must be registered, and only sessions whose
     * service name equals that session's service name are reported. Otherwise every registered session is
     * reported.
     *
     * @param entry the metadata entry
     * @return a completed future holding the collected session metadata, or a future failed with
     *         {@link RaftException.UnknownSession} (after logging a warning) when a positive session id does not
     *         resolve to a registered session
     */
    private CompletableFuture<MetadataResult> applyMetadata(Indexed<MetadataEntry> entry) {
        boolean restrictToService = entry.entry().session() > 0L;

        RaftSessionContext requester = null;
        if (restrictToService) {
            requester = sessionManager.getSession(entry.entry().session());
            if (requester == null) {
                logger.warn("Unknown session: " + entry.entry().session());
                return Futures.exceptionalFuture(
                    new RaftException.UnknownSession("Unknown session: " + entry.entry().session()));
            }
        }

        HashSet<RaftSessionMetadata> collected = new HashSet<>();
        for (RaftSessionContext candidate : sessionManager.getSessions()) {
            if (restrictToService && !candidate.serviceName().equals(requester.serviceName())) {
                continue;
            }
            collected.add(new RaftSessionMetadata(
                (Long) candidate.sessionId().id(),
                candidate.serviceName(),
                (String) candidate.serviceType().id()));
        }
        return CompletableFuture.completedFuture(new MetadataResult(collected));
    }

    /**
     * Applies a command entry by executing its operation on the service that owns the entry's session.
     *
     * @param entry the command entry
     * @return the future returned by the service's {@code executeCommand}, or a future failed with
     *         {@link RaftException.UnknownSession} (after logging a warning) when the session is not registered
     */
    private CompletableFuture<OperationResult> applyCommand(Indexed<CommandEntry> entry) {
        CommandEntry command = entry.entry();
        RaftSessionContext session = sessionManager.getSession(command.session());
        if (session != null) {
            return session.getService().executeCommand(
                entry.index(),
                command.sequenceNumber(),
                command.timestamp(),
                session,
                command.operation());
        }
        logger.warn("Unknown session: " + command.session());
        return Futures.exceptionalFuture(
            new RaftException.UnknownSession("unknown session: " + command.session()));
    }

    /**
     * Applies a query entry by executing its operation on the service that owns the entry's session.
     *
     * @param entry the query entry
     * @return the future returned by the service's {@code executeQuery}, or a future failed with
     *         {@link RaftException.UnknownSession} (after logging a warning) when the session is not registered
     */
    private CompletableFuture<OperationResult> applyQuery(Indexed<QueryEntry> entry) {
        QueryEntry query = entry.entry();
        RaftSessionContext session = sessionManager.getSession(query.session());
        if (session != null) {
            return session.getService().executeQuery(
                entry.index(),
                query.sequenceNumber(),
                query.timestamp(),
                session,
                query.operation());
        }
        logger.warn("Unknown session: " + query.session());
        return Futures.exceptionalFuture(
            new RaftException.UnknownSession("unknown session " + query.session()));
    }

    /**
     * Schedules one snapshot pass to run on the thread context after {@code SNAPSHOT_INTERVAL}.
     */
    private void scheduleSnapshots() {
        threadContext.schedule(SNAPSHOT_INTERVAL, () -> snapshotServices());
    }

    /**
     * Schedules completion of a service snapshot after {@code SNAPSHOT_INTERVAL} plus a random jitter of less
     * than one further interval.
     *
     * @param serviceContext the service whose snapshot should be completed
     * @param snapshotIndex the index of the snapshot to complete
     * @return a future that is completed with the outcome of the service's {@code completeSnapshot}
     */
    private CompletableFuture<Void> scheduleCompletion(DefaultServiceContext serviceContext, long snapshotIndex) {
        int jitterMillis = random.nextInt((int) SNAPSHOT_INTERVAL.toMillis());
        Duration delay = SNAPSHOT_INTERVAL.plusMillis(jitterMillis);

        // ComposableFuture is handed to whenComplete as the callback, so it mirrors the snapshot's outcome.
        ComposableFuture<Void> completion = new ComposableFuture<>();
        threadContext.schedule(
            delay,
            () -> serviceContext.completeSnapshot(snapshotIndex).whenComplete(completion));
        return completion;
    }

    /**
     * Schedules log compaction up to {@code lastApplied} after {@code MIN_COMPACT_INTERVAL} plus a random
     * jitter of less than one further interval.
     *
     * @param lastApplied the index up to which the log should be compacted
     */
    private void scheduleCompaction(long lastApplied) {
        int jitterMillis = random.nextInt((int) MIN_COMPACT_INTERVAL.toMillis());
        Duration delay = MIN_COMPACT_INTERVAL.plusMillis(jitterMillis);
        logger.trace("Scheduling compaction in {}", delay);
        threadContext.schedule(delay, () -> compactLogs(lastApplied));
    }

    /**
     * Runs one snapshot pass.
     * <p>
     * If the log is compactable at the current last-applied index and its compactable index is beyond
     * {@code lastCompacted}, every known service is snapshotted in turn (waiting for each snapshot to finish),
     * completion of each snapshot is scheduled, and log compaction is scheduled once all completions have
     * finished, whether they succeeded or failed. Otherwise the next snapshot pass is scheduled.
     */
    private void snapshotServices() {
        long lastApplied = raft.getLastApplied();
        boolean shouldSnapshot = raft.getLog().isCompactable(lastApplied)
            && raft.getLog().getCompactableIndex(lastApplied) > lastCompacted;
        if (!shouldSnapshot) {
            scheduleSnapshots();
            return;
        }

        logger.debug("Snapshotting services");
        lastCompacted = lastApplied;

        // Iterate over a copy so the pass works on a fixed set of services.
        List<DefaultServiceContext> contexts = new ArrayList<>(services.values());
        List<CompletableFuture<Void>> completions = new ArrayList<>();
        for (DefaultServiceContext context : contexts) {
            // join() blocks until this service's snapshot has been taken.
            long snapshotIndex = context.takeSnapshot().join();
            completions.add(scheduleCompletion(context, snapshotIndex));
        }

        CompletableFuture.allOf(completions.toArray(new CompletableFuture[completions.size()]))
            .whenComplete((result, error) -> scheduleCompaction(lastApplied));
    }

    /**
     * Compacts the log up to {@code compactIndex} and then starts the next snapshot pass.
     * <p>
     * An exception thrown by the compaction is logged and not rethrown; the next snapshot pass runs whether or
     * not compaction succeeded.
     *
     * @param compactIndex the index up to which the log is compacted
     */
    private void compactLogs(long compactIndex) {
        logger.debug("Compacting logs up to index {}", compactIndex);
        try {
            log.compact(compactIndex);
        } catch (Exception failure) {
            logger.error("An exception occurred during log compaction: {}", (Throwable) failure);
        } finally {
            // Keep the snapshot/compaction cycle going regardless of the outcome above.
            snapshotServices();
        }
    }

    /**
     * Closes the service manager. The body is empty, so this method releases nothing.
     * <p>
     * NOTE(review): the log reader opened in the constructor is not closed here — confirm that its lifecycle
     * is owned elsewhere.
     */
    @Override
    public void close() {
    }
}

