/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorTestContext;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SourceOperatorTest {
    @Nullable
    private SourceOperatorTestContext context;
    @Nullable
    private SourceOperator<Integer, MockSourceSplit> operator;
    @Nullable
    private MockSourceReader mockSourceReader;
    @Nullable
    private MockOperatorEventGateway mockGateway;

    SourceOperatorTest() {
    }

    @BeforeEach
    void setup() throws Exception {
        this.context = new SourceOperatorTestContext();
        this.operator = this.context.getOperator();
        this.mockSourceReader = this.context.getSourceReader();
        this.mockGateway = this.context.getGateway();
    }

    @AfterEach
    void tearDown() throws Exception {
        this.context.close();
        this.context = null;
        this.operator = null;
        this.mockSourceReader = null;
        this.mockGateway = null;
    }

    @Test
    void testInitializeState() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        Assertions.assertThat((Object)stateContext.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC)).isNotNull();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testOpen(boolean supportsSplitReassignmentOnRecovery) throws Exception {
        try (SourceOperatorTestContext context = new SourceOperatorTestContext(false, false, (WatermarkStrategy<Integer>)WatermarkStrategy.noWatermarks(), new MockOutput<Integer>(new ArrayList()), supportsSplitReassignmentOnRecovery);){
            SourceOperator<Integer, MockSourceSplit> operator = context.getOperator();
            operator.initializeState(context.createStateContext());
            operator.open();
            if (supportsSplitReassignmentOnRecovery) {
                Assertions.assertThat((List)context.getSourceReader().getAssignedSplits()).isEmpty();
            } else {
                Assertions.assertThat((List)context.getSourceReader().getAssignedSplits()).containsExactly((Object[])new MockSourceSplit[]{SourceOperatorTestContext.MOCK_SPLIT});
            }
            Assertions.assertThat((boolean)context.getSourceReader().isStarted()).isTrue();
            Assertions.assertThat(context.getGateway().getEventsSent()).hasSize(1);
            OperatorEvent operatorEvent = context.getGateway().getEventsSent().get(0);
            Assertions.assertThat((Object)operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
            ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent)operatorEvent;
            Assertions.assertThat((int)registrationEvent.subtaskId()).isEqualTo(1);
            if (supportsSplitReassignmentOnRecovery) {
                Assertions.assertThat((List)registrationEvent.splits((SimpleVersionedSerializer)new MockSourceSplitSerializer())).containsExactly((Object[])new MockSourceSplit[]{SourceOperatorTestContext.MOCK_SPLIT});
            } else {
                Assertions.assertThat((List)registrationEvent.splits((SimpleVersionedSerializer)new MockSourceSplitSerializer())).isEmpty();
            }
        }
    }

    @Test
    void testStop() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        Assertions.assertThat((List)this.mockSourceReader.getAssignedSplits()).containsExactly((Object[])new MockSourceSplit[]{SourceOperatorTestContext.MOCK_SPLIT});
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        Assertions.assertThat((Comparable)this.operator.emitNext(dataOutput)).isEqualTo((Object)DataInputStatus.NOTHING_AVAILABLE);
        Assertions.assertThat((boolean)this.operator.isAvailable()).isFalse();
        CompletableFuture sourceStopped = this.operator.stop(StopMode.DRAIN);
        Assertions.assertThat((boolean)this.operator.isAvailable()).isTrue();
        Assertions.assertThat((CompletableFuture)sourceStopped).isNotDone();
        Assertions.assertThat((Comparable)this.operator.emitNext(dataOutput)).isEqualTo((Object)DataInputStatus.END_OF_DATA);
        this.operator.finish();
        Assertions.assertThat((CompletableFuture)sourceStopped).isDone();
    }

    @Test
    void testHandleAddSplitsEvent() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        Assertions.assertThat((List)this.mockSourceReader.getAssignedSplits()).containsExactly((Object[])new MockSourceSplit[]{SourceOperatorTestContext.MOCK_SPLIT, newSplit});
    }

    @Test
    void testHandleAddSourceEvent() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        SourceEvent event = new SourceEvent(){};
        this.operator.handleOperatorEvent((OperatorEvent)new SourceEventWrapper(event));
        Assertions.assertThat((List)this.mockSourceReader.getReceivedSourceEvents()).containsExactly((Object[])new SourceEvent[]{event});
    }

    @Test
    void testSnapshotState() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        this.operator.snapshotState((StateSnapshotContext)new StateSnapshotContextSynchronousImpl(100L, 100L));
        List splitsInState = CollectionUtil.iterableToList((Iterable)((Iterable)this.operator.getReaderState().get()));
        Assertions.assertThat((List)splitsInState).containsExactly((Object[])new MockSourceSplit[]{SourceOperatorTestContext.MOCK_SPLIT, newSplit});
    }

    @Test
    void testNotifyCheckpointComplete() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        this.operator.open();
        this.operator.snapshotState((StateSnapshotContext)new StateSnapshotContextSynchronousImpl(100L, 100L));
        this.operator.notifyCheckpointComplete(100L);
        Assertions.assertThat((Long)((Long)this.mockSourceReader.getCompletedCheckpoints().get(0))).isEqualTo(100L);
    }

    @Test
    void testNotifyCheckpointAborted() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        this.operator.open();
        this.operator.snapshotState((StateSnapshotContext)new StateSnapshotContextSynchronousImpl(100L, 100L));
        this.operator.notifyCheckpointAborted(100L);
        Assertions.assertThat((Long)((Long)this.mockSourceReader.getAbortedCheckpoints().get(0))).isEqualTo(100L);
    }

    @Test
    void testHandleBacklogEvent() throws Exception {
        ArrayList<StreamElement> outputStreamElements = new ArrayList<StreamElement>();
        this.context = new SourceOperatorTestContext(false, false, (WatermarkStrategy<Integer>)WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner & Serializable)(element, recordTimestamp) -> element.intValue()), new CollectorOutput<Integer>(outputStreamElements), false);
        this.operator = this.context.getOperator();
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        newSplit.addRecord(1);
        newSplit.addRecord(1001);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        DataOutputToOutput output = new DataOutputToOutput(this.operator.output);
        this.operator.emitNext(output);
        this.operator.handleOperatorEvent((OperatorEvent)new IsProcessingBacklogEvent(true));
        this.operator.emitNext(output);
        this.operator.handleOperatorEvent((OperatorEvent)new IsProcessingBacklogEvent(false));
        Assertions.assertThat(outputStreamElements).containsExactly((Object[])new StreamElement[]{new StreamRecord((Object)1, 1L), new Watermark(0L), new RecordAttributes(true), new StreamRecord((Object)1001, 1001L), new Watermark(1000L), new RecordAttributes(false)});
    }

    @Test
    public void testMetricGroupIsCreatedForNewSplit() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        Assert.assertNotNull((Object)this.operator.getSplitMetricGroup(newSplit.splitId()));
    }

    @Test
    public void testMetricGroupIsCreatedForRestoredSplit() throws Exception {
        MockSourceSplit restoredSplit = new MockSourceSplit(1);
        StateInitializationContext stateContext = this.context.createStateContext(Collections.singletonList(restoredSplit));
        this.operator.initializeState(stateContext);
        this.operator.open();
        Assert.assertNotNull((Object)this.operator.getSplitMetricGroup(restoredSplit.splitId()));
    }

    @Test
    public void testMetricGroupTracksSplitWatermark() throws Exception {
        long expectedWatermark = 1000L;
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit split = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(split), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        this.operator.updateCurrentSplitWatermark(split.splitId(), expectedWatermark);
        Assert.assertEquals((long)expectedWatermark, (long)this.operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark());
    }

    @Test
    public void testMetricGroupReturnsDefaultIfNoSplitWatermark() throws Exception {
        long expectedWatermark = Watermark.UNINITIALIZED.getTimestamp();
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit split = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(split), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        Assert.assertEquals((long)expectedWatermark, (long)this.operator.getSplitMetricGroup(split.splitId()).getCurrentWatermark());
    }

    @Test
    public void testMultipleMetricGroupsReturnWatermarkOrDefaultWatermark() throws Exception {
        long expectedWatermarkValueForSplit0 = Watermark.UNINITIALIZED.getTimestamp();
        long expectedWatermarkValueForSplit1 = 1000L;
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit split0 = new MockSourceSplit(19);
        MockSourceSplit split1 = new MockSourceSplit(11);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(split0, split1), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        this.operator.updateCurrentSplitWatermark(split1.splitId(), expectedWatermarkValueForSplit1);
        Assert.assertEquals((long)expectedWatermarkValueForSplit0, (long)this.operator.getSplitMetricGroup(split0.splitId()).getCurrentWatermark());
        Assert.assertEquals((long)expectedWatermarkValueForSplit1, (long)this.operator.getSplitMetricGroup(split1.splitId()).getCurrentWatermark());
    }

    private static class DataOutputToOutput<T>
    implements PushingAsyncDataInput.DataOutput<T> {
        private final Output<StreamRecord<T>> output;

        DataOutputToOutput(Output<StreamRecord<T>> output) {
            this.output = output;
        }

        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void emitWatermark(Watermark watermark) throws Exception {
            this.output.emitWatermark(watermark);
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            this.output.emitWatermarkStatus(watermarkStatus);
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.output.emitLatencyMarker(latencyMarker);
        }

        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
            this.output.emitRecordAttributes(recordAttributes);
        }

        public void emitWatermark(WatermarkEvent watermark) throws Exception {
            this.output.emitWatermark(watermark);
        }
    }
}

