package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.class */
class CoStreamFlatMapTest implements Serializable {
    private static final long serialVersionUID = 1;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest$MyCoFlatMap.class */
    private static final class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1;

        private MyCoFlatMap() {
        }

        public void flatMap1(String str, Collector<String> collector) {
            for (int i = 0; i < str.length(); i++) {
                collector.collect(str.substring(i, i + 1));
            }
        }

        public void flatMap2(Integer num, Collector<String> collector) {
            collector.collect(num.toString());
        }

        public /* bridge */ /* synthetic */ void flatMap2(Object obj, Collector collector) throws Exception {
            flatMap2((Integer) obj, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void flatMap1(Object obj, Collector collector) throws Exception {
            flatMap1((String) obj, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest$TestOpenCloseCoFlatMapFunction.class */
    private static class TestOpenCloseCoFlatMapFunction extends RichCoFlatMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseCoFlatMapFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            ((AbstractBooleanAssert) Assertions.assertThat(closeCalled).as("Close called before open.", new Object[0])).isFalse();
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            ((AbstractBooleanAssert) Assertions.assertThat(openCalled).as("Open was not called before close.", new Object[0])).isTrue();
            closeCalled = true;
        }

        public void flatMap1(String str, Collector<String> collector) throws Exception {
            ((AbstractBooleanAssert) Assertions.assertThat(openCalled).as("Open was not called before run.", new Object[0])).isTrue();
            collector.collect(str);
        }

        public void flatMap2(Integer num, Collector<String> collector) throws Exception {
            ((AbstractBooleanAssert) Assertions.assertThat(openCalled).as("Open was not called before run.", new Object[0])).isTrue();
            collector.collect(num.toString());
        }

        public /* bridge */ /* synthetic */ void flatMap2(Object obj, Collector collector) throws Exception {
            flatMap2((Integer) obj, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void flatMap1(Object obj, Collector collector) throws Exception {
            flatMap1((String) obj, (Collector<String>) collector);
        }
    }

    CoStreamFlatMapTest() {
    }

    @Test
    void testCoFlatMap() throws Exception {
        TwoInputStreamOperatorTestHarness twoInputStreamOperatorTestHarness = new TwoInputStreamOperatorTestHarness(new CoStreamFlatMap(new MyCoFlatMap()));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        twoInputStreamOperatorTestHarness.open();
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord("abc", 0 + serialVersionUID));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord("def", 0 + 2));
        twoInputStreamOperatorTestHarness.processWatermark1(new Watermark(0 + 2));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord("ghi", 0 + 3));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(1, 0 + serialVersionUID));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(2, 0 + 2));
        twoInputStreamOperatorTestHarness.processWatermark2(new Watermark(0 + 3));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(3, 0 + 3));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(4, 0 + 4));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(5, 0 + 5));
        concurrentLinkedQueue.add(new StreamRecord("a", 0 + serialVersionUID));
        concurrentLinkedQueue.add(new StreamRecord("b", 0 + serialVersionUID));
        concurrentLinkedQueue.add(new StreamRecord("c", 0 + serialVersionUID));
        concurrentLinkedQueue.add(new StreamRecord("d", 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("e", 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("f", 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("g", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("h", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("i", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("1", 0 + serialVersionUID));
        concurrentLinkedQueue.add(new StreamRecord("2", 0 + 2));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("3", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("4", 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord("5", 0 + 5));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    void testOpenClose() throws Exception {
        TwoInputStreamOperatorTestHarness twoInputStreamOperatorTestHarness = new TwoInputStreamOperatorTestHarness(new CoStreamFlatMap(new TestOpenCloseCoFlatMapFunction()));
        twoInputStreamOperatorTestHarness.open();
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord("Hello", 0L));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(42, 0L));
        twoInputStreamOperatorTestHarness.close();
        ((AbstractBooleanAssert) Assertions.assertThat(TestOpenCloseCoFlatMapFunction.closeCalled).as("RichFunction methods where not called.", new Object[0])).isTrue();
        Assertions.assertThat(twoInputStreamOperatorTestHarness.getOutput()).isNotEmpty();
    }
}
