package com.google.api.gax.grpc;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.testing.FakeServiceGrpc;
import com.google.api.gax.grpc.testing.FakeServiceImpl;
import com.google.api.gax.grpc.testing.InProcessServer;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.testing.FakeCallContext;
import com.google.common.collect.Lists;
import com.google.common.truth.Truth;
import com.google.type.Color;
import com.google.type.Money;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.class */
public class GrpcDirectServerStreamingCallableTest {
    private static final Color DEFAULT_REQUEST = Color.newBuilder().setRed(0.5f).build();
    private static final Color ASYNC_REQUEST = DEFAULT_REQUEST.toBuilder().setGreen(1000.0f).build();
    private static final Color ERROR_REQUEST = Color.newBuilder().setRed(-1.0f).build();
    private static final Money DEFAULT_RESPONSE = Money.newBuilder().setCurrencyCode("USD").setUnits(127).build();
    private InProcessServer<FakeServiceImpl> inprocessServer;
    private ManagedChannel channel;
    private ClientContext clientContext;
    private ServerStreamingCallSettings<Color, Money> streamingCallSettings;
    private ServerStreamingCallable<Color, Money> streamingCallable;

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest$MoneyObserver.class */
    static class MoneyObserver extends StateCheckingResponseObserver<Money> {
        private final boolean autoFlowControl;
        private final CountDownLatch latch;
        volatile StreamController controller;
        volatile Money response;
        volatile Throwable error;
        volatile boolean completed;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MoneyObserver(boolean z, CountDownLatch countDownLatch) {
            this.autoFlowControl = z;
            this.latch = countDownLatch;
        }

        protected void onStartImpl(StreamController streamController) {
            this.controller = streamController;
            if (this.autoFlowControl) {
                return;
            }
            streamController.disableAutoInboundFlowControl();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void onResponseImpl(Money money) {
            this.response = money;
            this.latch.countDown();
        }

        protected void onErrorImpl(Throwable th) {
            this.error = th;
            this.latch.countDown();
        }

        protected void onCompleteImpl() {
            this.completed = true;
            this.latch.countDown();
        }
    }

    @Before
    public void setUp() throws InstantiationException, IllegalAccessException, IOException {
        this.inprocessServer = new InProcessServer<>(new FakeServiceImpl(), "fakeservice");
        this.inprocessServer.start();
        this.channel = InProcessChannelBuilder.forName("fakeservice").directExecutor().usePlaintext().build();
        this.clientContext = ClientContext.newBuilder().setTransportChannel(GrpcTransportChannel.create(this.channel)).setDefaultCallContext(GrpcCallContext.of(this.channel, CallOptions.DEFAULT)).build();
        this.streamingCallSettings = ServerStreamingCallSettings.newBuilder().build();
        this.streamingCallable = GrpcCallableFactory.createServerStreamingCallable(GrpcCallSettings.create(FakeServiceGrpc.METHOD_SERVER_STREAMING_RECOGNIZE), this.streamingCallSettings, this.clientContext);
    }

    @After
    public void tearDown() {
        this.channel.shutdown();
        this.inprocessServer.stop();
    }

    @Test
    public void testBadContext() {
        this.streamingCallable = GrpcCallableFactory.createServerStreamingCallable(GrpcCallSettings.create(FakeServiceGrpc.METHOD_SERVER_STREAMING_RECOGNIZE), this.streamingCallSettings, this.clientContext.toBuilder().setDefaultCallContext(FakeCallContext.createDefault()).build());
        MoneyObserver moneyObserver = new MoneyObserver(true, new CountDownLatch(1));
        this.thrown.expect(IllegalArgumentException.class);
        this.streamingCallable.call(DEFAULT_REQUEST, moneyObserver);
    }

    @Test
    public void testServerStreamingStart() throws Exception {
        MoneyObserver moneyObserver = new MoneyObserver(true, new CountDownLatch(1));
        this.streamingCallable.call(DEFAULT_REQUEST, moneyObserver);
        Truth.assertThat(moneyObserver.controller).isNotNull();
    }

    @Test
    public void testServerStreaming() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MoneyObserver moneyObserver = new MoneyObserver(true, countDownLatch);
        this.streamingCallable.call(DEFAULT_REQUEST, moneyObserver);
        countDownLatch.await(20L, TimeUnit.SECONDS);
        Truth.assertThat(moneyObserver.error).isNull();
        Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
    }

    @Test
    public void testManualFlowControl() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MoneyObserver moneyObserver = new MoneyObserver(false, countDownLatch);
        this.streamingCallable.call(DEFAULT_REQUEST, moneyObserver);
        countDownLatch.await(500L, TimeUnit.MILLISECONDS);
        Truth.assertWithMessage("Received response before requesting it").that(moneyObserver.response).isNull();
        moneyObserver.controller.request(1);
        countDownLatch.await(500L, TimeUnit.MILLISECONDS);
        Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
        Truth.assertThat(Boolean.valueOf(moneyObserver.completed)).isTrue();
    }

    @Test
    public void testCancelClientCall() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MoneyObserver moneyObserver = new MoneyObserver(false, countDownLatch);
        this.streamingCallable.call(ASYNC_REQUEST, moneyObserver);
        moneyObserver.controller.cancel();
        moneyObserver.controller.request(1);
        countDownLatch.await(500L, TimeUnit.MILLISECONDS);
        Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class);
        Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream");
    }

    @Test
    public void testOnResponseError() throws Throwable {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MoneyObserver moneyObserver = new MoneyObserver(true, countDownLatch);
        this.streamingCallable.call(ERROR_REQUEST, moneyObserver);
        countDownLatch.await(500L, TimeUnit.MILLISECONDS);
        Truth.assertThat(moneyObserver.error).isInstanceOf(ApiException.class);
        Truth.assertThat(moneyObserver.error.getStatusCode().getCode()).isEqualTo(StatusCode.Code.INVALID_ARGUMENT);
        Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("io.grpc.StatusRuntimeException: INVALID_ARGUMENT: red must be positive");
    }

    @Test
    public void testObserverErrorCancelsCall() throws Throwable {
        final RuntimeException runtimeException = new RuntimeException("some error");
        final SettableApiFuture create = SettableApiFuture.create();
        this.streamingCallable.call(DEFAULT_REQUEST, new StateCheckingResponseObserver<Money>() { // from class: com.google.api.gax.grpc.GrpcDirectServerStreamingCallableTest.1
            protected void onStartImpl(StreamController streamController) {
            }

            /* JADX INFO: Access modifiers changed from: protected */
            public void onResponseImpl(Money money) {
                throw runtimeException;
            }

            protected void onErrorImpl(Throwable th) {
                create.set(th);
            }

            protected void onCompleteImpl() {
                create.set((Object) null);
            }
        });
        ApiException apiException = (Throwable) create.get(500L, TimeUnit.MILLISECONDS);
        Truth.assertThat(apiException).isInstanceOf(ApiException.class);
        Truth.assertThat(apiException.getStatusCode().getCode()).isEqualTo(StatusCode.Code.CANCELLED);
        Truth.assertThat(apiException.getCause()).isInstanceOf(StatusRuntimeException.class);
        Truth.assertThat(apiException.getCause().getCause()).isSameAs(runtimeException);
    }

    @Test
    public void testBlockingServerStreaming() throws Exception {
        Truth.assertThat(Lists.newArrayList(this.streamingCallable.call(Color.newBuilder().setRed(0.5f).build()))).containsExactly(new Object[]{Money.newBuilder().setCurrencyCode("USD").setUnits(127L).build()});
    }
}
