/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.junit.Assert;
import org.junit.Test;

public class TaskAssignorConvergenceTest {
    @Test
    public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
        AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(Long.valueOf(100L), Integer.valueOf(2), Integer.valueOf(0), Long.valueOf(60000L), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
        TaskAssignorConvergenceTest.testForConvergence(harness, configs, 1);
        TaskAssignorConvergenceTest.verifyValidAssignment(0, harness);
        TaskAssignorConvergenceTest.verifyBalancedAssignment(harness);
    }

    @Test
    public void assignmentShouldConvergeAfterAddingNode() {
        int numStatelessTasks = 7;
        int numStatefulTasks = 11;
        int maxWarmupReplicas = 2;
        boolean numStandbyReplicas = false;
        AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(Long.valueOf(100L), Integer.valueOf(2), Integer.valueOf(0), Long.valueOf(60000L), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        Harness harness = Harness.initializeCluster(7, 11, 1, () -> 5);
        TaskAssignorConvergenceTest.testForConvergence(harness, configs, 1);
        harness.addNode();
        TaskAssignorConvergenceTest.testForConvergence(harness, configs, 6);
        TaskAssignorConvergenceTest.verifyValidAssignment(0, harness);
        TaskAssignorConvergenceTest.verifyBalancedAssignment(harness);
    }

    @Test
    public void droppingNodesShouldConverge() {
        int numStatelessTasks = 11;
        int numStatefulTasks = 13;
        int maxWarmupReplicas = 2;
        boolean numStandbyReplicas = false;
        AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(Long.valueOf(100L), Integer.valueOf(2), Integer.valueOf(0), Long.valueOf(60000L), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        Harness harness = Harness.initializeCluster(11, 13, 7, () -> 5);
        TaskAssignorConvergenceTest.testForConvergence(harness, configs, 1);
        harness.dropNode();
        TaskAssignorConvergenceTest.testForConvergence(harness, configs, 8);
        TaskAssignorConvergenceTest.verifyValidAssignment(0, harness);
        TaskAssignorConvergenceTest.verifyBalancedAssignment(harness);
    }

    @Test
    public void randomClusterPerturbationsShouldConverge() {
        long deadline = System.currentTimeMillis() + 10000L;
        do {
            long seed = new Random().nextLong();
            TaskAssignorConvergenceTest.runRandomizedScenario(seed);
        } while (System.currentTimeMillis() < deadline);
    }

    private static void runRandomizedScenario(long seed) {
        Harness harness = null;
        try {
            Random prng = new Random(seed);
            int initialClusterSize = prng.nextInt(10) + 1;
            int numStatelessTasks = prng.nextInt(10) + 1;
            int numStatefulTasks = prng.nextInt(10) + 1;
            int maxWarmupReplicas = prng.nextInt(numStatefulTasks) + 1;
            int numStandbyReplicas = prng.nextInt(initialClusterSize + 1);
            int numberOfEvents = prng.nextInt(10) + 1;
            AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(Long.valueOf(100L), Integer.valueOf(maxWarmupReplicas), Integer.valueOf(numStandbyReplicas), Long.valueOf(60000L), AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
            harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, initialClusterSize, () -> prng.nextInt(10) + 1);
            TaskAssignorConvergenceTest.testForConvergence(harness, configs, 1);
            TaskAssignorConvergenceTest.verifyValidAssignment(numStandbyReplicas, harness);
            TaskAssignorConvergenceTest.verifyBalancedAssignment(harness);
            for (int i = 0; i < numberOfEvents; ++i) {
                int event = prng.nextInt(2);
                switch (event) {
                    case 0: {
                        harness.dropRandomNodes(prng.nextInt(initialClusterSize), prng);
                        break;
                    }
                    case 1: {
                        harness.addOrResurrectNodesRandomly(prng, initialClusterSize);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unexpected event: " + event);
                    }
                }
                if (harness.clientStates.isEmpty()) continue;
                TaskAssignorConvergenceTest.testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
                TaskAssignorConvergenceTest.verifyValidAssignment(numStandbyReplicas, harness);
                TaskAssignorConvergenceTest.verifyBalancedAssignment(harness);
            }
        }
        catch (AssertionError t) {
            throw new AssertionError("Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(" + seed + ")`.", (Throwable)((Object)t));
        }
        catch (Throwable t) {
            StringBuilder builder = new StringBuilder().append("Exception in randomized scenario. Reproduce with: `runRandomizedScenario(").append(seed).append(")`. ");
            if (harness != null) {
                builder.append((CharSequence)harness.history);
            }
            throw new AssertionError(builder.toString(), t);
        }
    }

    private static void verifyBalancedAssignment(Harness harness) {
        Set<TaskId> allStatefulTasks = harness.statefulTaskEndOffsetSums.keySet();
        Map clientStates = harness.clientStates;
        StringBuilder failureContext = harness.history;
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, failureContext);
        AssignmentTestUtils.assertBalancedStatefulAssignment(allStatefulTasks, clientStates, failureContext);
        AssignmentTestUtils.TaskSkewReport taskSkewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(harness.clientStates);
        if (taskSkewReport.totalSkewedTasks() > 0) {
            Assert.fail((String)("Expected a balanced task assignment, but was: " + taskSkewReport + '\n' + failureContext));
        }
    }

    private static void verifyValidAssignment(int numStandbyReplicas, Harness harness) {
        Set<TaskId> statefulTasks = harness.statefulTaskEndOffsetSums.keySet();
        Set statelessTasks = harness.statelessTasks;
        Map assignedStates = harness.clientStates;
        StringBuilder failureContext = harness.history;
        AssignmentTestUtils.assertValidAssignment(numStandbyReplicas, statefulTasks, statelessTasks, assignedStates, failureContext);
    }

    private static void testForConvergence(Harness harness, AssignorConfiguration.AssignmentConfigs configs, int iterationLimit) {
        TreeSet allTasks = new TreeSet();
        allTasks.addAll(harness.statelessTasks);
        allTasks.addAll(harness.statefulTaskEndOffsetSums.keySet());
        harness.recordConfig(configs);
        boolean rebalancePending = true;
        int iteration = 0;
        while (rebalancePending && iteration < iterationLimit) {
            harness.prepareForNextRebalance();
            harness.recordBefore(++iteration);
            rebalancePending = new HighAvailabilityTaskAssignor().assign(harness.clientStates, allTasks, harness.statefulTaskEndOffsetSums.keySet(), configs);
            harness.recordAfter(iteration, rebalancePending);
        }
        if (rebalancePending) {
            StringBuilder message = new StringBuilder().append("Rebalances have not converged after iteration cutoff: ").append(iterationLimit).append((CharSequence)harness.history);
            Assert.fail((String)message.toString());
        }
    }

    private static final class Harness {
        private final Set<TaskId> statelessTasks;
        private final Map<TaskId, Long> statefulTaskEndOffsetSums;
        private final Map<UUID, ClientState> clientStates;
        private final Map<UUID, ClientState> droppedClientStates;
        private final StringBuilder history = new StringBuilder();

        private static Harness initializeCluster(int numStatelessTasks, int numStatefulTasks, int numNodes, Supplier<Integer> partitionCountSupplier) {
            int i;
            int subtopology = 0;
            TreeSet<TaskId> statelessTasks = new TreeSet<TaskId>();
            int remainingStatelessTasks = numStatelessTasks;
            while (remainingStatelessTasks > 0) {
                int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get());
                for (int i2 = 0; i2 < partitions; ++i2) {
                    statelessTasks.add(new TaskId(subtopology, i2));
                    --remainingStatelessTasks;
                }
                ++subtopology;
            }
            TreeMap<TaskId, Long> statefulTaskEndOffsetSums = new TreeMap<TaskId, Long>();
            int remainingStatefulTasks = numStatefulTasks;
            while (remainingStatefulTasks > 0) {
                int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get());
                for (i = 0; i < partitions; ++i) {
                    statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 150000L);
                    --remainingStatefulTasks;
                }
                ++subtopology;
            }
            TreeMap<UUID, ClientState> clientStates = new TreeMap<UUID, ClientState>();
            for (i = 0; i < numNodes; ++i) {
                UUID uuid = AssignmentTestUtils.uuidForInt(i);
                clientStates.put(uuid, Harness.emptyInstance(uuid, statefulTaskEndOffsetSums));
            }
            return new Harness(statelessTasks, statefulTaskEndOffsetSums, clientStates);
        }

        private Harness(Set<TaskId> statelessTasks, Map<TaskId, Long> statefulTaskEndOffsetSums, Map<UUID, ClientState> clientStates) {
            this.statelessTasks = statelessTasks;
            this.statefulTaskEndOffsetSums = statefulTaskEndOffsetSums;
            this.clientStates = clientStates;
            this.droppedClientStates = new TreeMap<UUID, ClientState>();
            this.history.append('\n');
            this.history.append("Cluster and application initial state: \n");
            this.history.append("Stateless tasks: ").append(statelessTasks).append('\n');
            this.history.append("Stateful tasks:  ").append(statefulTaskEndOffsetSums.keySet()).append('\n');
            this.formatClientStates(true);
            this.history.append("History of the cluster: \n");
        }

        private void addNode() {
            UUID uuid = AssignmentTestUtils.uuidForInt(this.clientStates.size() + this.droppedClientStates.size());
            this.history.append("Adding new node ").append(uuid).append('\n');
            this.clientStates.put(uuid, Harness.emptyInstance(uuid, this.statefulTaskEndOffsetSums));
        }

        private static ClientState emptyInstance(UUID uuid, Map<TaskId, Long> allTaskEndOffsetSums) {
            ClientState clientState = new ClientState(1);
            clientState.computeTaskLags(uuid, allTaskEndOffsetSums);
            return clientState;
        }

        private void addOrResurrectNodesRandomly(Random prng, int limit) {
            int numberToAdd = prng.nextInt(limit);
            for (int i = 0; i < numberToAdd; ++i) {
                boolean addNew = prng.nextBoolean();
                if (addNew || this.droppedClientStates.isEmpty()) {
                    this.addNode();
                    continue;
                }
                UUID uuid = Harness.selectRandomElement(prng, this.droppedClientStates);
                this.history.append("Resurrecting node ").append(uuid).append('\n');
                this.clientStates.put(uuid, this.droppedClientStates.get(uuid));
                this.droppedClientStates.remove(uuid);
            }
        }

        private void dropNode() {
            if (this.clientStates.isEmpty()) {
                throw new NoSuchElementException("There are no nodes to drop");
            }
            UUID toDrop = this.clientStates.keySet().iterator().next();
            this.dropNode(toDrop);
        }

        private void dropRandomNodes(int numNode, Random prng) {
            for (int dropped = 0; !this.clientStates.isEmpty() && dropped < numNode; ++dropped) {
                UUID toDrop = Harness.selectRandomElement(prng, this.clientStates);
                this.dropNode(toDrop);
            }
            this.history.append("Stateless tasks: ").append(this.statelessTasks).append('\n');
            this.history.append("Stateful tasks:  ").append(this.statefulTaskEndOffsetSums.keySet()).append('\n');
            this.formatClientStates(true);
        }

        private void dropNode(UUID toDrop) {
            ClientState clientState = this.clientStates.remove(toDrop);
            this.history.append("Dropping node ").append(toDrop).append(": ").append(clientState).append('\n');
            this.droppedClientStates.put(toDrop, clientState);
        }

        private static UUID selectRandomElement(Random prng, Map<UUID, ClientState> clients) {
            int dropIndex = prng.nextInt(clients.size());
            UUID toDrop = null;
            for (UUID uuid : clients.keySet()) {
                if (dropIndex == 0) {
                    toDrop = uuid;
                    break;
                }
                --dropIndex;
            }
            return toDrop;
        }

        private void prepareForNextRebalance() {
            TreeMap<UUID, ClientState> newClientStates = new TreeMap<UUID, ClientState>();
            for (Map.Entry<UUID, ClientState> entry : this.clientStates.entrySet()) {
                UUID uuid = entry.getKey();
                ClientState newClientState = new ClientState(1);
                ClientState clientState = entry.getValue();
                TreeMap<TaskId, Long> taskOffsetSums = new TreeMap<TaskId, Long>();
                for (TaskId taskId : clientState.activeTasks()) {
                    if (!this.statefulTaskEndOffsetSums.containsKey(taskId)) continue;
                    taskOffsetSums.put(taskId, this.statefulTaskEndOffsetSums.get(taskId));
                }
                for (TaskId taskId : clientState.standbyTasks()) {
                    if (!this.statefulTaskEndOffsetSums.containsKey(taskId)) continue;
                    taskOffsetSums.put(taskId, this.statefulTaskEndOffsetSums.get(taskId));
                }
                newClientState.addPreviousActiveTasks(clientState.activeTasks());
                newClientState.addPreviousStandbyTasks(clientState.standbyTasks());
                newClientState.addPreviousTasksAndOffsetSums("consumer", taskOffsetSums);
                newClientState.computeTaskLags(uuid, this.statefulTaskEndOffsetSums);
                newClientStates.put(uuid, newClientState);
            }
            this.clientStates.clear();
            this.clientStates.putAll(newClientStates);
        }

        private void recordConfig(AssignorConfiguration.AssignmentConfigs configuration) {
            this.history.append("Creating assignor with configuration: ").append(configuration).append('\n');
        }

        private void recordBefore(int iteration) {
            this.history.append("Starting Iteration: ").append(iteration).append('\n');
            this.formatClientStates(false);
        }

        private void recordAfter(int iteration, boolean rebalancePending) {
            this.history.append("After assignment:  ").append(iteration).append('\n');
            this.history.append("Rebalance pending: ").append(rebalancePending).append('\n');
            this.formatClientStates(true);
            this.history.append('\n');
        }

        private void formatClientStates(boolean printUnassigned) {
            AssignmentTestUtils.appendClientStates(this.history, this.clientStates);
            if (printUnassigned) {
                TreeSet<TaskId> unassignedTasks = new TreeSet<TaskId>();
                unassignedTasks.addAll(this.statefulTaskEndOffsetSums.keySet());
                unassignedTasks.addAll(this.statelessTasks);
                for (Map.Entry<UUID, ClientState> entry : this.clientStates.entrySet()) {
                    unassignedTasks.removeAll(entry.getValue().assignedTasks());
                }
                this.history.append("Unassigned Tasks: ").append(unassignedTasks).append('\n');
            }
        }
    }
}

