/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.grpcio.common.impl.stub;

import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.streams.ReadStream;

class StreamObserverReadStream<T>
implements StreamObserver<T>,
ReadStream<T> {
    private static final EndOfStream END_SENTINEL = new EndOfStream(null);
    private final CallStreamObserver<?> streamObserver;
    private final InboundMessageQueue<Object> queue;
    private Handler<Throwable> exceptionHandler;
    private Handler<T> handler;
    private Handler<Void> endHandler;
    private boolean paused;

    public StreamObserverReadStream(ContextInternal context, final CallStreamObserver<?> streamObserver) {
        this.streamObserver = streamObserver;
        this.paused = false;
        this.queue = new InboundMessageQueue<Object>(context.executor(), context.executor()){

            protected void handleMessage(Object msg) {
                Object h;
                if (msg instanceof EndOfStream) {
                    Throwable failure = ((EndOfStream)msg).failure;
                    if (failure != null) {
                        h = StreamObserverReadStream.this.exceptionHandler;
                        msg = failure;
                    } else {
                        h = StreamObserverReadStream.this.endHandler;
                        msg = null;
                    }
                } else {
                    h = StreamObserverReadStream.this.handler;
                }
                if (h != null) {
                    h.handle(msg);
                }
            }

            protected void handleResume() {
                StreamObserverReadStream.this.paused = false;
                streamObserver.request(1);
            }

            protected void handlePause() {
                StreamObserverReadStream.this.paused = true;
            }
        };
    }

    public void init() {
        this.streamObserver.disableAutoInboundFlowControl();
        this.streamObserver.request(1);
    }

    public void onNext(T t) {
        this.queue.write(t);
        if (!this.paused) {
            this.streamObserver.request(1);
        }
    }

    public void onError(Throwable throwable) {
        this.queue.write((Object)new EndOfStream(throwable));
    }

    public void onCompleted() {
        this.queue.write((Object)END_SENTINEL);
    }

    public synchronized ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    public synchronized ReadStream<T> handler(@Nullable Handler<T> handler) {
        this.handler = handler;
        return this;
    }

    public synchronized ReadStream<T> endHandler(@Nullable Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    public ReadStream<T> pause() {
        this.queue.pause();
        return this;
    }

    public ReadStream<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    public ReadStream<T> fetch(long amount) {
        this.queue.fetch(amount);
        return this;
    }

    private static class EndOfStream {
        final Throwable failure;

        EndOfStream(Throwable failure) {
            this.failure = failure;
        }
    }
}

