/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.mssql.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.mssql.client.Client;
import io.r2dbc.mssql.client.ClientConfiguration;
import io.r2dbc.mssql.client.ConnectionContext;
import io.r2dbc.mssql.client.ConnectionState;
import io.r2dbc.mssql.client.EnvironmentChangeEvent;
import io.r2dbc.mssql.client.EnvironmentChangeListener;
import io.r2dbc.mssql.client.MessageDecoder;
import io.r2dbc.mssql.client.StreamDecoder;
import io.r2dbc.mssql.client.TdsEncoder;
import io.r2dbc.mssql.client.TransactionStatus;
import io.r2dbc.mssql.client.ssl.SslConfiguration;
import io.r2dbc.mssql.client.ssl.TdsSslHandler;
import io.r2dbc.mssql.message.ClientMessage;
import io.r2dbc.mssql.message.Message;
import io.r2dbc.mssql.message.TransactionDescriptor;
import io.r2dbc.mssql.message.header.PacketIdProvider;
import io.r2dbc.mssql.message.tds.ProtocolException;
import io.r2dbc.mssql.message.tds.Redirect;
import io.r2dbc.mssql.message.token.AbstractInfoToken;
import io.r2dbc.mssql.message.token.EnvChangeToken;
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
import io.r2dbc.mssql.message.token.LoginAckToken;
import io.r2dbc.mssql.message.type.Collation;
import io.r2dbc.mssql.util.Assert;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

