package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberImpl;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/SubscriberImpl.class */
/**
 * A {@link Subscriber} built on top of a {@link RetryingConnection} to a {@link ConnectedSubscriber}.
 *
 * <p>Mutable state (in-flight seek, flow control batching, next-offset tracking and the shutdown
 * flag) is guarded by {@code monitor}. Batched flow control requests are flushed to the connected
 * stream every {@link #FLOW_REQUESTS_FLUSH_INTERVAL_MS} milliseconds by a single-threaded scheduled
 * executor, or sooner when the batcher reports that the request should be expedited.
 *
 * <p>Received messages are handed to the consumer supplied at construction time after the monitor
 * has been released.
 */
public class SubscriberImpl extends ProxyService implements Subscriber, RetryingConnectionObserver<ConnectedSubscriber.Response> {

    /** Delay, in milliseconds, between periodic flushes of batched flow control requests. */
    @VisibleForTesting
    static final long FLOW_REQUESTS_FLUSH_INTERVAL_MS = 100;

    /** Receives every batch of messages delivered by the stream. */
    private final Consumer<ImmutableList<SequencedMessage>> messageConsumer;

    /** Guards all mutable state of this subscriber. */
    private final CloseableMonitor monitor;

    /** Runs the periodic flow control flush task. */
    private final ScheduledExecutorService executorService;

    /** Handle of the periodic flush task; assigned in {@link #start()}. */
    private Future<?> alarmFuture;

    @GuardedBy("monitor.monitor")
    private final RetryingConnection<ConnectedSubscriber> connection;

    @GuardedBy("monitor.monitor")
    private final NextOffsetTracker nextOffsetTracker;

    @GuardedBy("monitor.monitor")
    private final FlowControlBatcher flowControlBatcher;

    /** The client-initiated seek awaiting a response, if any. */
    @GuardedBy("monitor.monitor")
    private Optional<InFlightSeek> inFlightSeek;

    /** True while a seek issued by {@link #triggerReinitialize()} itself awaits its response. */
    @GuardedBy("monitor.monitor")
    private boolean internalSeekInFlight;

    @GuardedBy("monitor.monitor")
    private boolean shutdown;

    /**
     * Pairs a client seek request with the future that is completed when its response arrives.
     *
     * <p>NOTE(review): the decompiler reported widening this class from {@code private}; the
     * visibility is left untouched here so that no existing reference breaks.
     */
    public static class InFlightSeek {
        final SeekRequest seekRequest;
        final SettableApiFuture<Offset> seekFuture;

        InFlightSeek(SeekRequest seekRequest, SettableApiFuture<Offset> settableApiFuture) {
            this.seekRequest = seekRequest;
            this.seekFuture = settableApiFuture;
        }
    }

    /**
     * Creates a subscriber using the given stream and connected-subscriber factories.
     *
     * @param streamFactory opens the underlying subscribe stream
     * @param connectedSubscriberFactory builds a {@link ConnectedSubscriber} for the stream
     * @param initialSubscribeRequest sent as the initial request of the retrying connection
     * @param consumer receives every batch of delivered messages
     */
    @VisibleForTesting
    SubscriberImpl(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, ConnectedSubscriberFactory connectedSubscriberFactory, InitialSubscribeRequest initialSubscribeRequest, Consumer<ImmutableList<SequencedMessage>> consumer) throws ApiException {
        this.monitor = new CloseableMonitor();
        this.nextOffsetTracker = new NextOffsetTracker();
        this.flowControlBatcher = new FlowControlBatcher();
        this.inFlightSeek = Optional.empty();
        this.internalSeekInFlight = false;
        this.shutdown = false;
        this.messageConsumer = consumer;
        this.connection =
                new RetryingConnectionImpl<>(
                        streamFactory,
                        connectedSubscriberFactory,
                        SubscribeRequest.newBuilder().setInitial(initialSubscribeRequest).build(),
                        this);
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        addServices(this.connection);
    }

