/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class HighAvailabilityTaskAssignorTest {
    // Shared assignor configurations. The two differ only in the third constructor argument
    // (0 vs. 1) -- presumably the number of standby replicas, going by the field names; confirm
    // against AssignorConfiguration.AssignmentConfigs.
    private final AssignorConfiguration.AssignmentConfigs configWithoutStandbys =
        new AssignorConfiguration.AssignmentConfigs(100L, 2, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
    private final AssignorConfiguration.AssignmentConfigs configWithStandbys =
        new AssignorConfiguration.AssignmentConfigs(100L, 2, 1, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);

    /**
     * Nine tasks, three clients: client 1 is built with all tasks in its first task-set argument
     * and lag 0, client 2 with all tasks in its second task-set argument and lag 10, and client 3
     * with no tasks and lag {@code Long.MAX_VALUE}. (The two task-set arguments are presumably the
     * previous active and previous standby tasks, per the test name; confirm against ClientState.)
     *
     * <p>Asserts that clients 1 and 2 each end up with all nine tasks assigned, client 3 with two,
     * and that {@code assign()} returns {@code true}.
     */
    @Test
    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> caughtUpLags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        final Map<TaskId, Long> smallLags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final Map<TaskId, Long> maxLags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));

        final ClientState clientState1 =
            new ClientState(allTaskIds, Collections.emptySet(), caughtUpLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), allTaskIds, smallLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), maxLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);

        final Map<UUID, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.UUID_1, clientState1),
            Utils.mkEntry(AssignmentTestUtils.UUID_2, clientState2),
            Utils.mkEntry(AssignmentTestUtils.UUID_3, clientState3)
        );

        // First config argument is 11, i.e. just above client 2's lag of 10.
        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(11L, 2, 1, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(clientState1, AssignmentTestUtils.hasAssignedTasks(allTaskIds.size()));
        MatcherAssert.assertThat(clientState2, AssignmentTestUtils.hasAssignedTasks(allTaskIds.size()));
        MatcherAssert.assertThat(clientState3, AssignmentTestUtils.hasAssignedTasks(2));
        MatcherAssert.assertThat(unstable, Matchers.is(true));
    }

    /**
     * Nine tasks, three clients: client 1 is built with all tasks in its first task-set argument
     * and lag 0; clients 2 and 3 have no tasks and lag {@code Long.MAX_VALUE}. The first config
     * argument (the acceptable lag, per the test name) is {@code Long.MAX_VALUE}.
     *
     * <p>Asserts that every client ends up with six assigned tasks and that {@code assign()}
     * returns {@code false}.
     */
    @Test
    public void shouldSkipWarmupsWhenAcceptableLagIsMax() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> caughtUpLags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        // Separate map instances per client, as in the original scenario.
        final Map<TaskId, Long> maxLags2 = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        final Map<TaskId, Long> maxLags3 = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));

        final ClientState clientState1 =
            new ClientState(allTaskIds, Collections.emptySet(), caughtUpLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), maxLags2, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), maxLags3, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);

        final Map<UUID, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.UUID_1, clientState1),
            Utils.mkEntry(AssignmentTestUtils.UUID_2, clientState2),
            Utils.mkEntry(AssignmentTestUtils.UUID_3, clientState3)
        );

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(clientState1, AssignmentTestUtils.hasAssignedTasks(6));
        MatcherAssert.assertThat(clientState2, AssignmentTestUtils.hasAssignedTasks(6));
        MatcherAssert.assertThat(clientState3, AssignmentTestUtils.hasAssignedTasks(6));
        MatcherAssert.assertThat(unstable, Matchers.is(false));
    }

    /**
     * Nine tasks over three clients, each built with no tasks, lag 10 on every task, and 1 as the
     * final constructor argument (presumably thread capacity; confirm against ClientState).
     *
     * <p>Asserts that {@code assign()} returns {@code false} and that the result passes the
     * validity and balance checks in {@link AssignmentTestUtils}.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(clientState1, clientState2, clientState3);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Nine tasks over three clients, each built with no tasks, lag 10 on every task, and 3 as the
     * final constructor argument (the thread count, per the test name).
     *
     * <p>Asserts that {@code assign()} returns {@code false} and that the result passes the
     * validity and balance checks in {@link AssignmentTestUtils}.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfThreadsIntegralDivisorOfNumberOfTasks() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(clientState1, clientState2, clientState3);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Nine tasks over only two clients (so the task count is not a multiple of the client count),
     * each built with no tasks and lag 10 on every task.
     *
     * <p>Asserts that {@code assign()} returns {@code false} and that the result passes the
     * validity and balance checks in {@link AssignmentTestUtils}.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsNotIntegralDivisorOfNumberOfTasks() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(clientState1, clientState2);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Six tasks over three clients built with 1, 2 and 3 as the final constructor argument (the
     * stream-thread counts, per the test name), no tasks, and lag 10 on every task.
     *
     * <p>Asserts that {@code assign()} returns {@code false}, that the validity and active/stateful
     * balance checks pass, and that the clients receive 1, 2 and 3 active tasks respectively. The
     * test then fails unless the skew report counts at least one skewed task.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamThreads() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(clientState1, clientState2, clientState3);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());

        MatcherAssert.assertThat(clientState1, AssignmentTestUtils.hasActiveTasks(1));
        MatcherAssert.assertThat(clientState2, AssignmentTestUtils.hasActiveTasks(2));
        MatcherAssert.assertThat(clientState3, AssignmentTestUtils.hasActiveTasks(3));

        // A skewed assignment is the expected outcome here, so a skew count of zero is a failure.
        final AssignmentTestUtils.TaskSkewReport taskSkewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(clientStates);
        if (taskSkewReport.totalSkewedTasks() == 0) {
            Assert.fail("Expected a skewed task assignment, but was: " + taskSkewReport);
        }
    }

    /**
     * Two tasks over three clients, each built with no tasks and lag 10 on every task.
     *
     * <p>Asserts that {@code assign()} returns {@code false} and that the result passes the
     * validity and balance checks in {@link AssignmentTestUtils}.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() {
        final Set<TaskId> allTaskIds = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(clientState1, clientState2, clientState3);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Nine tasks over three clients, each built with 9 as the final constructor argument (one
     * stream thread per task, per the test name), no tasks, and lag 10 on every task.
     *
     * <p>Asserts that {@code assign()} returns {@code false} and that the result passes the
     * validity and balance checks in {@link AssignmentTestUtils}.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithEqualStreamThreadsPerClientAsTasks() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 9);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 9);
        final ClientState clientState3 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 9);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(clientState1, clientState2, clientState3);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Four tasks, three clients: one client is built with all tasks in its first task-set argument
     * and lag 0, the other two with no tasks and lag {@code Long.MAX_VALUE}. The second config
     * argument is {@code allTaskIds.size() / 3 + 1} (presumably the warm-up replica limit, since
     * the same value is passed to {@code assertValidAssignment}; confirm).
     *
     * <p>Asserts that {@code assign()} returns {@code true}, that each not-caught-up client has at
     * least {@code allTaskIds.size() / 3} standby tasks, and that the assignment is valid.
     */
    @Test
    public void shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1
        );
        final Map<TaskId, Long> lagsForCaughtUpClient =
            allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        final Map<TaskId, Long> lagsForNotCaughtUpClient =
            allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));

        final ClientState caughtUpClientState =
            new ClientState(allTaskIds, Collections.emptySet(), lagsForCaughtUpClient, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5);
        final ClientState notCaughtUpClientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lagsForNotCaughtUpClient, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5);
        final ClientState notCaughtUpClientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lagsForNotCaughtUpClient, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(caughtUpClientState, notCaughtUpClientState1, notCaughtUpClientState2);

        final int minStandbysPerClient = allTaskIds.size() / 3;
        final int maxWarmups = minStandbysPerClient + 1;
        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, maxWarmups, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(true));
        MatcherAssert.assertThat(notCaughtUpClientState1.standbyTaskCount(), Matchers.greaterThanOrEqualTo(minStandbysPerClient));
        MatcherAssert.assertThat(notCaughtUpClientState2.standbyTaskCount(), Matchers.greaterThanOrEqualTo(minStandbysPerClient));
        AssignmentTestUtils.assertValidAssignment(0, maxWarmups, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
    }

    /**
     * Four tasks, three clients: one client is built with all tasks in its first task-set argument
     * and lag 0. Each of the other two has a single task in its second task-set argument
     * ({@code TASK_0_1} and {@code TASK_1_0} respectively) with lag 0 on that task and
     * {@code Long.MAX_VALUE} on the rest.
     *
     * <p>Asserts that {@code assign()} returns {@code false} and that the result passes the
     * validity and balance checks in {@link AssignmentTestUtils}.
     */
    @Test
    public void shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1
        );
        final Set<TaskId> warmedUpTaskIds1 = Utils.mkSet(AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> warmedUpTaskIds2 = Utils.mkSet(AssignmentTestUtils.TASK_1_0);

        final Map<TaskId, Long> lagsForCaughtUpClient =
            allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        // Start from "maximally behind" and then mark the one warmed-up task as caught up.
        final Map<TaskId, Long> lagsForWarmedUpClient1 =
            allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        lagsForWarmedUpClient1.put(AssignmentTestUtils.TASK_0_1, 0L);
        final Map<TaskId, Long> lagsForWarmedUpClient2 =
            allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        lagsForWarmedUpClient2.put(AssignmentTestUtils.TASK_1_0, 0L);

        final ClientState caughtUpClientState =
            new ClientState(allTaskIds, Collections.emptySet(), lagsForCaughtUpClient, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5);
        final ClientState warmedUpClientState1 =
            new ClientState(Collections.emptySet(), warmedUpTaskIds1, lagsForWarmedUpClient1, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5);
        final ClientState warmedUpClientState2 =
            new ClientState(Collections.emptySet(), warmedUpTaskIds2, lagsForWarmedUpClient2, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5);
        final Map<UUID, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(caughtUpClientState, warmedUpClientState1, warmedUpClientState2);

        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            0L, allTaskIds.size() / 3 + 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Nine tasks over two clients built with 6 and 3 as the final constructor argument (the
     * stream-thread counts, per the test name), no tasks, and lag 10 on every task.
     *
     * <p>Asserts that {@code assign()} returns {@code false}, that the validity and active/stateful
     * balance checks pass, and that the clients receive 6 and 3 active tasks respectively.
     */
    @Test
    public void shouldAssignActiveStatefulTasksEvenlyOverStreamThreadsButBestEffortOverClients() {
        final Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2
        );
        final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState clientState1 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 6);
        final ClientState clientState2 =
            new ClientState(Collections.emptySet(), Collections.emptySet(), lags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(clientState1, clientState2);

        final AssignorConfiguration.AssignmentConfigs configs =
            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
        final boolean unstable = new HighAvailabilityTaskAssignor().assign(clientStates, allTaskIds, allTaskIds, configs);

        MatcherAssert.assertThat(unstable, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTaskIds, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
        MatcherAssert.assertThat(clientState1, AssignmentTestUtils.hasActiveTasks(6));
        MatcherAssert.assertThat(clientState2, AssignmentTestUtils.hasActiveTasks(3));
    }

    /**
     * A single client built with only {@code TASK_0_0} (lag 0), while the full task set also
     * contains {@code TASK_0_1}; only {@code TASK_0_0} is passed as stateful.
     *
     * <p>Asserts that {@code assign()} returns {@code false}, that the client ends up with two
     * active and zero standby tasks, and that the validity and balance checks pass.
     */
    @Test
    public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final ClientState client1 = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final Map<UUID, ClientState> clientStates = Collections.singletonMap(AssignmentTestUtils.UUID_1, client1);

        final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(
            clientStates,
            allTasks,
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            this.configWithoutStandbys
        );

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        MatcherAssert.assertThat(client1, AssignmentTestUtils.hasActiveTasks(2));
        MatcherAssert.assertThat(client1, AssignmentTestUtils.hasStandbyTasks(0));
        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * One stateful task, two clients: client 1 is built with {@code TASK_0_0} in its first
     * task-set argument, client 2 with no tasks; both report lag 0 for it. Uses
     * {@code configWithStandbys}.
     *
     * <p>Asserts that client 2 ends up with a non-empty standby set, that {@code assign()} returns
     * {@code false}, and that the validity and balance checks pass.
     */
    @Test
    public void shouldComputeNewAssignmentIfThereAreUnassignedStandbyTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final ClientState client1 = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final Map<UUID, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.UUID_1, client1),
            Utils.mkEntry(AssignmentTestUtils.UUID_2, client2)
        );

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, this.configWithStandbys);

        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.UUID_2).standbyTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Two tasks, of which only {@code TASK_0_0} is stateful. Client 1 is built with
     * {@code TASK_0_0} but reports lag 500 for it; client 2 is built with {@code TASK_0_1} and
     * reports lag 0 for {@code TASK_0_0}. Uses {@code configWithoutStandbys}.
     *
     * <p>Asserts that the active tasks swap (client 1 gets {@code TASK_0_1}, client 2 gets
     * {@code TASK_0_0}), that {@code assign()} returns {@code true}, and that the validity and
     * balance checks pass.
     */
    @Test
    public void shouldComputeNewAssignmentIfActiveTasksWasNotOnCaughtUpClient() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final ClientState client1 = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 500L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final ClientState client2 = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_1),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final Map<UUID, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.UUID_1, client1),
            Utils.mkEntry(AssignmentTestUtils.UUID_2, client2)
        );

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, this.configWithoutStandbys);

        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.UUID_1).activeTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_1))
        );
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.UUID_2).activeTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0))
        );
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.assertValidAssignment(0, 1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * One stateful task and three clients with no tasks, reporting lags of
     * {@code Long.MAX_VALUE}, 1000 and 500 respectively. Uses {@code configWithStandbys}.
     *
     * <p>Asserts that only client 3 (lag 500) gets the task as active, that clients 1 and 2 hold
     * it as a standby while client 3 holds no standby, that {@code assign()} returns
     * {@code true}, and that the validity and balance checks pass.
     */
    @Test
    public void shouldAssignToMostCaughtUpIfActiveTasksWasNotOnCaughtUpClient() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final ClientState client1 = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, Long.MAX_VALUE),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 1000L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final ClientState client3 = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 500L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
        final Map<UUID, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.UUID_1, client1),
            Utils.mkEntry(AssignmentTestUtils.UUID_2, client2),
            Utils.mkEntry(AssignmentTestUtils.UUID_3, client3)
        );

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, this.configWithStandbys);

        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.UUID_1).activeTasks(), Matchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.UUID_2).activeTasks(), Matchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.UUID_3).activeTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0))
        );
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.UUID_1).standbyTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0))
        );
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.UUID_2).standbyTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0))
        );
        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.UUID_3).standbyTasks(), Matchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.assertValidAssignment(1, 1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates);
    }

    /**
     * Two stateful tasks and two clients created via
     * {@code getMockClientWithPreviousCaughtUpTasks}, with {@code TASK_0_0} and {@code TASK_0_1}
     * respectively as the first argument. Uses {@code configWithStandbys}.
     *
     * <p>Asserts that each client is active on the task it was created with and standby for the
     * other one, and that {@code assign()} returns {@code false}.
     */
    @Test
    public void shouldAssignStandbysForStatefulTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final ClientState client1 = HighAvailabilityTaskAssignorTest.getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0), statefulTasks
        );
        final ClientState client2 = HighAvailabilityTaskAssignorTest.getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_1), statefulTasks
        );
        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, this.configWithStandbys);

        MatcherAssert.assertThat(client1.activeTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(client2.activeTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(client1.standbyTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(client2.standbyTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Two tasks, none of them stateful, and two clients without previous tasks; assignment runs with
     * {@code configWithStandbys}.
     * Expects one active task per client, no standby tasks on either client, and no probing rebalance.
     */
    @Test
    public void shouldNotAssignStandbysForStatelessTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        // Empty stateful set: every task in allTasks is stateless.
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);

        MatcherAssert.assertThat(client1.activeTaskCount(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(client2.activeTaskCount(), CoreMatchers.equalTo(1));
        assertHasNoStandbyTasks(client1, client2);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Two stateful tasks; client1 is caught up on both, client2 on neither; assignment runs with
     * {@code configWithoutStandbys}.
     * Expects client1 to keep both active tasks without standbys, client2 to hold no active task but
     * exactly one standby task (presumably the warm-up replica), and a probing rebalance to be requested.
     */
    @Test
    public void shouldAssignWarmupReplicasEvenIfNoStandbyReplicasConfigured() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1),
            statefulTasks
        );
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);

        MatcherAssert.assertThat(
            client1.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1))
        );
        MatcherAssert.assertThat(client2.standbyTaskCount(), CoreMatchers.equalTo(1));
        assertHasNoStandbyTasks(client1);
        assertHasNoActiveTasks(client2);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
    }

    /**
     * Four stateful tasks; client1 is caught up on all of them, client2 on none. The configuration
     * passes 1 and 0 as its second and third arguments (presumably max warm-up replicas and standby
     * replicas -- confirm against {@code AssignmentConfigs}).
     * Expects client1 to keep all four active tasks without standbys, client2 to hold no active task
     * and exactly one standby task, and a probing rebalance to be requested.
     */
    @Test
    public void shouldNotAssignMoreThanMaxWarmupReplicas() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
            ),
            statefulTasks
        );
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            100L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configs);

        MatcherAssert.assertThat(
            client1.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
            ))
        );
        MatcherAssert.assertThat(client2.standbyTaskCount(), CoreMatchers.equalTo(1));
        assertHasNoStandbyTasks(client1);
        assertHasNoActiveTasks(client2);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
    }

    /**
     * Four stateful tasks; client1 is caught up on all of them, client2 on none. The configuration
     * passes 1 and 1 as its second and third arguments (presumably max warm-up replicas and standby
     * replicas -- confirm against {@code AssignmentConfigs}).
     * Expects client1 to keep all four active tasks without standbys, client2 to hold no active task
     * and each of the four tasks exactly once as a standby, and a probing rebalance to be requested.
     */
    @Test
    public void shouldNotAssignWarmupAndStandbyToTheSameClient() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
            ),
            statefulTasks
        );
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            100L, 1, 1, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configs);

        MatcherAssert.assertThat(
            client1.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
            ))
        );
        // The standby set holds each task once, so no task is placed on client2 twice.
        MatcherAssert.assertThat(
            client2.standbyTasks(),
            CoreMatchers.equalTo(Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
            ))
        );
        assertHasNoStandbyTasks(client1);
        assertHasNoActiveTasks(client2);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
    }

    /**
     * Two stateful tasks and a single client that is caught up on both; assignment runs with
     * {@code configWithStandbys}.
     * Expects the only client to get both active tasks and no standby tasks, and no probing rebalance.
     */
    @Test
    public void shouldNotAssignAnyStandbysWithInsufficientCapacity() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1),
            statefulTasks
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);

        MatcherAssert.assertThat(
            client1.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1))
        );
        assertHasNoStandbyTasks(client1);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Two stateful tasks and a single client that is caught up on neither (the helper reports
     * {@code Long.MAX_VALUE} lag for both); assignment runs with {@code configWithStandbys}.
     * Expects the client to receive both active tasks anyway, no standby tasks, and no probing rebalance.
     */
    @Test
    public void shouldAssignActiveTasksToNotCaughtUpClientIfNoneExist() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);

        MatcherAssert.assertThat(
            client1.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1))
        );
        assertHasNoStandbyTasks(client1);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Four stateful tasks and three clients, of which only client1 is caught up; assignment runs with
     * {@code configWithStandbys}.
     * Delegates the structural checks to {@code assertValidAssignment(1, 2, ...)} (presumably one
     * standby replica and at most two warm-up replicas -- confirm against the helper) and expects a
     * probing rebalance to be requested.
     */
    @Test
    public void shouldNotAssignMoreThanMaxWarmupReplicasWithStandbys() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(statefulTasks, statefulTasks);
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);
        final ClientState client3 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2, client3);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);

        AssignmentTestUtils.assertValidAssignment(
            1,
            2,
            statefulTasks,
            Collections.emptySet(),
            clientStates,
            new StringBuilder()
        );
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
    }

    /**
     * Four stateful plus three stateless tasks and two clients, of which only client1 is caught up on
     * the stateful tasks; assignment runs with {@code configWithStandbys}.
     * Expects a valid assignment that the balance helpers accept, a task-skew report listing at least
     * one skewed subtopology, and a probing rebalance to be requested.
     */
    @Test
    public void shouldDistributeStatelessTasksToBalanceTotalTaskLoad() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );
        final Set<TaskId> statelessTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(statefulTasks, statefulTasks);
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);

        AssignmentTestUtils.assertValidAssignment(1, 2, statefulTasks, statelessTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(statefulTasks, clientStates, new StringBuilder());

        // The per-subtopology distribution is expected to be skewed; the report is used as the failure message.
        final AssignmentTestUtils.TaskSkewReport taskSkewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(clientStates);
        MatcherAssert.assertThat(
            taskSkewReport.toString(),
            taskSkewReport.skewedSubtopologies(),
            Matchers.not(Matchers.empty())
        );

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
    }

    /**
     * Nine stateful tasks and three clients without previous tasks that all report lag 0 for every
     * task; the clients are built with 100, 50 and 1 as the last constructor argument (presumably
     * their capacity -- confirm against {@code ClientState}). Assignment runs with
     * {@code configWithoutStandbys}.
     * Expects every client to receive at least one active task and no probing rebalance.
     */
    @Test
    public void shouldDistributeStatefulActiveTasksToAllClients() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3,
            AssignmentTestUtils.TASK_2_0
        );
        // Typing allTasks lets the collector produce a Map<TaskId, Long> directly.
        final Map<TaskId, Long> allTaskLags = allTasks.stream().collect(Collectors.toMap(t -> t, t -> 0L));
        final Set<TaskId> statefulTasks = new HashSet<>(allTasks);

        final ClientState client1 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), allTaskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 100
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), allTaskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 50
        );
        final ClientState client3 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), allTaskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2, client3);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);

        MatcherAssert.assertThat(client1.activeTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(client2.activeTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(client3.activeTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Four stateful tasks split two-and-two between two clients that are each caught up on their own
     * previous tasks; assignment runs with {@code configWithoutStandbys}.
     * Expects no probing rebalance and each client's active tasks to equal its previous active tasks.
     */
    @Test
    public void shouldReturnFalseIfPreviousAssignmentIsReused() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );
        final Set<TaskId> statefulTasks = new HashSet<>(allTasks);

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2),
            statefulTasks
        );
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_3),
            statefulTasks
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        MatcherAssert.assertThat(client1.activeTasks(), CoreMatchers.equalTo(client1.prevActiveTasks()));
        MatcherAssert.assertThat(client2.activeTasks(), CoreMatchers.equalTo(client2.prevActiveTasks()));
    }

    /**
     * Four tasks, none of them stateful, and two clients without previous tasks; assignment runs with
     * {@code configWithoutStandbys}.
     * Expects no probing rebalance and no standby tasks on either client.
     */
    @Test
    public void shouldReturnFalseIfNoWarmupTasksAreAssigned() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3
        );
        // Empty stateful set: every task in allTasks is stateless.
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        assertHasNoStandbyTasks(client1, client2);
    }

    /**
     * Two stateful tasks; client1 is caught up on both, client2 on neither; assignment runs with
     * {@code configWithoutStandbys}.
     * Expects a probing rebalance to be requested and client2 to hold exactly one standby task
     * (presumably the warm-up replica).
     */
    @Test
    public void shouldReturnTrueIfWarmupTasksAreAssigned() {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState client1 = getMockClientWithPreviousCaughtUpTasks(allTasks, statefulTasks);
        final ClientState client2 = getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks);

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2);
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        MatcherAssert.assertThat(client2.standbyTaskCount(), CoreMatchers.equalTo(1));
    }

    /**
     * Seven stateless tasks and three fresh clients (no previous tasks, no lags), each built with 7
     * as the last constructor argument (presumably its stream-thread capacity, matching the task count).
     * Expects a valid, balanced active assignment and no probing rebalance.
     */
    @Test
    public void shouldDistributeStatelessTasksEvenlyOverClientsWithEqualStreamThreadsPerClientAsTasksAndNoStatefulTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;
        final Set<TaskId> statelessTasks = new HashSet<>(allTasks);

        // Stateless tasks have no lag; all clients share the same empty lag map.
        final Map<TaskId, Long> taskLags = new HashMap<>();
        final ClientState client1 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 7
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 7
        );
        final ClientState client3 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 7
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2, client3);
        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTasks, clientStates, new StringBuilder()
        );
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Seven stateless tasks and three fresh clients (no previous tasks, no lags), each built with 2
     * as the last constructor argument (presumably its stream-thread capacity, fewer than the tasks).
     * Expects a valid, balanced active assignment and no probing rebalance.
     */
    @Test
    public void shouldDistributeStatelessTasksEvenlyOverClientsWithLessStreamThreadsPerClientAsTasksAndNoStatefulTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;
        final Set<TaskId> statelessTasks = new HashSet<>(allTasks);

        // Stateless tasks have no lag; all clients share the same empty lag map.
        final Map<TaskId, Long> taskLags = new HashMap<>();
        final ClientState client1 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2
        );
        final ClientState client3 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2, client3);
        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTasks, clientStates, new StringBuilder()
        );
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Seven stateless tasks and three fresh clients (no previous tasks, no lags) built with 1, 2 and 3
     * as the last constructor argument (presumably uneven stream-thread capacities).
     * Expects a valid, balanced active assignment and no probing rebalance.
     */
    @Test
    public void shouldDistributeStatelessTasksEvenlyOverClientsWithUnevenlyDistributedStreamThreadsAndNoStatefulTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;
        final Set<TaskId> statelessTasks = new HashSet<>(allTasks);

        // Stateless tasks have no lag; all clients share the same empty lag map.
        final Map<TaskId, Long> taskLags = new HashMap<>();
        final ClientState client1 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2
        );
        final ClientState client3 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2, client3);
        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTasks, clientStates, new StringBuilder()
        );
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Seven stateless tasks and three clients built with 3 as the last constructor argument
     * (presumably capacity); client1 previously owned all seven tasks as active, the others none.
     * Expects a valid, balanced active assignment and no probing rebalance.
     */
    @Test
    public void shouldDistributeStatelessTasksEvenlyWithPreviousAssignmentAndNoStatefulTasks() {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2
        );
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;
        final Set<TaskId> statelessTasks = new HashSet<>(allTasks);

        // Stateless tasks have no lag; all clients share the same empty lag map.
        final Map<TaskId, Long> taskLags = new HashMap<>();
        // Only client1 starts with a previous active assignment (all stateless tasks).
        final ClientState client1 = new ClientState(
            statelessTasks, Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3
        );
        final ClientState client2 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3
        );
        final ClientState client3 = new ClientState(
            Collections.emptySet(), Collections.emptySet(), taskLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3
        );

        final Map<UUID, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(client1, client2, client3);
        final AssignorConfiguration.AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
            0L, 1, 0, 60000L, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
        );
        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTasks, clientStates, new StringBuilder()
        );
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
    }

    /**
     * Asserts that none of the given clients has any active task assigned.
     *
     * @param clients the client states to check; the first client with a non-empty
     *                {@code activeTasks()} collection fails the assertion
     */
    private static void assertHasNoActiveTasks(ClientState ... clients) {
        for (final ClientState client : clients) {
            // No raw Matcher casts: the matcher is type-checked against activeTasks().
            MatcherAssert.assertThat(client.activeTasks(), Matchers.is(Matchers.empty()));
        }
    }

    /**
     * Asserts that each given client matches {@code AssignmentTestUtils.hasStandbyTasks(0)},
     * i.e. has zero standby tasks according to that matcher.
     *
     * @param clients the client states to check; the first client that does not match fails the assertion
     */
    private static void assertHasNoStandbyTasks(ClientState ... clients) {
        for (final ClientState client : clients) {
            // Pass the client with its static type so it lines up with the matcher's type parameter.
            MatcherAssert.assertThat(client, AssignmentTestUtils.hasStandbyTasks(0));
        }
    }

    /**
     * Builds a {@link ClientState} whose previous active tasks are {@code statefulActiveTasks} and
     * which has no previous standby tasks. Tasks in {@code statefulActiveTasks} get a lag of
     * {@code 0}; every other task in {@code statefulTasks} gets {@code Long.MAX_VALUE}. The client
     * is created with empty client tags and {@code 1} as the last constructor argument.
     *
     * @param statefulActiveTasks tasks the client previously ran as active and is caught up on
     * @param statefulTasks       all stateful tasks; must contain every element of {@code statefulActiveTasks}
     * @return the newly constructed client state
     * @throws IllegalArgumentException if {@code statefulActiveTasks} is not a subset of {@code statefulTasks}
     */
    private static ClientState getMockClientWithPreviousCaughtUpTasks(Set<TaskId> statefulActiveTasks, Set<TaskId> statefulTasks) {
        final boolean activeTasksAreStateful = statefulTasks.containsAll(statefulActiveTasks);
        if (!activeTasksAreStateful) {
            throw new IllegalArgumentException("Need to initialize stateful tasks set before creating mock clients");
        }

        final Map<TaskId, Long> lagPerTask = new HashMap<>();
        for (final TaskId statefulTask : statefulTasks) {
            // Caught up (lag 0) on previously active tasks, maximally behind on everything else.
            final long lag = statefulActiveTasks.contains(statefulTask) ? 0L : Long.MAX_VALUE;
            lagPerTask.put(statefulTask, lag);
        }

        return new ClientState(
            statefulActiveTasks,
            Collections.emptySet(),
            lagPerTask,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1
        );
    }
}

