/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.data.connection.support;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.propagation.ReactorPropagation;
import io.micronaut.core.propagation.PropagatedContextElement;
import io.micronaut.data.connection.ConnectionDefinition;
import io.micronaut.data.connection.ConnectionStatus;
import io.micronaut.data.connection.exceptions.NoConnectionException;
import io.micronaut.data.connection.reactive.DefaultReactiveConnectionStatus;
import io.micronaut.data.connection.reactive.ReactiveStreamsConnectionOperations;
import io.micronaut.data.connection.reactive.ReactorConnectionOperations;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * Base implementation of {@link ReactorConnectionOperations} that resolves the connection to use
 * from the subscriber's Reactor context and the {@link ConnectionDefinition}'s propagation
 * behavior, delegating the actual opening and closing of connections to subclasses.
 *
 * <p>Connections opened by this class are published into the Reactor context as a
 * {@link PropagatedContextElement} tagged with the owning operations instance, so that nested
 * calls on the same instance can find and reuse them.
 *
 * @param <C> the connection type
 */
@Internal
public abstract class AbstractReactorConnectionOperations<C> implements ReactorConnectionOperations<C> {

    /**
     * Opens a new connection for the given definition.
     *
     * @param definition the connection definition
     * @return a publisher of the opened connection; only its first element is consumed
     */
    @NonNull
    protected abstract Publisher<C> openConnection(@NonNull ConnectionDefinition definition);

    /**
     * Closes a connection previously obtained from {@link #openConnection(ConnectionDefinition)}.
     *
     * @param connection the connection to close
     * @param definition the definition the connection was opened with
     * @return a publisher signalling the outcome of the close operation
     */
    @NonNull
    protected abstract Publisher<Void> closeConnection(@NonNull C connection, @NonNull ConnectionDefinition definition);

    /**
     * Finds the status of the connection that this operations instance has bound to the given
     * Reactor context, if any.
     *
     * @param contextView the Reactor context to inspect
     * @return the bound connection status, or empty when this instance bound none
     */
    @Override
    public final Optional<ConnectionStatus<C>> findConnectionStatus(@NonNull ContextView contextView) {
        return findPropagateContextElement(contextView).map(ClientSessionPropagatedContext::status);
    }

    /**
     * Finds the first propagated context element that was registered by this very instance.
     *
     * @param contextView the Reactor context to inspect
     * @return the matching element, or empty
     */
    // Safe: the identity filter only lets through elements created by this instance's
    // addClientSession, which always stores a ConnectionStatus<C>.
    @SuppressWarnings("unchecked")
    private Optional<ClientSessionPropagatedContext<C>> findPropagateContextElement(ContextView contextView) {
        return ReactorPropagation.findAllContextElements(contextView, ClientSessionPropagatedContext.class)
            .filter(element -> element.connectionOperations() == this)
            .map(element -> (ClientSessionPropagatedContext<C>) element)
            .findFirst();
    }

    /**
     * Runs the callback with a connection chosen according to the definition's propagation
     * behavior. The decision is made at subscription time, against the subscriber's context.
     *
     * <ul>
     *   <li>{@code REQUIRED}: reuse the bound connection, or open a new one if none is bound.</li>
     *   <li>{@code MANDATORY}: reuse the bound connection, or fail with
     *       {@link NoConnectionException} if none is bound.</li>
     *   <li>{@code REQUIRES_NEW}: always open a new connection.</li>
     * </ul>
     *
     * @param definition the connection definition
     * @param callback   the function producing the result from the connection status
     * @param <T>        the element type
     * @return the callback's flux, wrapped with connection handling
     * @throws NullPointerException if {@code callback} is null
     */
    @Override
    @NonNull
    public <T> Flux<T> withConnectionFlux(@NonNull ConnectionDefinition definition,
                                          @NonNull Function<ConnectionStatus<C>, Flux<T>> callback) {
        Objects.requireNonNull(callback, "Callback cannot be null");
        return Flux.deferContextual(contextView -> {
            C connection = findConnection(contextView);
            if (shouldReuseConnection(definition, connection)) {
                return existingConnectionFlux(definition, callback, connection);
            }
            return openConnectionFlux(definition, callback);
        });
    }

    /**
     * Applies the callback to an already bound connection. The status is created with the last
     * constructor flag set to {@code false}, and no close handling is attached here.
     */
    private <T> Flux<T> existingConnectionFlux(ConnectionDefinition definition,
                                               Function<ConnectionStatus<C>, Flux<T>> callback,
                                               C clientSession) {
        return applyCallbackFlux(callback, new DefaultReactiveConnectionStatus<C>(clientSession, definition, false));
    }

    /**
     * Opens a new connection, exposes it to the callback's publisher through the Reactor context
     * and hands a {@link #closeConnection} supplier to the status's complete/error/cancel hooks.
     */
    private <T> Flux<T> openConnectionFlux(ConnectionDefinition definition,
                                           Function<ConnectionStatus<C>, Flux<T>> callback) {
        Mono<DefaultReactiveConnectionStatus<C>> openedStatus = Mono.from(openConnection(definition))
            .map(connection -> new DefaultReactiveConnectionStatus<C>(connection, definition, true));
        return Flux.usingWhen(
            openedStatus,
            status -> applyCallbackFlux(callback, status)
                .contextWrite(ctx -> addClientSession(ctx, status)),
            status -> status.onComplete(() -> closeConnection(status.getConnection(), definition)),
            (status, throwable) -> status.onError(throwable, () -> closeConnection(status.getConnection(), definition)),
            status -> status.onCancel(() -> closeConnection(status.getConnection(), definition))
        );
    }

