/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketClient;
import io.rsocket.RSocketServer;
import io.rsocket.SocketAcceptor;
import io.rsocket.StreamIdSupplier;
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.fragmentation.FragmentationDuplexConnection;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.VersionFlyweight;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.PluginRegistry;
import io.rsocket.plugins.Plugins;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

public class RSocketFactory {
    public static ClientRSocketFactory connect() {
        return new ClientRSocketFactory();
    }

    public static ServerRSocketFactory receive() {
        return new ServerRSocketFactory();
    }

    public static class ServerRSocketFactory {
        private SocketAcceptor acceptor;
        private Function<Frame, ? extends Payload> frameDecoder = DefaultPayload::create;
        private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
        private int mtu = 0;
        private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());

        private ServerRSocketFactory() {
        }

        public ServerRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor interceptor) {
            this.plugins.addConnectionPlugin(interceptor);
            return this;
        }

        public ServerRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
            this.plugins.addClientPlugin(interceptor);
            return this;
        }

        public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
            this.plugins.addServerPlugin(interceptor);
            return this;
        }

        public ServerTransportAcceptor acceptor(SocketAcceptor acceptor) {
            this.acceptor = acceptor;
            return x$0 -> new ServerStart(x$0);
        }

        public ServerRSocketFactory frameDecoder(Function<Frame, ? extends Payload> frameDecoder) {
            this.frameDecoder = frameDecoder;
            return this;
        }

        public ServerRSocketFactory fragment(int mtu) {
            this.mtu = mtu;
            return this;
        }

        public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
            this.errorConsumer = errorConsumer;
            return this;
        }

        private class ServerStart<T extends Closeable>
        implements Start<T> {
            private final Supplier<ServerTransport<T>> transportServer;

            ServerStart(Supplier<ServerTransport<T>> transportServer) {
                this.transportServer = transportServer;
            }

            @Override
            public Mono<T> start() {
                return this.transportServer.get().start(connection -> {
                    if (ServerRSocketFactory.this.mtu > 0) {
                        connection = new FragmentationDuplexConnection(connection, ServerRSocketFactory.this.mtu);
                    }
                    ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, ServerRSocketFactory.this.plugins);
                    return multiplexer.asStreamZeroConnection().receive().next().flatMap(setupFrame -> this.processSetupFrame(multiplexer, (Frame)setupFrame));
                });
            }

            private Mono<Void> processSetupFrame(ClientServerInputMultiplexer multiplexer, Frame setupFrame) {
                int version = Frame.Setup.version(setupFrame);
                if (version != SetupFrameFlyweight.CURRENT_VERSION) {
                    setupFrame.release();
                    InvalidSetupException error = new InvalidSetupException("Unsupported version " + VersionFlyweight.toString(version));
                    return multiplexer.asStreamZeroConnection().sendOne(Frame.Error.from(0, error)).doFinally(signalType -> multiplexer.dispose());
                }
                ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
                int keepAliveInterval = setupPayload.keepAliveInterval();
                int keepAliveMaxLifetime = setupPayload.keepAliveMaxLifetime();
                RSocketClient rSocketClient = new RSocketClient(multiplexer.asServerConnection(), ServerRSocketFactory.this.frameDecoder, ServerRSocketFactory.this.errorConsumer, StreamIdSupplier.serverSupplier());
                RSocket wrappedRSocketClient = ServerRSocketFactory.this.plugins.applyClient(rSocketClient);
                return ServerRSocketFactory.this.acceptor.accept(setupPayload, wrappedRSocketClient).onErrorResume(err -> multiplexer.asStreamZeroConnection().sendOne(this.rejectedSetupErrorFrame((Throwable)err)).then(Mono.error((Throwable)err))).doOnNext(unwrappedServerSocket -> {
                    RSocket wrappedRSocketServer = ServerRSocketFactory.this.plugins.applyServer((RSocket)unwrappedServerSocket);
                    RSocketServer rSocketServer = new RSocketServer(multiplexer.asClientConnection(), wrappedRSocketServer, ServerRSocketFactory.this.frameDecoder, ServerRSocketFactory.this.errorConsumer, keepAliveInterval, keepAliveMaxLifetime);
                }).doFinally(signalType -> setupPayload.release()).then();
            }

            private Frame rejectedSetupErrorFrame(Throwable err) {
                String msg = err.getMessage();
                return Frame.Error.from(0, new RejectedSetupException(msg == null ? "rejected by server acceptor" : msg));
            }
        }
    }

    public static class ClientRSocketFactory
    implements ClientTransportAcceptor {
        private Supplier<Function<RSocket, RSocket>> acceptor = () -> rSocket -> new AbstractRSocket(){};
        private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
        private int mtu = 0;
        private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
        private int flags = 0;
        private Payload setupPayload = EmptyPayload.INSTANCE;
        private Function<Frame, ? extends Payload> frameDecoder = DefaultPayload::create;
        private Duration tickPeriod = Duration.ofSeconds(20L);
        private Duration ackTimeout = Duration.ofSeconds(30L);
        private int missedAcks = 3;
        private String metadataMimeType = "application/binary";
        private String dataMimeType = "application/binary";

        public ClientRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor interceptor) {
            this.plugins.addConnectionPlugin(interceptor);
            return this;
        }

        public ClientRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
            this.plugins.addClientPlugin(interceptor);
            return this;
        }

        public ClientRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
            this.plugins.addServerPlugin(interceptor);
            return this;
        }

        @Deprecated
        public ClientRSocketFactory keepAlive() {
            return this;
        }

        public ClientRSocketFactory keepAlive(Duration tickPeriod, Duration ackTimeout, int missedAcks) {
            this.tickPeriod = tickPeriod;
            this.ackTimeout = ackTimeout;
            this.missedAcks = missedAcks;
            return this;
        }

        public ClientRSocketFactory keepAliveTickPeriod(Duration tickPeriod) {
            this.tickPeriod = tickPeriod;
            return this;
        }

        public ClientRSocketFactory keepAliveAckTimeout(Duration ackTimeout) {
            this.ackTimeout = ackTimeout;
            return this;
        }

        public ClientRSocketFactory keepAliveMissedAcks(int missedAcks) {
            this.missedAcks = missedAcks;
            return this;
        }

        public ClientRSocketFactory mimeType(String metadataMimeType, String dataMimeType) {
            this.dataMimeType = dataMimeType;
            this.metadataMimeType = metadataMimeType;
            return this;
        }

        public ClientRSocketFactory dataMimeType(String dataMimeType) {
            this.dataMimeType = dataMimeType;
            return this;
        }

        public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
            this.metadataMimeType = metadataMimeType;
            return this;
        }

        @Override
        public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
            return new StartClient(transportClient);
        }

        public ClientTransportAcceptor acceptor(Function<RSocket, RSocket> acceptor) {
            this.acceptor = () -> acceptor;
            return x$0 -> new StartClient(x$0);
        }

        public ClientTransportAcceptor acceptor(Supplier<Function<RSocket, RSocket>> acceptor) {
            this.acceptor = acceptor;
            return x$0 -> new StartClient(x$0);
        }

        public ClientRSocketFactory fragment(int mtu) {
            this.mtu = mtu;
            return this;
        }

        public ClientRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
            this.errorConsumer = errorConsumer;
            return this;
        }

        public ClientRSocketFactory setupPayload(Payload payload) {
            this.setupPayload = payload;
            return this;
        }

        public ClientRSocketFactory frameDecoder(Function<Frame, ? extends Payload> frameDecoder) {
            this.frameDecoder = frameDecoder;
            return this;
        }

        private class StartClient
        implements Start<RSocket> {
            private final Supplier<ClientTransport> transportClient;

            StartClient(Supplier<ClientTransport> transportClient) {
                this.transportClient = transportClient;
            }

            @Override
            public Mono<RSocket> start() {
                return this.transportClient.get().connect().flatMap(connection -> {
                    Frame setupFrame = Frame.Setup.from(ClientRSocketFactory.this.flags, (int)ClientRSocketFactory.this.tickPeriod.toMillis(), (int)(ClientRSocketFactory.this.ackTimeout.toMillis() + ClientRSocketFactory.this.tickPeriod.toMillis() * (long)ClientRSocketFactory.this.missedAcks), ClientRSocketFactory.this.metadataMimeType, ClientRSocketFactory.this.dataMimeType, ClientRSocketFactory.this.setupPayload);
                    if (ClientRSocketFactory.this.mtu > 0) {
                        connection = new FragmentationDuplexConnection((DuplexConnection)connection, ClientRSocketFactory.this.mtu);
                    }
                    ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer((DuplexConnection)connection, ClientRSocketFactory.this.plugins);
                    RSocketClient rSocketClient = new RSocketClient(multiplexer.asClientConnection(), ClientRSocketFactory.this.frameDecoder, ClientRSocketFactory.this.errorConsumer, StreamIdSupplier.clientSupplier(), ClientRSocketFactory.this.tickPeriod, ClientRSocketFactory.this.ackTimeout, ClientRSocketFactory.this.missedAcks);
                    RSocket wrappedRSocketClient = ClientRSocketFactory.this.plugins.applyClient(rSocketClient);
                    RSocket unwrappedServerSocket = (RSocket)((Function)ClientRSocketFactory.this.acceptor.get()).apply(wrappedRSocketClient);
                    RSocket wrappedRSocketServer = ClientRSocketFactory.this.plugins.applyServer(unwrappedServerSocket);
                    RSocketServer rSocketServer = new RSocketServer(multiplexer.asServerConnection(), wrappedRSocketServer, ClientRSocketFactory.this.frameDecoder, ClientRSocketFactory.this.errorConsumer);
                    return connection.sendOne(setupFrame).thenReturn((Object)wrappedRSocketClient);
                });
            }
        }
    }

    public static interface ServerTransportAcceptor {
        public <T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> var1);

        default public <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
            return this.transport(() -> transport);
        }
    }

    public static interface ClientTransportAcceptor {
        public Start<RSocket> transport(Supplier<ClientTransport> var1);

        default public Start<RSocket> transport(ClientTransport transport) {
            return this.transport(() -> transport);
        }
    }

    public static interface Start<T extends Closeable> {
        public Mono<T> start();
    }
}

