package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.AddressFromURIString;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/pekko/PekkoUtils.class */
public class PekkoUtils {
    private static final Logger LOG = LoggerFactory.getLogger(PekkoUtils.class);
    private static final String FLINK_ACTOR_SYSTEM_NAME = "flink";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/pekko/PekkoUtils$ConfigBuilder.class */
    public static class ConfigBuilder {
        private final StringWriter stringWriter;
        private final PrintWriter printWriter;

        private ConfigBuilder() {
            this.stringWriter = new StringWriter();
            this.printWriter = new PrintWriter(this.stringWriter);
        }

        public ConfigBuilder add(String str) {
            this.printWriter.println(str);
            return this;
        }

        public Config build() {
            return ConfigFactory.parseString(this.stringWriter.toString()).resolve();
        }
    }

    PekkoUtils() {
    }

    public static String getFlinkActorSystemName() {
        return FLINK_ACTOR_SYSTEM_NAME;
    }

    private static Config getBasicConfig(Configuration configuration) {
        int intValue = ((Integer) configuration.get(RpcOptions.DISPATCHER_THROUGHPUT)).intValue();
        String booleanToOnOrOff = booleanToOnOrOff(((Boolean) configuration.get(RpcOptions.JVM_EXIT_ON_FATAL_ERROR)).booleanValue());
        String booleanToOnOrOff2 = booleanToOnOrOff(((Boolean) configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS)).booleanValue());
        return new ConfigBuilder().add("pekko {").add("  daemonic = off").add("  loggers = [\"org.apache.pekko.event.slf4j.Slf4jLogger\"]").add("  logging-filter = \"org.apache.pekko.event.slf4j.Slf4jLoggingFilter\"").add("  log-config-on-start = off").add("  logger-startup-timeout = 50s").add("  loglevel = " + getLogLevel()).add("  stdout-loglevel = OFF").add("  log-dead-letters = " + booleanToOnOrOff2).add("  log-dead-letters-during-shutdown = " + booleanToOnOrOff2).add("  jvm-exit-on-fatal-error = " + booleanToOnOrOff).add("  serialize-messages = off").add("  actor {").add("    guardian-supervisor-strategy = " + EscalatingSupervisorStrategy.class.getCanonicalName()).add("    warn-about-java-serializer-usage = off").add("    allow-java-serialization = on").add("    default-dispatcher {").add("      throughput = " + intValue).add("    }").add("    supervisor-dispatcher {").add("      type = Dispatcher").add("      executor = \"thread-pool-executor\"").add("      thread-pool-executor {").add("        core-pool-size-min = 1").add("        core-pool-size-max = 1").add("      }").add("    }").add("  }").add("}").build();
    }

    private static String getLogLevel() {
        return (LOG.isTraceEnabled() || LOG.isDebugEnabled()) ? "DEBUG" : LOG.isInfoEnabled() ? "INFO" : LOG.isWarnEnabled() ? "WARNING" : LOG.isErrorEnabled() ? "ERROR" : "OFF";
    }

    public static Config getThreadPoolExecutorConfig(RpcSystem.FixedThreadPoolExecutorConfiguration fixedThreadPoolExecutorConfiguration) {
        return new ConfigBuilder().add("pekko {").add("  actor {").add("    default-dispatcher {").add("      type = " + PriorityThreadsDispatcher.class.getCanonicalName()).add("      executor = thread-pool-executor").add("      thread-priority = " + fixedThreadPoolExecutorConfiguration.getThreadPriority()).add("      thread-pool-executor {").add("          core-pool-size-min = " + fixedThreadPoolExecutorConfiguration.getMinNumThreads()).add("          core-pool-size-max = " + fixedThreadPoolExecutorConfiguration.getMaxNumThreads()).add("      }").add("    }").add("  }").add("}").build();
    }

    public static Config getForkJoinExecutorConfig(RpcSystem.ForkJoinExecutorConfiguration forkJoinExecutorConfiguration) {
        return new ConfigBuilder().add("pekko {").add("  actor {").add("    default-dispatcher {").add("      executor = fork-join-executor").add("      fork-join-executor {").add("          parallelism-factor = " + forkJoinExecutorConfiguration.getParallelismFactor()).add("          parallelism-min = " + forkJoinExecutorConfiguration.getMinParallelism()).add("          parallelism-max = " + forkJoinExecutorConfiguration.getMaxParallelism()).add("      }").add("    }").add("  }").add("}").build();
    }

    private static Config getRemoteConfig(Configuration configuration, String str, int i, String str2, int i2) {
        ConfigBuilder configBuilder = new ConfigBuilder();
        addBaseRemoteConfig(configBuilder, configuration, i, i2);
        addHostnameRemoteConfig(configBuilder, str, str2);
        addSslRemoteConfig(configBuilder, configuration);
        addRemoteForkJoinExecutorConfig(configBuilder, ActorSystemBootstrapTools.getRemoteForkJoinExecutorConfiguration(configuration));
        return configBuilder.build();
    }

    private static void addBaseRemoteConfig(ConfigBuilder configBuilder, Configuration configuration, int i, int i2) {
        configBuilder.add("pekko {").add("  actor {").add("    provider = \"org.apache.pekko.remote.RemoteActorRefProvider\"").add("  }").add("  remote.artery.enabled = false").add("  remote.startup-timeout = " + TimeUtils.getStringInMillis(TimeUtils.parseDuration((String) configuration.get(RpcOptions.STARTUP_TIMEOUT, TimeUtils.getStringInMillis(((Duration) configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)).multipliedBy(10L)))))).add("  remote.warn-about-direct-use = off").add("  remote.use-unsafe-remote-features-outside-cluster = on").add("  remote.classic {").add("    # disable the transport failure detector by setting very high values").add("    transport-failure-detector{").add("      acceptable-heartbeat-pause = 6000 s").add("      heartbeat-interval = 1000 s").add("      threshold = 300").add("    }").add("    enabled-transports = [\"pekko.remote.classic.netty.tcp\"]").add("    netty {").add("      tcp {").add("        transport-class = \"org.apache.pekko.remote.transport.netty.NettyTransport\"").add("        port = " + i2).add("        bind-port = " + i).add("        connection-timeout = " + TimeUtils.getStringInMillis(TimeUtils.parseDuration((String) configuration.get(RpcOptions.TCP_TIMEOUT)))).add("        maximum-frame-size = " + ((String) configuration.get(RpcOptions.FRAMESIZE))).add("        tcp-nodelay = on").add("        client-socket-worker-pool {").add("          pool-size-min = " + ((Integer) configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)).intValue()).add("          pool-size-max = " + ((Integer) configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)).intValue()).add("          pool-size-factor = " + ((Double) configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)).doubleValue()).add("        }").add("        server-socket-worker-pool {").add("          pool-size-min = " + ((Integer) configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)).intValue()).add("          pool-size-max = " + ((Integer) configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)).intValue()).add("          pool-size-factor = " + ((Double) configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)).doubleValue()).add("        }").add("      }").add("    }").add("    log-remote-lifecycle-events = " + booleanToOnOrOff(((Boolean) configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS)).booleanValue())).add("    retry-gate-closed-for = " + ((Long) configuration.get(RpcOptions.RETRY_GATE_CLOSED_FOR)).longValue() + " ms").add("  }").add("}");
    }

    private static void addHostnameRemoteConfig(ConfigBuilder configBuilder, String str, String str2) {
        String unresolvedHostToNormalizedString = NetUtils.unresolvedHostToNormalizedString(str2);
        configBuilder.add("pekko {").add("  remote.classic {").add("    netty {").add("      tcp {").add("        hostname = \"" + ((unresolvedHostToNormalizedString == null || unresolvedHostToNormalizedString.isEmpty()) ? "" : unresolvedHostToNormalizedString) + "\"").add("        bind-hostname = \"" + str + "\"").add("      }").add("    }").add("  }").add("}");
    }

    private static void addSslRemoteConfig(ConfigBuilder configBuilder, Configuration configuration) {
        String booleanToOnOrOff = booleanToOnOrOff(((Boolean) configuration.get(RpcOptions.SSL_ENABLED)).booleanValue() && SecurityOptions.isInternalSSLEnabled(configuration));
        String str = (String) configuration.get(SecurityOptions.SSL_INTERNAL_KEYSTORE, configuration.get(SecurityOptions.SSL_KEYSTORE));
        String str2 = (String) configuration.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, configuration.get(SecurityOptions.SSL_KEYSTORE_PASSWORD));
        String str3 = (String) configuration.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, configuration.get(SecurityOptions.SSL_KEY_PASSWORD));
        String str4 = (String) configuration.get(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, configuration.get(SecurityOptions.SSL_TRUSTSTORE));
        String str5 = (String) configuration.get(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, configuration.get(SecurityOptions.SSL_TRUSTSTORE_PASSWORD));
        String str6 = (String) configuration.get(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT);
        configBuilder.add("pekko {").add("  remote.classic {").add("    enabled-transports = [\"pekko.remote.classic.netty.ssl\"]").add("    netty {").add("      ssl = ${pekko.remote.classic.netty.tcp}").add("      ssl {").add("        enable-ssl = " + booleanToOnOrOff).add("        ssl-engine-provider = " + CustomSSLEngineProvider.class.getCanonicalName()).add("        security {").add("          key-store = \"" + str + "\"").add("          key-store-password = \"" + str2 + "\"").add("          key-password = \"" + str3 + "\"").add("          trust-store = \"" + str4 + "\"").add("          trust-store-password = \"" + str5 + "\"").add("          protocol = " + ((String) configuration.get(SecurityOptions.SSL_PROTOCOL)) + "").add("          enabled-algorithms = " + ((String) Arrays.stream(((String) configuration.get(SecurityOptions.SSL_ALGORITHMS)).split(",")).collect(Collectors.joining(",", "[", "]"))) + "").add("          random-number-generator = \"\"").add("          require-mutual-authentication = on").add("          cert-fingerprints = " + (str6 != null ? (String) Arrays.stream(str6.split(",")).collect(Collectors.joining("\",\"", "[\"", "\"]")) : "[]") + "").add("        }").add("      }").add("    }").add("  }").add("}");
    }

    private static Config addRemoteForkJoinExecutorConfig(ConfigBuilder configBuilder, RpcSystem.ForkJoinExecutorConfiguration forkJoinExecutorConfiguration) {
        return configBuilder.add("pekko {").add("  remote {").add("    default-remote-dispatcher {").add("      executor = fork-join-executor").add("      fork-join-executor {").add("          parallelism-factor = " + forkJoinExecutorConfiguration.getParallelismFactor()).add("          parallelism-min = " + forkJoinExecutorConfiguration.getMinParallelism()).add("          parallelism-max = " + forkJoinExecutorConfiguration.getMaxParallelism()).add("      }").add("    }").add("  }").add("}").build();
    }

    public static ActorSystem createLocalActorSystem(Configuration configuration) {
        return createActorSystem(getConfig(configuration, null));
    }

    private static ActorSystem createActorSystem(Config config) {
        return createActorSystem(getFlinkActorSystemName(), config);
    }

    public static ActorSystem createActorSystem(String str, Config config) {
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        return RobustActorSystem.create(str, config);
    }

    @VisibleForTesting
    public static ActorSystem createDefaultActorSystem() {
        return createActorSystem(getDefaultConfig());
    }

    private static Config getDefaultConfig() {
        return getConfig(new Configuration(), new HostAndPort("", 0));
    }

    public static Config getConfig(Configuration configuration, @Nullable HostAndPort hostAndPort) {
        return getConfig(configuration, hostAndPort, null, getForkJoinExecutorConfig(ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(configuration)));
    }

    public static Config getConfig(Configuration configuration, @Nullable HostAndPort hostAndPort, @Nullable HostAndPort hostAndPort2, Config config) {
        Config withFallback = getBasicConfig(configuration).withFallback((ConfigMergeable) config);
        return hostAndPort != null ? hostAndPort2 != null ? getRemoteConfig(configuration, hostAndPort2.getHost(), hostAndPort2.getPort(), hostAndPort.getHost(), hostAndPort.getPort()).withFallback((ConfigMergeable) withFallback) : getRemoteConfig(configuration, NetUtils.getWildcardIPAddress(), hostAndPort.getPort(), hostAndPort.getHost(), hostAndPort.getPort()).withFallback((ConfigMergeable) withFallback) : withFallback;
    }

    public static Address getAddress(ActorSystem actorSystem) {
        return RemoteAddressExtension.INSTANCE.apply(actorSystem).getAddress();
    }

    public static String getRpcURL(ActorSystem actorSystem, ActorRef actorRef) {
        return actorRef.path().toStringWithAddress(getAddress(actorSystem));
    }

    public static Address getAddressFromRpcURL(String str) throws MalformedURLException {
        return AddressFromURIString.apply(str);
    }

    public static InetSocketAddress getInetSocketAddressFromRpcURL(String str) throws Exception {
        try {
            Address addressFromRpcURL = getAddressFromRpcURL(str);
            if (addressFromRpcURL.host().isDefined() && addressFromRpcURL.port().isDefined()) {
                return new InetSocketAddress(addressFromRpcURL.host().get(), ((Integer) addressFromRpcURL.port().get()).intValue());
            }
            throw new MalformedURLException();
        } catch (MalformedURLException e) {
            throw new Exception("Could not retrieve InetSocketAddress from Pekko URL " + str);
        }
    }

    public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) {
        return ScalaFutureUtils.toJava(actorSystem.terminate()).thenAccept(FunctionUtils.ignoreFn());
    }

    private static String booleanToOnOrOff(boolean z) {
        return z ? "on" : "off";
    }
}
