/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class CollectSinkFunctionTest {
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private CollectSinkFunctionTestWrapper<Integer> functionWrapper;

    CollectSinkFunctionTest() {
    }

    @BeforeEach
    void before() throws Exception {
        this.functionWrapper = new CollectSinkFunctionTestWrapper<Integer>(serializer, 12);
    }

    @AfterEach
    void after() throws Exception {
        this.functionWrapper.closeWrapper();
    }

    @Test
    void testIncreasingToken() throws Exception {
        this.functionWrapper.openFunction();
        for (int i = 0; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse(version, 0L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(0, 1, 2));
        response = this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(4, 5));
        response = this.functionWrapper.sendRequestAndGetResponse(version, 6L);
        this.assertResponseEquals(response, version, 0L, Collections.emptyList());
        this.functionWrapper.closeFunctionNormally();
    }

    @Test
    void testDuplicatedToken() throws Exception {
        this.functionWrapper.openFunction();
        for (int i = 0; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse(version, 0L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(0, 1, 2));
        response = this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(4, 5));
        response = this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(4, 5));
        this.functionWrapper.closeFunctionNormally();
    }

    @Test
    void testInvalidToken() throws Exception {
        this.functionWrapper.openFunction();
        for (int i = 0; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse(version, 3L);
        this.assertResponseEquals(response, version, 0L, Collections.emptyList());
        this.functionWrapper.closeFunctionNormally();
    }

    @Test
    void testInvalidVersion() throws Exception {
        this.functionWrapper.openFunction();
        for (int i = 0; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse("invalid version", 0L);
        this.assertResponseEquals(response, version, 0L, Collections.emptyList());
        this.functionWrapper.closeFunctionNormally();
    }

    @Test
    void testConfiguredPortIsUsed() throws Exception {
        try (ServerSocket socket = new ServerSocket(0);){
            this.functionWrapper.getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration().set(TaskManagerOptions.COLLECT_PORT, (Object)socket.getLocalPort());
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.functionWrapper.openFunction()).isInstanceOf(BindException.class)).hasMessageContaining("Address already in use");
        }
    }

    @Test
    void testCheckpoint() throws Exception {
        this.functionWrapper.openFunctionWithState();
        for (int i = 0; i < 2; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse(version, 0L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(0, 1));
        for (int i = 2; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        response = this.functionWrapper.sendRequestAndGetResponse(version, 3L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(3, 4, 5));
        this.functionWrapper.checkpointFunction(1L);
        response = this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        this.assertResponseEquals(response, version, 0L, Arrays.asList(4, 5));
        this.functionWrapper.checkpointComplete(1L);
        response = this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        this.assertResponseEquals(response, version, 3L, Arrays.asList(4, 5));
        this.functionWrapper.closeFunctionNormally();
    }

    @Test
    void testRestart() throws Exception {
        int i;
        this.functionWrapper.openFunctionWithState();
        for (int i2 = 0; i2 < 3; ++i2) {
            this.functionWrapper.invoke(i2);
        }
        String version = this.initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, 1L);
        this.functionWrapper.checkpointFunction(1L);
        this.functionWrapper.checkpointComplete(1L);
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse(version, 1L);
        this.assertResponseEquals(response, version, 1L, Arrays.asList(1, 2));
        for (i = 3; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        response = this.functionWrapper.sendRequestAndGetResponse(version, 2L);
        this.assertResponseEquals(response, version, 1L, Arrays.asList(2, 3, 4));
        this.functionWrapper.closeFunctionAbnormally();
        this.functionWrapper.openFunctionWithState();
        version = this.initializeVersion();
        response = this.functionWrapper.sendRequestAndGetResponse(version, 1L);
        this.assertResponseEquals(response, version, 1L, Arrays.asList(1, 2));
        for (i = 6; i < 9; ++i) {
            this.functionWrapper.invoke(i);
        }
        response = this.functionWrapper.sendRequestAndGetResponse(version, 2L);
        this.assertResponseEquals(response, version, 1L, Arrays.asList(2, 6, 7));
        this.functionWrapper.closeFunctionNormally();
    }

    @Test
    void testAccumulatorResultWithoutCheckpoint() throws Exception {
        this.testAccumulatorResultWithoutCheckpoint(2, Arrays.asList(2, 3, 4, 5));
    }

    @Test
    void testEmptyAccumulatorResult() throws Exception {
        this.testAccumulatorResultWithoutCheckpoint(6, Collections.emptyList());
    }

    private void testAccumulatorResultWithoutCheckpoint(int offset, List<Integer> expected) throws Exception {
        this.functionWrapper.openFunction();
        for (int i = 0; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, offset);
        this.functionWrapper.closeFunctionNormally();
        CollectTestUtils.assertAccumulatorResult(this.functionWrapper.getAccumulatorResults(), (long)offset, (String)version, (long)0L, expected, serializer);
    }

    @Test
    void testAccumulatorResultWithCheckpoint() throws Exception {
        this.functionWrapper.openFunctionWithState();
        for (int i = 0; i < 6; ++i) {
            this.functionWrapper.invoke(i);
        }
        String version = this.initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, 3L);
        this.functionWrapper.checkpointFunction(1L);
        this.functionWrapper.checkpointComplete(1L);
        for (int i = 6; i < 9; ++i) {
            this.functionWrapper.invoke(i);
        }
        this.functionWrapper.sendRequestAndGetResponse(version, 5L);
        this.functionWrapper.closeFunctionNormally();
        CollectTestUtils.assertAccumulatorResult(this.functionWrapper.getAccumulatorResults(), (long)5L, (String)version, (long)3L, Arrays.asList(5, 6, 7, 8), serializer);
    }

    private String initializeVersion() throws Exception {
        CollectCoordinationResponse response = this.functionWrapper.sendRequestAndGetResponse("", 0L);
        return response.getVersion();
    }

    private void assertResponseEquals(CollectCoordinationResponse response, String version, long lastCheckpointedOffset, List<Integer> expected) throws IOException {
        CollectTestUtils.assertResponseEquals((CollectCoordinationResponse)response, (String)version, (long)lastCheckpointedOffset, expected, serializer);
    }
}