public final class ReactorNettyClient
implements Client {
    private static final Logger logger = Loggers.getLogger(ReactorNettyClient.class);
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
    private static final Supplier<MssqlConnectionClosedException> UNEXPECTED = () -> new MssqlConnectionClosedException("Connection unexpectedly closed");
    private static final Supplier<MssqlConnectionClosedException> EXPECTED = () -> new MssqlConnectionClosedException("Connection closed");
    private static final Supplier<MssqlConnectionClosedException> CLOSED = () -> new MssqlConnectionClosedException("Cannot exchange messages because the connection is closed");
    private final ConnectionContext context;
    private final ByteBufAllocator byteBufAllocator;
    private final Connection connection;
    private final TdsEncoder tdsEncoder;
    private final Consumer<EnvChangeToken> handleEnvChange;
    private final Consumer<FeatureExtAckToken> featureAckChange = token -> {
        for (FeatureExtAckToken.FeatureToken featureToken : token.getFeatureTokens()) {
            if (!(featureToken instanceof FeatureExtAckToken.ColumnEncryption)) continue;
            this.encryptionSupported = true;
        }
    };
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final EmitterProcessor<ClientMessage> requestProcessor = EmitterProcessor.create((boolean)false);
    private final FluxSink<ClientMessage> requests = this.requestProcessor.sink();
    private final EmitterProcessor<Message> responseProcessor = EmitterProcessor.create((boolean)false);
    private final TransactionListener transactionListener = new TransactionListener();
    private final CollationListener collationListener = new CollationListener();
    private final RedirectListener redirectListener = new RedirectListener();
    private final RequestQueue requestQueue;
    private ConnectionState state = ConnectionState.PRELOGIN;
    private MessageDecoder decodeFunction = ConnectionState.PRELOGIN.decoder(this);
    private boolean encryptionSupported = false;
    private volatile Optional<Collation> databaseCollation = Optional.empty();
    private Optional<String> databaseVersion = Optional.empty();
    private volatile Optional<Redirect> redirect = Optional.empty();
    private volatile TransactionDescriptor transactionDescriptor = TransactionDescriptor.empty();
    private volatile TransactionStatus transactionStatus = TransactionStatus.AUTO_COMMIT;

    private ReactorNettyClient(Connection connection, TdsEncoder tdsEncoder, ConnectionContext connectionContext) {
        Assert.requireNonNull(connection, "Connection must not be null");
        this.context = connectionContext;
        StreamDecoder decoder = new StreamDecoder();
        this.handleEnvChange = token -> {
            EnvironmentChangeEvent event = new EnvironmentChangeEvent((EnvChangeToken)token);
            try {
                tdsEncoder.onEnvironmentChange(event);
                this.transactionListener.onEnvironmentChange(event);
                this.collationListener.onEnvironmentChange(event);
                this.redirectListener.onEnvironmentChange(event);
            }
            catch (Exception e) {
                logger.warn(this.context.getMessage("Failed onEnvironmentChange() in {}"), new Object[]{"", e});
            }
        };
        this.byteBufAllocator = connection.outbound().alloc();
        this.connection = connection;
        this.tdsEncoder = tdsEncoder;
        this.requestQueue = new RequestQueue(this.context);
        final Consumer<Message> handleStateChange = message -> {
            ConnectionState connectionState;
            if (message.getClass() == LoginAckToken.class) {
                LoginAckToken loginAckToken = (LoginAckToken)message;
                this.databaseVersion = Optional.of(loginAckToken.getVersion().toString());
            }
            if ((connectionState = this.state).canAdvance((Message)message)) {
                ConnectionState nextState;
                this.state = nextState = connectionState.next((Message)message, connection);
                this.decodeFunction = nextState.decoder(this);
            }
        };
        SynchronousSink<Message> sink = new SynchronousSink<Message>(){

            public void complete() {
                throw new UnsupportedOperationException();
            }

            public Context currentContext() {
                return ReactorNettyClient.this.requestProcessor.currentContext();
            }

            public void error(Throwable e) {
                Object errorToUse = e;
                if (!(errorToUse instanceof R2dbcException)) {
                    errorToUse = new MssqlConnectionException((Throwable)errorToUse);
                }
                ReactorNettyClient.this.responseProcessor.onError(errorToUse);
            }

            public void next(Message message) {
                if (DEBUG_ENABLED) {
                    ReactorNettyClient.this.onInfoToken(message);
                }
                handleStateChange.accept(message);
                if (message.getClass() == EnvChangeToken.class) {
                    ReactorNettyClient.this.handleEnvChange.accept((EnvChangeToken)message);
                }
                if (message.getClass() == FeatureExtAckToken.class) {
                    ReactorNettyClient.this.featureAckChange.accept((FeatureExtAckToken)message);
                }
                ReactorNettyClient.this.responseProcessor.onNext((Object)message);
            }
        };
        connection.inbound().receiveObject().doOnNext(arg_0 -> this.lambda$new$6(decoder, (SynchronousSink)sink, arg_0)).onErrorResume(this::resumeError).subscribe((CoreSubscriber)new CoreSubscriber<Object>((SynchronousSink)sink){
            final /* synthetic */ SynchronousSink val$sink;
            {
                this.val$sink = synchronousSink;
            }

            public Context currentContext() {
                return ReactorNettyClient.this.responseProcessor.currentContext();
            }

            public void onSubscribe(Subscription s) {
                ReactorNettyClient.this.responseProcessor.onSubscribe(s);
            }

            public void onNext(Object message) {
            }

            public void onError(Throwable t) {
                this.val$sink.error(t);
            }

            public void onComplete() {
                ReactorNettyClient.this.handleClose();
            }
        });
        this.requestProcessor.concatMap(message -> {
            Object encoded;
            if (DEBUG_ENABLED) {
                logger.debug(this.context.getMessage("Request: {}"), new Object[]{message});
            }
            if ((encoded = message.encode(connection.outbound().alloc(), this.tdsEncoder.getPacketSize())) instanceof Publisher) {
                return connection.outbound().sendObject((Publisher)encoded);
            }
            return connection.outbound().sendObject(encoded);
        }).onErrorResume(this::resumeError).doAfterTerminate(this::handleClose).subscribe();
    }

    private <T> Mono<T> resumeError(Throwable throwable) {
        this.handleConnectionError(throwable);
        this.requestProcessor.onComplete();
        logger.error(this.context.getMessage("Error: {}"), new Object[]{throwable.getMessage(), throwable});
        return this.close();
    }

    private void onInfoToken(Message message) {
        logger.debug(this.context.getMessage("Response: {}"), new Object[]{message});
        if (message instanceof AbstractInfoToken) {
            AbstractInfoToken token = (AbstractInfoToken)message;
            if (token.getClassification() == AbstractInfoToken.Classification.INFORMATIONAL) {
                logger.debug(this.context.getMessage("Info: Code [{}] Severity [{}]: {}"), new Object[]{token.getNumber(), token.getClassification(), token.getMessage()});
            } else {
                logger.debug(this.context.getMessage("Warning: Code [{}] Severity [{}]: {}"), new Object[]{token.getNumber(), token.getClassification(), token.getMessage()});
            }
        }
    }

    public static Mono<ReactorNettyClient> connect(String host, int port) {
        Assert.requireNonNull(host, "host must not be null");
        return ReactorNettyClient.connect(host, port, Duration.ofSeconds(30L));
    }

    public static Mono<ReactorNettyClient> connect(final String host, final int port, final Duration connectTimeout) {
        Assert.requireNonNull(connectTimeout, "connect timeout must not be null");
        Assert.requireNonNull(host, "host must not be null");
        return ReactorNettyClient.connect(new ClientConfiguration(){

            @Override
            public String getHost() {
                return host;
            }

            @Override
            public int getPort() {
                return port;
            }

            @Override
            public Duration getConnectTimeout() {
                return connectTimeout;
            }

            @Override
            public boolean isTcpKeepAlive() {
                return false;
            }

            @Override
            public boolean isTcpNoDelay() {
                return true;
            }

            @Override
            public ConnectionProvider getConnectionProvider() {
                return ConnectionProvider.newConnection();
            }

            @Override
            public boolean isSslEnabled() {
                return false;
            }

            @Override
            public SslProvider getSslProvider() {
                return SslProvider.builder().sslContext(SslContextBuilder.forClient()).defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();
            }
        }, null, null);
    }

    public static Mono<ReactorNettyClient> connect(ClientConfiguration configuration, @Nullable String applicationName, @Nullable UUID connectionId) {
        Assert.requireNonNull(configuration, "configuration must not be null");
        ConnectionContext connectionContext = new ConnectionContext(applicationName, connectionId);
        logger.debug(connectionContext.getMessage("connect()"));
        PacketIdProvider packetIdProvider = PacketIdProvider.atomic();
        TdsEncoder tdsEncoder = new TdsEncoder(packetIdProvider);
        Mono connection = TcpClient.create((ConnectionProvider)configuration.getConnectionProvider()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)Math.toIntExact(configuration.getConnectTimeout().toMillis())).option(ChannelOption.SO_KEEPALIVE, (Object)configuration.isTcpKeepAlive()).option(ChannelOption.TCP_NODELAY, (Object)configuration.isTcpNoDelay()).host(configuration.getHost()).port(configuration.getPort()).connect().doOnNext(it -> {
            SslConfiguration tunnel = configuration.getSslTunnelConfiguration();
            ChannelPipeline pipeline = it.channel().pipeline();
            if (tunnel.isSslEnabled()) {
                logger.debug(connectionContext.getMessage("Enabling SSL tunnel"));
                try {
                    pipeline.addFirst("sslTunnel", (ChannelHandler)ReactorNettyClient.createSslTunnelHandler(it.channel().alloc(), tunnel));
                }
                catch (GeneralSecurityException e) {
                    it.channel().close();
                    throw new IllegalStateException("Cannot configure SSL tunnel", e);
                }
                pipeline.addAfter("sslTunnel", tdsEncoder.getClass().getName(), (ChannelHandler)tdsEncoder);
            } else {
                pipeline.addFirst(tdsEncoder.getClass().getName(), (ChannelHandler)tdsEncoder);
            }
            TdsSslHandler handler = new TdsSslHandler(packetIdProvider, configuration, connectionContext.withChannelId(it.channel().toString()));
            pipeline.addAfter(tdsEncoder.getClass().getName(), ((Object)((Object)handler)).getClass().getName(), (ChannelHandler)handler);
            InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class);
            if (logger.isTraceEnabled()) {
                pipeline.addBefore(tdsEncoder.getClass().getName(), LoggingHandler.class.getSimpleName(), (ChannelHandler)new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
            }
        });
        return connection.map(it -> new ReactorNettyClient((Connection)it, tdsEncoder, connectionContext.withChannelId(it.channel().toString())));
    }

    private static SslHandler createSslTunnelHandler(ByteBufAllocator allocator, SslConfiguration tunnel) throws GeneralSecurityException {
        return new SslHandler(tunnel.getSslProvider().getSslContext().newEngine(allocator));
    }

    @Override
    public Mono<Void> close() {
        logger.debug(this.context.getMessage("close()"));
        return Mono.defer(() -> {
            logger.debug(this.context.getMessage("close(subscribed)"));
            if (this.isClosed.compareAndSet(false, true)) {
                this.connection.dispose();
                return this.connection.onDispose();
            }
            return Mono.empty();
        });
    }

    @Override
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator;
    }

    @Override
    public ConnectionContext getContext() {
        return this.context;
    }

    @Override
    public Optional<Collation> getDatabaseCollation() {
        return this.databaseCollation;
    }

    @Override
    public Optional<String> getDatabaseVersion() {
        return this.databaseVersion;
    }

    @Override
    public Optional<Redirect> getRedirect() {
        return this.redirect;
    }

    @Override
    public TransactionDescriptor getTransactionDescriptor() {
        return this.transactionDescriptor;
    }

    @Override
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus;
    }

    @Override
    public boolean isColumnEncryptionSupported() {
        return this.encryptionSupported;
    }

    @Override
    public boolean isConnected() {
        if (this.isClosed.get()) {
            return false;
        }
        if (this.requestProcessor.isDisposed()) {
            return false;
        }
        Channel channel = this.connection.channel();
        return channel.isOpen();
    }

    @Override
    public Flux<Message> exchange(Publisher<? extends ClientMessage> requests, Predicate<Message> takeUntil) {
        Assert.requireNonNull(takeUntil, "takeUntil must not be null");
        Assert.requireNonNull(requests, "Requests must not be null");
        if (DEBUG_ENABLED) {
            logger.debug(this.context.getMessage("exchange()"));
        }
        ExchangeRequest exchangeRequest = new ExchangeRequest();
        Flux handle = Mono.create(sink -> {
            if (DEBUG_ENABLED) {
                logger.debug(this.context.getMessage("exchange(subscribed)"));
            }
            if (!this.isConnected()) {
                sink.error((Throwable)((Object)CLOSED.get()));
            }
            Flux requestMessages = this.responseProcessor.doOnSubscribe(s -> Flux.from((Publisher)requests).subscribe(t -> {
                if (!this.isConnected()) {
                    sink.error((Throwable)((Object)CLOSED.get()));
                    return;
                }
                this.requests.next(t);
            }, arg_0 -> this.requests.error(arg_0), () -> {
                if (!this.isConnected()) {
                    sink.error((Throwable)((Object)CLOSED.get()));
                }
            }));
            try {
                exchangeRequest.submit(this.requestQueue, (MonoSink<Flux<Message>>)sink, (Flux<Message>)requestMessages);
            }
            catch (Exception e) {
                sink.error((Throwable)e);
            }
        }).flatMapMany(Function.identity()).handle((message, sink) -> {
            sink.next(message);
            if (takeUntil.test((Message)message)) {
                exchangeRequest.complete();
                sink.complete();
            }
        });
        return handle.doAfterTerminate((Runnable)this.requestQueue).doOnCancel(() -> {
            if (!exchangeRequest.isComplete()) {
                logger.error("Exchange cancelled while exchange is active. This is likely a bug leading to unpredictable outcome.");
            }
        });
    }

    private void handleClose() {
        if (this.isClosed.compareAndSet(false, true)) {
            logger.warn(this.context.getMessage("Connection has been closed by peer"));
            this.drainError(UNEXPECTED);
        } else {
            this.drainError(EXPECTED);
        }
    }

    private void handleConnectionError(Throwable error) {
        this.drainError(() -> new MssqlConnectionException(error));
    }

    private void drainError(Supplier<? extends Throwable> supplier) {
        Sinkable receiver;
        while ((receiver = this.requestQueue.poll()) != null) {
            receiver.onError(supplier.get());
        }
        this.responseProcessor.onError(supplier.get());
    }

    private /* synthetic */ void lambda$new$6(StreamDecoder decoder, SynchronousSink sink, Object it) {
        if (it instanceof ByteBuf) {
            ByteBuf buffer = (ByteBuf)it;
            decoder.decode(buffer, this.decodeFunction, (SynchronousSink<Message>)sink);
            return;
        }
        if (it instanceof Message) {
            sink.next((Object)((Message)it));
            return;
        }
        throw ProtocolException.unsupported(String.format("Unexpected protocol message: [%s]", it));
    }

    static class MssqlConnectionException
    extends R2dbcNonTransientResourceException {
        public MssqlConnectionException(Throwable cause) {
            super(cause);
        }
    }

    static class MssqlConnectionClosedException
    extends R2dbcNonTransientResourceException {
        public MssqlConnectionClosedException(String reason) {
            super(reason);
        }
    }

    static interface Sinkable {
        public void onSuccess();

        public void onError(Throwable var1);
    }

    class RedirectListener
    implements EnvironmentChangeListener {
        RedirectListener() {
        }

        @Override
        public void onEnvironmentChange(EnvironmentChangeEvent event) {
            if (event.getToken().getChangeType() == EnvChangeToken.EnvChangeType.Routing) {
                Redirect redirect = Redirect.decode(Unpooled.wrappedBuffer((byte[])event.getToken().getNewValue()));
                ReactorNettyClient.this.redirect = Optional.of(redirect);
            }
        }
    }

    class CollationListener
    implements EnvironmentChangeListener {
        CollationListener() {
        }

        @Override
        public void onEnvironmentChange(EnvironmentChangeEvent event) {
            if (event.getToken().getChangeType() == EnvChangeToken.EnvChangeType.SQLCollation) {
                Collation collation = Collation.decode(Unpooled.wrappedBuffer((byte[])event.getToken().getNewValue()));
                ReactorNettyClient.this.databaseCollation = Optional.of(collation);
            }
        }
    }

    class TransactionListener
    implements EnvironmentChangeListener {
        TransactionListener() {
        }

        @Override
        public void onEnvironmentChange(EnvironmentChangeEvent event) {
            EnvChangeToken token = event.getToken();
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx || token.getChangeType() == EnvChangeToken.EnvChangeType.EnlistDTC) {
                byte[] descriptor = token.getNewValue();
                if (descriptor.length != 8) {
                    throw ProtocolException.invalidTds("Transaction descriptor length mismatch");
                }
                if (DEBUG_ENABLED) {
                    String op = token.getChangeType() == EnvChangeToken.EnvChangeType.BeginTx ? "started" : "enlisted";
                    logger.debug(String.format(ReactorNettyClient.this.context.getMessage("Transaction %s"), op));
                }
                this.updateStatus(TransactionStatus.STARTED, TransactionDescriptor.from(descriptor));
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.CommitTx) {
                if (DEBUG_ENABLED) {
                    logger.debug(ReactorNettyClient.this.context.getMessage("Transaction committed"));
                }
                this.updateStatus(TransactionStatus.EXPLICIT, TransactionDescriptor.empty());
            }
            if (token.getChangeType() == EnvChangeToken.EnvChangeType.RollbackTx) {
                if (DEBUG_ENABLED) {
                    logger.debug(ReactorNettyClient.this.context.getMessage("Transaction rolled back"));
                }
                this.updateStatus(TransactionStatus.EXPLICIT, TransactionDescriptor.empty());
            }
        }

        private void updateStatus(TransactionStatus status, TransactionDescriptor descriptor) {
            ReactorNettyClient.this.transactionStatus = status;
            ReactorNettyClient.this.transactionDescriptor = descriptor;
        }
    }

    static class ExchangeRequest {
        private static final AtomicIntegerFieldUpdater<ExchangeRequest> COMPLETED = AtomicIntegerFieldUpdater.newUpdater(ExchangeRequest.class, "completed");
        private static final AtomicIntegerFieldUpdater<ExchangeRequest> SUBMITTED = AtomicIntegerFieldUpdater.newUpdater(ExchangeRequest.class, "submitted");
        private volatile int completed = 0;
        private volatile int submitted = 0;

        ExchangeRequest() {
        }

        public void complete() {
            COMPLETED.set(this, 1);
        }

        public boolean isComplete() {
            return COMPLETED.get(this) == 1;
        }

        void submit(RequestQueue queue, final MonoSink<Flux<Message>> sink, final Flux<Message> requestMessages) {
            if (!SUBMITTED.compareAndSet(this, 0, 1)) {
                throw new IllegalStateException("Client exchange can be subscribed only once");
            }
            queue.submit(new Sinkable(){

                @Override
                public void onSuccess() {
                    sink.success((Object)requestMessages);
                }

                @Override
                public void onError(Throwable throwable) {
                    sink.error(throwable);
                }
            });
        }
    }

    static class RequestQueue
    implements Runnable {
        private final Queue<Sinkable> requestQueue = (Queue)Queues.small().get();
        private final AtomicBoolean active = new AtomicBoolean();
        private final ConnectionContext context;

        RequestQueue(ConnectionContext context) {
            this.context = context;
        }

        @Nullable
        public Sinkable poll() {
            return this.requestQueue.poll();
        }

        @Override
        public void run() {
            Sinkable nextCommand = this.requestQueue.poll();
            if (nextCommand != null) {
                if (DEBUG_ENABLED) {
                    logger.debug(this.context.getMessage("Initiating queued exchange"));
                }
                nextCommand.onSuccess();
                return;
            }
            if (DEBUG_ENABLED) {
                logger.debug(this.context.getMessage("Conversation complete"));
            }
            this.active.compareAndSet(true, false);
        }

        void submit(Sinkable exchangeRequest) {
            if (this.active.compareAndSet(false, true)) {
                if (DEBUG_ENABLED) {
                    logger.debug(this.context.getMessage("Initiating exchange"));
                }
                exchangeRequest.onSuccess();
            } else {
                if (DEBUG_ENABLED) {
                    logger.debug(this.context.getMessage("Queueing exchange"));
                }
                if (!this.requestQueue.offer(exchangeRequest)) {
                    throw new IllegalStateException("Request queue is full");
                }
                this.drainRequestQueue();
            }
        }

        void drainRequestQueue() {
            if (this.active.compareAndSet(false, true)) {
                Sinkable runnable = this.requestQueue.poll();
                if (runnable != null) {
                    runnable.onSuccess();
                } else {
                    this.active.compareAndSet(true, false);
                }
            }
        }
    }
}

