/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.log.roles;

import com.google.common.collect.ImmutableList;
import io.atomix.cluster.MemberId;
import io.atomix.protocols.log.impl.DistributedLogServerContext;
import io.atomix.protocols.log.protocol.BackupOperation;
import io.atomix.protocols.log.protocol.BackupRequest;
import io.atomix.protocols.log.roles.Replicator;
import io.atomix.utils.concurrent.Scheduled;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;

class AsynchronousReplicator
implements Replicator {
    private static final int MAX_BATCH_SIZE = 100;
    private static final long MAX_BATCH_TIME = 100L;
    private final DistributedLogServerContext context;
    private final Logger log;
    private final Map<MemberId, BackupQueue> queues = new HashMap<MemberId, BackupQueue>();

    AsynchronousReplicator(DistributedLogServerContext context, Logger log) {
        this.context = context;
        this.log = log;
    }

    @Override
    public CompletableFuture<Void> replicate(BackupOperation operation) {
        for (MemberId backup : this.context.followers()) {
            this.queues.computeIfAbsent(backup, x$0 -> new BackupQueue((MemberId)x$0)).add(operation);
        }
        this.context.setCommitIndex(operation.index());
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {
        this.queues.values().forEach(BackupQueue::close);
    }

    private final class BackupQueue {
        private final Queue<BackupOperation> operations = new LinkedList<BackupOperation>();
        private final MemberId memberId;
        private final Scheduled backupTimer;
        private long lastSent;

        BackupQueue(MemberId memberId) {
            this.memberId = memberId;
            this.backupTimer = AsynchronousReplicator.this.context.threadContext().schedule(Duration.ofMillis(50L), Duration.ofMillis(50L), this::maybeBackup);
        }

        void add(BackupOperation operation) {
            this.operations.add(operation);
            if (this.operations.size() >= 100) {
                this.backup();
            }
        }

        private void maybeBackup() {
            if (System.currentTimeMillis() - this.lastSent > 100L && !this.operations.isEmpty()) {
                this.backup();
            }
        }

        private void backup() {
            ImmutableList batch = ImmutableList.copyOf(this.operations);
            this.operations.clear();
            BackupRequest request = BackupRequest.request(AsynchronousReplicator.this.context.memberId(), AsynchronousReplicator.this.context.currentTerm(), AsynchronousReplicator.this.context.getCommitIndex(), (List<BackupOperation>)batch);
            AsynchronousReplicator.this.log.trace("Sending {} to {}", (Object)request, (Object)this.memberId);
            AsynchronousReplicator.this.context.protocol().backup(this.memberId, request);
            this.lastSent = System.currentTimeMillis();
        }

        void close() {
            this.backupTimer.cancel();
        }
    }
}

