/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql.client;

import io.r2dbc.postgresql.authentication.AuthenticationHandler;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
import io.r2dbc.postgresql.message.backend.AuthenticationOk;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.StartupMessage;
import io.r2dbc.postgresql.util.Assert;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/**
 * Static helper that performs the startup message exchange with a {@link Client}:
 * it sends a {@link StartupMessage} and answers the {@link AuthenticationMessage}s
 * received in response until an {@link AuthenticationOk} arrives.
 */
public final class StartupMessageFlow {

    /** Not instantiable; this class only exposes static methods. */
    private StartupMessageFlow() {
    }

    /**
     * Sends a {@link StartupMessage} through {@code client} and handles the authentication
     * messages that come back.
     *
     * <p>Each non-OK {@link AuthenticationMessage} is passed to an {@link AuthenticationHandler};
     * a non-{@code null} reply from the handler is sent as a further request. Authentication
     * messages are consumed here and are not emitted downstream; every other backend message is
     * forwarded unchanged.
     *
     * @param applicationName               the application name placed in the startup message; validated via
     *                                      {@code Assert.requireNonNull}
     * @param authenticationHandlerProvider creates the {@link AuthenticationHandler} from the first non-OK
     *                                      authentication message; validated via {@code Assert.requireNonNull}
     * @param client                        the client used to exchange messages; validated via
     *                                      {@code Assert.requireNonNull}
     * @param database                      the database placed in the startup message, may be {@code null}
     * @param username                      the user name placed in the startup message; validated via
     *                                      {@code Assert.requireNonNull}
     * @param options                       the options placed in the startup message, may be {@code null}
     * @return the backend messages received from {@code client} that are not authentication messages
     */
    public static Flux<BackendMessage> exchange(String applicationName, Function<AuthenticationMessage, AuthenticationHandler> authenticationHandlerProvider, Client client, @Nullable String database, String username, @Nullable Map<String, String> options) {
        Assert.requireNonNull(applicationName, "applicationName must not be null");
        Assert.requireNonNull(authenticationHandlerProvider, "authenticationHandlerProvider must not be null");
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(username, "username must not be null");

        // Outbound request stream: authentication replies are pushed into this sink as the
        // corresponding backend messages arrive.
        Sinks.Many<FrontendMessage> requests = Sinks.many().unicast().onBackpressureBuffer();

        // Holds the handler once it has been created from the first authentication message,
        // so that later authentication messages reuse the same instance.
        AtomicReference<AuthenticationHandler> authenticationHandler = new AtomicReference<>(null);

        // The startup message is always the first request sent to the client.
        Flux<FrontendMessage> outbound = requests.asFlux()
            .startWith(new StartupMessage(applicationName, database, username, options));

        return client.exchange(outbound).handle((message, sink) -> {
            // AuthenticationOk is checked before the more general AuthenticationMessage branch.
            if (message instanceof AuthenticationOk) {
                // Authentication finished: no further requests will be sent.
                requests.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            } else if (message instanceof AuthenticationMessage) {
                try {
                    AuthenticationMessage authenticationMessage = (AuthenticationMessage) message;

                    if (authenticationHandler.get() == null) {
                        authenticationHandler.compareAndSet(null, authenticationHandlerProvider.apply(authenticationMessage));
                    }

                    FrontendMessage response = authenticationHandler.get().handle(authenticationMessage);
                    if (response != null) {
                        requests.emitNext(response, Sinks.EmitFailureHandler.FAIL_FAST);
                    }
                } catch (Exception e) {
                    // Fail both directions: terminate the request stream and signal the
                    // error to the downstream subscriber.
                    requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST);
                    sink.error(e);
                }
            } else {
                // Not an authentication message: pass it through to the caller.
                sink.next(message);
            }
        });
    }
}

