package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractDoubleAssert;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.class */
public class SourceStreamTaskTestBase {
    public void testMetrics(FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception> functionWithException, StreamOperatorFactory<?> streamOperatorFactory, Consumer<AbstractDoubleAssert<?>> consumer) throws Exception {
        StreamTaskMailboxTestHarnessBuilder streamTaskMailboxTestHarnessBuilder = new StreamTaskMailboxTestHarnessBuilder(functionWithException, BasicTypeInfo.INT_TYPE_INFO);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        StreamTaskMailboxTestHarness build = streamTaskMailboxTestHarnessBuilder.setupOutputForSingletonOperatorChain(streamOperatorFactory).setTaskMetricGroup(StreamTaskTestHarness.createTaskMetricGroup(concurrentHashMap)).build();
        Throwable th = null;
        try {
            try {
                CompletableFuture triggerCheckpointAsync = build.streamTask.triggerCheckpointAsync(new CheckpointMetaData(1L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation());
                OneShotLatch oneShotLatch = new OneShotLatch();
                build.getCheckpointResponder().setAcknowledgeLatch(oneShotLatch);
                Assertions.assertThat(triggerCheckpointAsync).isNotDone();
                Thread.sleep(42L);
                while (!triggerCheckpointAsync.isDone()) {
                    build.streamTask.runMailboxStep();
                }
                Assertions.assertThat((Long) ((Gauge) concurrentHashMap.get("checkpointStartDelayNanos")).getValue()).isGreaterThanOrEqualTo(42 * 1000000);
                consumer.accept(Assertions.assertThat((Double) ((Gauge) concurrentHashMap.get("busyTimeMsPerSecond")).getValue()));
                oneShotLatch.await();
                Assertions.assertThat(((TestCheckpointResponder.AcknowledgeReport) Iterables.getOnlyElement(build.getCheckpointResponder().getAcknowledgeReports())).getCheckpointMetrics().getCheckpointStartDelayNanos()).isGreaterThanOrEqualTo(42 * 1000000);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }
}