    /**
     * Creates a subscriber whose stream is opened through the given service client. The client is
     * additionally registered as a managed service of this subscriber.
     *
     * @param subscriberServiceClient client used to open the subscribe stream
     * @param initialSubscribeRequest sent as the initial request of the retrying connection
     * @param consumer receives every batch of delivered messages
     */
    public SubscriberImpl(SubscriberServiceClient subscriberServiceClient, InitialSubscribeRequest initialSubscribeRequest, Consumer<ImmutableList<SequencedMessage>> consumer) throws ApiException {
        this(
                responseObserver -> subscriberServiceClient.subscribeCallable().splitCall(responseObserver),
                new ConnectedSubscriberImpl.Factory(),
                initialSubscribeRequest,
                consumer);
        addServices(ApiServiceUtils.backgroundResourceAsApiService(subscriberServiceClient));
    }

    /**
     * Marks this subscriber as shut down, fails any in-flight client seek with the given error and
     * reports the error through {@code onPermanentError}.
     */
    @Override
    protected void handlePermanentError(CheckedApiException checkedApiException) {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            shutdown = true;
            inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(checkedApiException));
            inFlightSeek = Optional.empty();
            onPermanentError(checkedApiException);
        }
    }

    /** Schedules the periodic flush of batched flow control requests. */
    @Override
    protected void start() {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            alarmFuture =
                    executorService.scheduleWithFixedDelay(
                            this::processBatchFlowRequest,
                            FLOW_REQUESTS_FLUSH_INTERVAL_MS,
                            FLOW_REQUESTS_FLUSH_INTERVAL_MS,
                            TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Cancels the periodic flush, shuts the executor down, marks this subscriber as shut down and
     * fails any in-flight client seek with an {@code ABORTED} error.
     */
    @Override
    protected void stop() {
        alarmFuture.cancel(false /* mayInterruptIfRunning */);
        executorService.shutdown();
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            shutdown = true;
            inFlightSeek.ifPresent(
                    inFlight ->
                            inFlight.seekFuture.setException(
                                    new CheckedApiException("Client stopped while seek in flight.", StatusCode.Code.ABORTED)));
        }
    }

    /**
     * Starts a client seek.
     *
     * <p>Blocks (uninterruptibly) until no internally issued seek is in flight or the subscriber has
     * shut down. A request that fails validation, a seek after shutdown, or a seek while another
     * client seek is in flight is reported through {@code onPermanentError} and returned as a failed
     * future.
     *
     * @param seekRequest the seek to perform
     * @return a future completed with the offset from the seek response
     */
    @Override
    public ApiFuture<Offset> seek(SeekRequest seekRequest) {
        Monitor.Guard noInternalSeekOrShutdown =
                new Monitor.Guard(monitor.monitor) {
                    @Override
                    public boolean isSatisfied() {
                        return !internalSeekInFlight || shutdown;
                    }
                };
        // The hold is released before the catch clause runs, so the error is reported without
        // holding the monitor.
        try (CloseableMonitor.Hold hold = monitor.enterWhenUninterruptibly(noInternalSeekOrShutdown)) {
            CheckedApiPreconditions.checkArgument(Predicates.isValidSeekRequest(seekRequest), "Sent SeekRequest with no location set.");
            CheckedApiPreconditions.checkState(!shutdown, "Seeked after the stream shut down.");
            CheckedApiPreconditions.checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
            SettableApiFuture<Offset> seekFuture = SettableApiFuture.create();
            inFlightSeek = Optional.of(new InFlightSeek(seekRequest, seekFuture));
            flowControlBatcher.onClientSeek();
            // If no stream is currently connected, the seek is re-sent by triggerReinitialize().
            connection.modifyConnection(
                    connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.seek(seekRequest)));
            return seekFuture;
        } catch (CheckedApiException e) {
            onPermanentError(e);
            return ApiFutures.immediateFailedFuture(e);
        }
    }

    /** Returns whether a client-initiated seek is awaiting its response. */
    @Override
    public boolean seekInFlight() {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            return inFlightSeek.isPresent();
        }
    }

    /**
     * Records client flow control tokens in the batcher and flushes the batch immediately when the
     * batcher asks for it to be expedited. Does nothing after shutdown.
     *
     * @param flowControlRequest the tokens being granted by the client
     * @throws CheckedApiException propagated from the batcher or from modifying the connection
     */
    @Override
    public void allowFlow(FlowControlRequest flowControlRequest) throws CheckedApiException {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            if (shutdown) {
                return;
            }
            flowControlBatcher.onClientFlowRequest(flowControlRequest);
            if (flowControlBatcher.shouldExpediteBatchRequest()) {
                connection.modifyConnection(
                        connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
            }
        }
    }

    /**
     * Re-establishes the stream and replays state onto it: the in-flight client seek if there is
     * one, otherwise the restart seek suggested by the offset tracker (flagged as an internal seek),
     * followed by the restart flow control request if the batcher provides one. Does nothing after
     * shutdown; failures are reported through {@code onPermanentError}.
     */
    @Override
    public void triggerReinitialize() {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            if (shutdown) {
                return;
            }
            connection.reinitialize();
            connection.modifyConnection(
                    connectedSubscriber -> {
                        CheckedApiPreconditions.checkArgument(monitor.monitor.isOccupiedByCurrentThread());
                        CheckedApiPreconditions.checkArgument(connectedSubscriber.isPresent());
                        ConnectedSubscriber subscriber = connectedSubscriber.get();
                        if (inFlightSeek.isPresent()) {
                            subscriber.seek(inFlightSeek.get().seekRequest);
                        } else {
                            nextOffsetTracker
                                    .requestForRestart()
                                    .ifPresent(
                                            restartSeek -> {
                                                // The response to this seek must not complete a client seek.
                                                internalSeekInFlight = true;
                                                subscriber.seek(restartSeek);
                                            });
                        }
                        flowControlBatcher.requestForRestart().ifPresent(subscriber::allowFlow);
                    });
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
    }

    /**
     * Dispatches a stream response to the message or seek handler.
     *
     * @throws CheckedApiException with {@code FAILED_PRECONDITION} for an unknown response kind, or
     *     as propagated from the handler
     */
    @Override
    public void onClientResponse(ConnectedSubscriber.Response response) throws CheckedApiException {
        switch (response.getKind()) {
            case MESSAGES:
                onMessageResponse(response.messages());
                return;
            case SEEK_OFFSET:
                onSeekResponse(response.seekOffset());
                return;
            default:
                throw new CheckedApiException("Invalid switch case: " + response.getKind(), StatusCode.Code.FAILED_PRECONDITION);
        }
    }

    /**
     * Updates offset tracking and flow control accounting for the received messages, then hands them
     * to the consumer. Messages received after shutdown are dropped.
     */
    private void onMessageResponse(ImmutableList<SequencedMessage> immutableList) throws CheckedApiException {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            if (shutdown) {
                return;
            }
            nextOffsetTracker.onMessages(immutableList);
            flowControlBatcher.onMessages(immutableList);
        }
        // Invoked after the monitor is released so the consumer never runs under the lock and the
        // hold is released exactly once even if the consumer throws.
        messageConsumer.accept(immutableList);
    }

    /**
     * Handles a seek response. The response to an internally issued seek only clears the internal
     * flag; otherwise the in-flight client seek is completed with the returned offset.
     *
     * @throws CheckedApiException if a response arrives while no seek is in flight
     */
    private void onSeekResponse(Offset offset) throws CheckedApiException {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            if (shutdown) {
                return;
            }
            if (internalSeekInFlight) {
                internalSeekInFlight = false;
                return;
            }
            CheckedApiPreconditions.checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
            nextOffsetTracker.onClientSeek(offset);
            inFlightSeek.get().seekFuture.set(offset);
            inFlightSeek = Optional.empty();
        }
    }

    /**
     * Periodic task: flushes the pending batched flow control request to the connected stream, if
     * any. Does nothing after shutdown; failures are reported through {@code onPermanentError}.
     */
    @VisibleForTesting
    void processBatchFlowRequest() {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            if (shutdown) {
                return;
            }
            connection.modifyConnection(
                    connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
    }

    /** Sends the batcher's pending flow control request, if there is one, on the given stream. */
    private void flushBatchFlowRequest(ConnectedSubscriber connectedSubscriber) {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            flowControlBatcher.releasePendingRequest().ifPresent(connectedSubscriber::allowFlow);
        }
    }
}
