package org.apache.flink.runtime.rpc;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.exceptions.RpcRuntimeException;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/FencedRpcEndpointTest.class */
class FencedRpcEndpointTest {
    private static final Time timeout = Time.seconds(10);
    private static RpcService rpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/FencedRpcEndpointTest$FencedTestingEndpoint.class */
    private static class FencedTestingEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestingGateway {
        private final OneShotLatch computationLatch;
        private final String value;

        protected FencedTestingEndpoint(RpcService rpcService, String str, UUID uuid) {
            super(rpcService, uuid);
            this.computationLatch = new OneShotLatch();
            this.value = str;
        }

        @Override // org.apache.flink.runtime.rpc.FencedRpcEndpointTest.FencedTestingGateway
        public CompletableFuture<String> foobar(Time time) {
            return CompletableFuture.completedFuture(this.value);
        }

        @Override // org.apache.flink.runtime.rpc.FencedRpcEndpointTest.FencedTestingGateway
        public CompletableFuture<Acknowledge> triggerComputationLatch(Time time) {
            this.computationLatch.trigger();
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/FencedRpcEndpointTest$FencedTestingGateway.class */
    public interface FencedTestingGateway extends FencedRpcGateway<UUID> {
        CompletableFuture<String> foobar(@RpcTimeout Time time);

        CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time time);
    }

    FencedRpcEndpointTest() {
    }

    @BeforeAll
    static void setup() {
        rpcService = new TestingRpcService();
    }

    @AfterAll
    static void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(new RpcService[]{rpcService});
        }
    }

    @Test
    void testFencing() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        RpcEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "barfoo", randomUUID);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway fencedTestingGateway = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), randomUUID, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            FencedTestingGateway fencedTestingGateway2 = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), randomUUID2, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assertions.assertThat(fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)).isEqualTo("barfoo");
            try {
                fencedTestingGateway2.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Fail.fail("This should fail since we have the wrong fencing token.");
            } catch (ExecutionException e) {
                Assertions.assertThat(ExceptionUtils.stripExecutionException(e)).isInstanceOf(FencingTokenException.class);
            }
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{fencedTestingEndpoint});
            fencedTestingEndpoint.validateResourceClosed();
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{fencedTestingEndpoint});
            fencedTestingEndpoint.validateResourceClosed();
            throw th;
        }
    }

    @Test
    void testUnfencedRemoteGateway() throws Exception {
        RpcEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", UUID.randomUUID());
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway fencedTestingGateway = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Fail.fail("This should have failed because we have an unfenced gateway.");
            } catch (ExecutionException e) {
                Assertions.assertThat(ExceptionUtils.stripExecutionException(e)).isInstanceOf(RpcRuntimeException.class);
            }
            fencedTestingGateway.getClass();
            Assertions.assertThatThrownBy(fencedTestingGateway::getFencingToken).withFailMessage("We should not be able to call getFencingToken on an unfenced gateway.", new Object[0]).isInstanceOf(UnsupportedOperationException.class);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{fencedTestingEndpoint});
            fencedTestingEndpoint.validateResourceClosed();
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{fencedTestingEndpoint});
            fencedTestingEndpoint.validateResourceClosed();
            throw th;
        }
    }
}
