package org.apache.cxf.jaxrs.sse;

import java.lang.annotation.Annotation;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import org.apache.cxf.common.logging.LogUtils;

/* loaded from: input_file:org/apache/cxf/jaxrs/sse/SseEventSinkImpl.class */
public class SseEventSinkImpl implements SseEventSink {
    private static final Annotation[] EMPTY_ANNOTATIONS = new Annotation[0];
    private static final Logger LOG = LogUtils.getL7dLogger(SseEventSinkImpl.class);
    private static final int BUFFER_SIZE = 10000;
    private final AsyncContext ctx;
    private final MessageBodyWriter<OutboundSseEvent> writer;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean dispatching = new AtomicBoolean(false);
    private final Queue<QueuedEvent> buffer = new ArrayBlockingQueue(10000);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cxf/jaxrs/sse/SseEventSinkImpl$QueuedEvent.class */
    public static class QueuedEvent {
        private final OutboundSseEvent event;
        private final CompletableFuture<?> completion;

        QueuedEvent(OutboundSseEvent outboundSseEvent, CompletableFuture<?> completableFuture) {
            this.event = outboundSseEvent;
            this.completion = completableFuture;
        }
    }

    public SseEventSinkImpl(MessageBodyWriter<OutboundSseEvent> messageBodyWriter, AsyncResponse asyncResponse, AsyncContext asyncContext) {
        this.writer = messageBodyWriter;
        this.ctx = asyncContext;
        if (asyncContext == null) {
            throw new IllegalStateException("Unable to retrieve the AsyncContext for this request. Is the Servlet configured properly?");
        }
        asyncContext.getResponse().setContentType("text/event-stream");
    }

    public AsyncContext getAsyncContext() {
        return this.ctx;
    }

    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (!awaitQueueToDrain(5, TimeUnit.SECONDS)) {
                LOG.warning("There are still SSE events the queue which may not be delivered (closing now)");
            }
            try {
                this.ctx.complete();
            } catch (IllegalStateException e) {
                LOG.warning("Failed to close the AsyncContext cleanly: " + e.getMessage());
            }
        }
    }

    private boolean awaitQueueToDrain(int i, TimeUnit timeUnit) {
        long nanos = timeUnit.toNanos(i) / 20;
        int i2 = 0;
        while (this.dispatching.get()) {
            i2++;
            if (i2 >= 20) {
                break;
            }
            LockSupport.parkNanos(nanos);
        }
        return this.buffer.isEmpty();
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public CompletionStage<?> send(OutboundSseEvent outboundSseEvent) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (this.closed.get() || this.writer == null) {
            completableFuture.completeExceptionally(new IllegalStateException("The sink is already closed, unable to queue SSE event for send"));
        } else if (!this.buffer.offer(new QueuedEvent(outboundSseEvent, completableFuture))) {
            completableFuture.completeExceptionally(new IllegalStateException("The buffer is full (10000), unable to queue SSE event for send"));
        } else if (this.dispatching.compareAndSet(false, true)) {
            this.ctx.start(this::dequeue);
        }
        return completableFuture;
    }

    private void dequeue() {
        while (true) {
            try {
                QueuedEvent poll = this.buffer.poll();
                if (poll == null) {
                    return;
                }
                OutboundSseEvent outboundSseEvent = poll.event;
                CompletableFuture completableFuture = poll.completion;
                try {
                    this.writer.writeTo(outboundSseEvent, outboundSseEvent.getClass(), outboundSseEvent.getGenericType(), EMPTY_ANNOTATIONS, outboundSseEvent.getMediaType(), (MultivaluedMap) null, this.ctx.getResponse().getOutputStream());
                    this.ctx.getResponse().flushBuffer();
                    completableFuture.complete(null);
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            } finally {
                this.dispatching.set(false);
            }
        }
    }
}
