/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class DefaultTaskExecutorTest {
    private static final long VERIFICATION_TIMEOUT = 15000L;
    private final Time time = new MockTime(1L);
    private final StreamTask task = (StreamTask)Mockito.mock(StreamTask.class);
    private final TaskManager taskManager = (TaskManager)Mockito.mock(TaskManager.class);
    private final TaskExecutionMetadata taskExecutionMetadata = (TaskExecutionMetadata)Mockito.mock(TaskExecutionMetadata.class);
    private final DefaultTaskExecutor taskExecutor = new DefaultTaskExecutor(this.taskManager, "TaskExecutor", this.time, this.taskExecutionMetadata);

    @BeforeEach
    public void setUp() {
        Mockito.when((Object)this.taskManager.assignNextTask((TaskExecutor)this.taskExecutor)).thenReturn((Object)this.task).thenReturn(null);
        Mockito.when((Object)this.taskExecutionMetadata.canProcessTask((Task)ArgumentMatchers.eq((Object)this.task), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.task.process(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.task.prepareCommit()).thenReturn(Collections.emptyMap());
    }

    @AfterEach
    public void tearDown() {
        this.taskExecutor.shutdown(Duration.ofMinutes(1L));
    }

    @Test
    public void shouldShutdownTaskExecutor() {
        Assertions.assertNull((Object)this.taskExecutor.currentTask(), (String)"Have task assigned before startup");
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).assignNextTask((TaskExecutor)this.taskExecutor);
        this.taskExecutor.shutdown(Duration.ofMinutes(1L));
        ((StreamTask)Mockito.verify((Object)this.task)).prepareCommit();
        ((TaskManager)Mockito.verify((Object)this.taskManager)).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        Assertions.assertNull((Object)this.taskExecutor.currentTask(), (String)"Have task assigned after shutdown");
    }

    @Test
    public void shouldUnassignTaskWhenNotProgressing() {
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)false);
        Mockito.when((Object)this.task.maybePunctuateStreamTime()).thenReturn((Object)false);
        Mockito.when((Object)this.task.maybePunctuateSystemTime()).thenReturn((Object)false);
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        ((StreamTask)Mockito.verify((Object)this.task)).prepareCommit();
        Assertions.assertNull((Object)this.taskExecutor.currentTask());
    }

    @Test
    public void shouldProcessTasks() {
        Mockito.when((Object)this.taskExecutionMetadata.canProcessTask((Task)ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        this.taskExecutor.start();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.timeout((long)15000L).atLeast(2))).process(ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldPunctuateStreamTime() {
        Mockito.when((Object)this.taskExecutionMetadata.canProcessTask((Task)ArgumentMatchers.eq((Object)this.task), ArgumentMatchers.anyLong())).thenReturn((Object)false);
        Mockito.when((Object)this.taskExecutionMetadata.canPunctuateTask((Task)this.task)).thenReturn((Object)true);
        Mockito.when((Object)this.task.maybePunctuateStreamTime()).thenReturn((Object)true);
        this.taskExecutor.start();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.timeout((long)15000L).atLeast(2))).maybePunctuateStreamTime();
    }

    @Test
    public void shouldPunctuateSystemTime() {
        Mockito.when((Object)this.taskExecutionMetadata.canProcessTask((Task)ArgumentMatchers.eq((Object)this.task), ArgumentMatchers.anyLong())).thenReturn((Object)false);
        Mockito.when((Object)this.taskExecutionMetadata.canPunctuateTask((Task)this.task)).thenReturn((Object)true);
        Mockito.when((Object)this.task.maybePunctuateSystemTime()).thenReturn((Object)true);
        this.taskExecutor.start();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.timeout((long)15000L).atLeast(2))).maybePunctuateSystemTime();
    }

    @Test
    public void shouldRespectPunctuationDisabledByTaskExecutionMetadata() {
        Mockito.when((Object)this.taskExecutionMetadata.canProcessTask((Task)ArgumentMatchers.eq((Object)this.task), ArgumentMatchers.anyLong())).thenReturn((Object)true);
        Mockito.when((Object)this.taskExecutionMetadata.canPunctuateTask((Task)this.task)).thenReturn((Object)false);
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        this.taskExecutor.start();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.timeout((long)15000L).atLeast(2))).process(ArgumentMatchers.anyLong());
        this.taskExecutor.unassign();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.never())).maybePunctuateStreamTime();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.never())).maybePunctuateSystemTime();
    }

    @Test
    public void shouldRespectProcessingDisabledByTaskExecutionMetadata() {
        Mockito.when((Object)this.taskExecutionMetadata.canProcessTask((Task)ArgumentMatchers.eq((Object)this.task), ArgumentMatchers.anyLong())).thenReturn((Object)false);
        Mockito.when((Object)this.taskExecutionMetadata.canPunctuateTask((Task)this.task)).thenReturn((Object)true);
        Mockito.when((Object)this.task.isProcessable(ArgumentMatchers.anyLong())).thenReturn((Object)true);
        this.taskExecutor.start();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.timeout((long)15000L))).maybePunctuateSystemTime();
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.timeout((long)15000L))).maybePunctuateStreamTime();
        this.taskExecutor.unassign();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        ((StreamTask)Mockito.verify((Object)this.task, (VerificationMode)Mockito.never())).process(ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldUnassignTaskWhenRequired() throws Exception {
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).assignNextTask((TaskExecutor)this.taskExecutor);
        Assertions.assertNotNull((Object)this.taskExecutor.currentTask());
        KafkaFuture future = this.taskExecutor.unassign();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        ((StreamTask)Mockito.verify((Object)this.task)).prepareCommit();
        Assertions.assertNull((Object)this.taskExecutor.currentTask());
        Assertions.assertTrue((boolean)future.isDone(), (String)"Unassign is not completed");
        Assertions.assertEquals((Object)this.task, (Object)future.get(), (String)"Unexpected task was unassigned");
    }

    @Test
    public void shouldSetUncaughtStreamsException() {
        StreamsException exception = (StreamsException)((Object)Mockito.mock(StreamsException.class));
        Mockito.when((Object)this.task.process(ArgumentMatchers.anyLong())).thenThrow(new Throwable[]{exception});
        this.taskExecutor.start();
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).assignNextTask((TaskExecutor)this.taskExecutor);
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).setUncaughtException(exception, this.task.id());
        ((TaskManager)Mockito.verify((Object)this.taskManager, (VerificationMode)Mockito.timeout((long)15000L))).unassignTask(this.task, (TaskExecutor)this.taskExecutor);
        Assertions.assertNull((Object)this.taskExecutor.currentTask());
    }
}

