/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Queue;
import java.util.function.Supplier;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;

public class StreamTaskMailboxTestHarness<OUT>
implements AutoCloseable {
    protected final StreamTask<OUT, ?> streamTask;
    protected final StreamMockEnvironment streamMockEnvironment;
    protected final TestTaskStateManager taskStateManager;
    protected final Queue<Object> outputList;
    protected final StreamTestSingleInputGate[] inputGates;
    protected final boolean[] inputGateEnded;
    private boolean autoProcess = true;

    StreamTaskMailboxTestHarness(StreamTask<OUT, ?> streamTask, Queue<Object> outputList, StreamTestSingleInputGate[] inputGates, StreamMockEnvironment streamMockEnvironment) {
        this.streamTask = (StreamTask)Preconditions.checkNotNull(streamTask);
        this.taskStateManager = (TestTaskStateManager)streamMockEnvironment.getTaskStateManager();
        this.inputGates = (StreamTestSingleInputGate[])Preconditions.checkNotNull((Object)inputGates);
        this.outputList = (Queue)Preconditions.checkNotNull(outputList);
        this.streamMockEnvironment = (StreamMockEnvironment)Preconditions.checkNotNull((Object)streamMockEnvironment);
        this.inputGateEnded = new boolean[inputGates.length];
    }

    public TestTaskStateManager getTaskStateManager() {
        return this.taskStateManager;
    }

    public StreamTask<OUT, ?> getStreamTask() {
        return this.streamTask;
    }

    public TimerService getTimerService() {
        return this.streamTask.getTimerService();
    }

    public Queue<Object> getOutput() {
        return this.outputList;
    }

    public void processElement(Object element) throws Exception {
        this.processElement(element, 0);
    }

    public void processElement(Object element, int inputGate) throws Exception {
        this.processElement(element, inputGate, 0);
    }

    public void processElement(Object element, int inputGate, int channel) throws Exception {
        this.inputGates[inputGate].sendElement(element, channel);
        this.maybeProcess();
    }

    public void processEvent(AbstractEvent event) throws Exception {
        this.processEvent(event, 0);
    }

    public void processEvent(AbstractEvent event, int inputGate) throws Exception {
        this.processEvent(event, inputGate, 0);
    }

    public void processEvent(AbstractEvent event, int inputGate, int channel) throws Exception {
        this.inputGates[inputGate].sendEvent(event, channel);
        this.maybeProcess();
    }

    private void maybeProcess() throws Exception {
        if (this.autoProcess) {
            this.processAll();
        }
    }

    public void processUntil(Supplier<Boolean> condition) throws Exception {
        while (!condition.get().booleanValue()) {
            this.processAll();
        }
    }

    public void processAll() throws Exception {
        while (this.processSingleStep()) {
        }
    }

    public boolean processSingleStep() throws Exception {
        if (this.streamTask.mailboxProcessor.isMailboxLoopRunning()) {
            return this.streamTask.runMailboxStep();
        }
        return false;
    }

    public MailboxExecutor getExecutor(int priority) {
        return this.streamTask.getMailboxExecutorFactory().createExecutor(priority);
    }

    public void endInput() {
        this.endInput(true);
    }

    public void endInput(boolean allDataProcessed) {
        for (int i = 0; i < this.inputGates.length; ++i) {
            this.endInput(i, allDataProcessed);
        }
    }

    public void endInput(int inputIndex, boolean emitEndOfData) {
        if (!this.inputGateEnded[inputIndex]) {
            this.inputGates[inputIndex].endInput(emitEndOfData);
            this.inputGateEnded[inputIndex] = true;
        }
    }

    public void waitForTaskCompletion() throws Exception {
        this.endInput();
        this.processAll();
    }

    public void finishProcessing() throws Exception {
        this.streamTask.afterInvoke();
        this.streamTask.cleanUp(null);
    }

    public void cancel() throws Exception {
        this.streamTask.cancel();
    }

    @Override
    public void close() throws Exception {
        if (this.streamTask.isRunning()) {
            this.streamTask.cancel();
            this.finishProcessing();
        }
        this.streamMockEnvironment.getIOManager().close();
        MemoryManager memMan = this.streamMockEnvironment.getMemoryManager();
        if (memMan != null) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)memMan.verifyEmpty()).as("Memory Manager managed memory was not completely freed.", new Object[0])).isTrue();
            memMan.shutdown();
        }
    }

    public void setAutoProcess(boolean autoProcess) {
        this.autoProcess = autoProcess;
    }

    public TestCheckpointResponder getCheckpointResponder() {
        return (TestCheckpointResponder)this.taskStateManager.getCheckpointResponder();
    }

    public StreamMockEnvironment getStreamMockEnvironment() {
        return this.streamMockEnvironment;
    }
}