    /**
     * {@link Mono} counterpart of {@link #withConnectionFlux(ConnectionDefinition, Function)};
     * the same propagation rules apply.
     *
     * @param definition the connection definition
     * @param callback   the function producing the result from the connection status
     * @param <T>        the element type
     * @return the callback's mono, wrapped with connection handling
     * @throws NullPointerException if {@code callback} is null
     */
    @Override
    @NonNull
    public <T> Mono<T> withConnectionMono(@NonNull ConnectionDefinition definition,
                                          @NonNull Function<ConnectionStatus<C>, Mono<T>> callback) {
        Objects.requireNonNull(callback, "Callback cannot be null");
        return Mono.deferContextual(contextView -> {
            C connection = findConnection(contextView);
            if (shouldReuseConnection(definition, connection)) {
                return existingConnectionMono(definition, callback, connection);
            }
            return openConnectionMono(definition, callback);
        });
    }

    /**
     * Applies the callback to an already bound connection. The status is created with the last
     * constructor flag set to {@code false}, and no close handling is attached here.
     */
    private <T> Mono<T> existingConnectionMono(ConnectionDefinition definition,
                                               Function<ConnectionStatus<C>, Mono<T>> callback,
                                               C clientSession) {
        return applyCallbackMono(callback, new DefaultReactiveConnectionStatus<C>(clientSession, definition, false));
    }

    /**
     * Opens a new connection, exposes it to the callback's publisher through the Reactor context
     * and hands a {@link #closeConnection} supplier to the status's complete/error/cancel hooks.
     */
    private <T> Mono<T> openConnectionMono(ConnectionDefinition definition,
                                           Function<ConnectionStatus<C>, Mono<T>> callback) {
        Mono<DefaultReactiveConnectionStatus<C>> openedStatus = Mono.from(openConnection(definition))
            .map(connection -> new DefaultReactiveConnectionStatus<C>(connection, definition, true));
        return Mono.usingWhen(
            openedStatus,
            status -> applyCallbackMono(callback, status)
                .contextWrite(ctx -> addClientSession(ctx, status)),
            status -> status.onComplete(() -> closeConnection(status.getConnection(), definition)),
            (status, throwable) -> status.onError(throwable, () -> closeConnection(status.getConnection(), definition)),
            status -> status.onCancel(() -> closeConnection(status.getConnection(), definition))
        );
    }

    /**
     * Decides whether the bound connection (if any) must be reused for the given definition.
     *
     * @param definition the connection definition supplying the propagation behavior
     * @param existing   the connection bound to the current context, or {@code null}
     * @return {@code true} to reuse {@code existing}, {@code false} to open a new connection;
     *         {@code true} is only ever returned when {@code existing} is non-null
     * @throws NoConnectionException if the behavior is {@code MANDATORY} and nothing is bound
     */
    private boolean shouldReuseConnection(ConnectionDefinition definition, @Nullable C existing) {
        if (existing != null) {
            return switch (definition.getPropagationBehavior()) {
                case REQUIRED, MANDATORY -> true;
                case REQUIRES_NEW -> false;
            };
        }
        return switch (definition.getPropagationBehavior()) {
            case REQUIRED, REQUIRES_NEW -> false;
            case MANDATORY -> throw noConnectionFound();
        };
    }

    /** Creates the exception raised when {@code MANDATORY} propagation finds no bound connection. */
    private NoConnectionException noConnectionFound() {
        return new NoConnectionException("No existing connection found for connection marked with propagation 'mandatory'");
    }

    /**
     * Returns a context that additionally carries the given status, tagged with this instance so
     * that {@link #findConnectionStatus(ContextView)} can recognise it later.
     */
    @NonNull
    private Context addClientSession(@NonNull Context context, @NonNull ConnectionStatus<C> status) {
        return ReactorPropagation.addContextElement(context, new ClientSessionPropagatedContext<C>(this, status));
    }

    /** Returns the connection bound by this instance to the context, or {@code null} if none. */
    @Nullable
    private C findConnection(@NonNull ContextView contextView) {
        return findConnectionStatus(contextView).map(ConnectionStatus::getConnection).orElse(null);
    }

    /**
     * Invokes the callback, converting an exception thrown while assembling the publisher into
     * an error signal so that it flows through the surrounding reactive pipeline.
     */
    private <T> Flux<T> applyCallbackFlux(Function<ConnectionStatus<C>, Flux<T>> callback,
                                          DefaultReactiveConnectionStatus<C> connectionStatus) {
        try {
            return callback.apply(connectionStatus);
        } catch (Exception e) {
            return Flux.error(e);
        }
    }

    /**
     * Invokes the callback, converting an exception thrown while assembling the publisher into
     * an error signal so that it flows through the surrounding reactive pipeline.
     */
    private <T> Mono<T> applyCallbackMono(Function<ConnectionStatus<C>, Mono<T>> callback,
                                          DefaultReactiveConnectionStatus<C> connectionStatus) {
        try {
            return callback.apply(connectionStatus);
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    /**
     * Context element pairing a connection status with the operations instance that bound it.
     *
     * @param connectionOperations the operations instance that registered the element
     * @param status               the status of the bound connection
     * @param <C>                  the connection type
     */
    private record ClientSessionPropagatedContext<C>(ReactiveStreamsConnectionOperations<?> connectionOperations,
                                                     ConnectionStatus<C> status) implements PropagatedContextElement {
    }
}

