/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.security.token.hadoop;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.time.Clock;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.hadoop.HadoopUserUtils;
import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DelegationTokenProvider} registered under the service name {@code "hadoopfs"} that
 * obtains delegation tokens from Hadoop {@link FileSystem} instances.
 *
 * <p>The file systems consulted are the Hadoop default file system, every path listed under
 * {@link SecurityOptions#KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS} and, when the deployment target
 * contains "yarn", the configured {@code yarn.staging-directory}.
 */
@Internal
public class HadoopFSDelegationTokenProvider
implements DelegationTokenProvider {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);

    /** Flink configuration handed to {@link #init(Configuration)}. */
    private Configuration flinkConfiguration;

    /** Hadoop configuration; stays {@code null} when Hadoop classes could not be loaded. */
    private org.apache.hadoop.conf.Configuration hadoopConfiguration;

    /** Kerberos login helper; stays {@code null} when Hadoop classes could not be loaded. */
    private KerberosLoginProvider kerberosLoginProvider;

    /**
     * Cached token renewal interval. {@code null} means "not computed yet"; it is filled in by the
     * first call to {@link #obtainDelegationTokens()} and reused afterwards.
     */
    private Optional<Long> tokenRenewalInterval;

    /** Returns the fixed service name of this provider, {@code "hadoopfs"}. */
    @Override
    public String serviceName() {
        return "hadoopfs";
    }

    /**
     * Stores the Flink configuration and builds the Hadoop configuration and the Kerberos login
     * provider from it.
     *
     * <p>A {@link NoClassDefFoundError} raised while building either object is logged and
     * swallowed. Both fields are assigned only after both objects were created successfully, so
     * the provider is never left half-initialised.
     *
     * @param configuration the Flink configuration to read from
     * @throws Exception propagated from the helpers used to build the Hadoop objects
     */
    @Override
    public void init(Configuration configuration) throws Exception {
        flinkConfiguration = configuration;
        try {
            org.apache.hadoop.conf.Configuration loadedHadoopConfiguration =
                    HadoopUtils.getHadoopConfiguration(configuration);
            KerberosLoginProvider loadedLoginProvider = new KerberosLoginProvider(configuration);
            // Publish both only once both exist; a failure in the second step must not leave a
            // non-null Hadoop configuration paired with a null login provider.
            hadoopConfiguration = loadedHadoopConfiguration;
            kerberosLoginProvider = loadedLoginProvider;
        }
        catch (NoClassDefFoundError e) {
            LOG.info("Hadoop FS is not available (not packaged with this application): {} : \"{}\".", e.getClass().getSimpleName(), e.getMessage());
        }
    }

    /**
     * Tells whether delegation tokens have to be obtained.
     *
     * @return {@code false} when the Hadoop objects were not initialised; otherwise {@code true}
     *     only if Kerberos security is enabled for the current user and the Kerberos login
     *     provider reports that a login is possible
     * @throws Exception propagated from the Hadoop/Kerberos helpers
     */
    @Override
    public boolean delegationTokensRequired() throws Exception {
        if (hadoopConfiguration == null || kerberosLoginProvider == null) {
            LOG.debug("Hadoop FS is not available (not packaged with this application), hence no tokens will be acquired.");
            return false;
        }
        return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())
                && kerberosLoginProvider.isLoginPossible(false);
    }

    /**
     * Logs in through the Kerberos login provider and, acting as the returned user, obtains
     * delegation tokens for all file systems to access.
     *
     * <p>The renewal interval is computed on the first call and cached; the earliest renewal date
     * derived from it is returned together with the serialized credentials.
     *
     * @return the serialized credentials and the optional time until which they are valid
     * @throws Exception if the login or the token retrieval fails
     */
    @Override
    public DelegationTokenProvider.ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
        UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI();
        // doAs is overloaded for PrivilegedAction and PrivilegedExceptionAction; a bare lambda is
        // ambiguous between the two, so the target type has to be spelled out.
        return freshUGI.doAs((PrivilegedExceptionAction<DelegationTokenProvider.ObtainedDelegationTokens>) () -> {
            Credentials credentials = new Credentials();
            Clock clock = Clock.systemDefaultZone();
            Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();
            obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials);
            // Compute the renewal interval only once and reuse it on later invocations.
            if (tokenRenewalInterval == null) {
                tokenRenewalInterval = getTokenRenewalInterval(clock, fileSystemsToAccess);
            }
            Optional<Long> validUntil = tokenRenewalInterval.flatMap(
                    interval -> getTokenRenewalDate(clock, credentials, interval));
            return new DelegationTokenProvider.ObtainedDelegationTokens(
                    HadoopDelegationTokenConverter.serialize(credentials), validUntil);
        });
    }

    /**
     * Reads the renewer from the Flink configuration key
     * {@code security.kerberos.token.provider.<serviceName>.renewer}.
     *
     * @return the configured renewer, or {@code null} when the key is not set
     */
    @Nullable
    @VisibleForTesting
    String getRenewer() {
        return flinkConfiguration.getString(String.format("security.kerberos.token.provider.%s.renewer", serviceName()), null);
    }

    /**
     * Collects the file systems for which delegation tokens are obtained: the Hadoop default file
     * system, the file system of every configured additional path and, on a YARN target, the file
     * system of the staging directory if one is configured.
     *
     * @return the set of file systems to access
     * @throws IOException if the default or the staging directory file system cannot be resolved
     * @throws FlinkRuntimeException if the file system of a configured path cannot be resolved
     */
    private Set<FileSystem> getFileSystemsToAccess() throws IOException {
        Set<FileSystem> result = new HashSet<>();

        FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration);
        LOG.debug("Adding Hadoop default filesystem to file systems to access {}", defaultFileSystem);
        result.add(defaultFileSystem);
        LOG.debug("Hadoop default filesystem added to file systems to access successfully");

        ConfigUtils.decodeListFromConfig(flinkConfiguration, SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS, Path::new).forEach(path -> {
            try {
                LOG.debug("Adding path's filesystem to file systems to access {}", path);
                result.add(path.getFileSystem(hadoopConfiguration));
                LOG.debug("Path's filesystem added to file systems to access successfully");
            }
            catch (IOException e) {
                LOG.error("Failed to get filesystem for {}", path, e);
                throw new FlinkRuntimeException(e);
            }
        });

        // Locale.ROOT keeps the target check independent of the JVM's default locale.
        if (flinkConfiguration.get(DeploymentOptions.TARGET, "").toLowerCase(Locale.ROOT).contains("yarn")) {
            LOG.debug("Running on YARN, trying to add staging directory to file systems to access");
            String yarnStagingDirectory = flinkConfiguration.getString("yarn.staging-directory", "");
            if (!StringUtils.isBlank(yarnStagingDirectory)) {
                LOG.debug("Adding staging directory to file systems to access {}", yarnStagingDirectory);
                result.add(new Path(yarnStagingDirectory).getFileSystem(hadoopConfiguration));
                LOG.debug("Staging directory added to file systems to access successfully");
            } else {
                LOG.debug("Staging directory is not set or empty so not added to file systems to access");
            }
        }
        return result;
    }

    /**
     * Asks every given file system to add its delegation tokens to {@code credentials}.
     *
     * @param renewer the renewer passed to each file system, may be {@code null}
     * @param fileSystemsToAccess the file systems to obtain tokens from
     * @param credentials the credentials the tokens are added to
     * @throws FlinkRuntimeException wrapping any exception raised while obtaining a token
     */
    protected void obtainDelegationTokens(@Nullable String renewer, Set<FileSystem> fileSystemsToAccess, Credentials credentials) {
        fileSystemsToAccess.forEach(fs -> {
            try {
                LOG.debug("Obtaining delegation token for {} with renewer {}", fs, renewer);
                fs.addDelegationTokens(renewer, credentials);
                LOG.debug("Delegation obtained successfully");
            }
            catch (Exception e) {
                LOG.error("Failed to obtain delegation token for {}", fs, e);
                throw new FlinkRuntimeException(e);
            }
        });
    }

    /**
     * Determines the token renewal interval.
     *
     * <p>A separate set of tokens is obtained with the current user's name as renewer. Every token
     * whose identifier is an {@link AbstractDelegationTokenIdentifier} is renewed, and the interval
     * is the new expiration minus the token's issue date. The smallest interval wins.
     *
     * @param clock the clock handed to {@link HadoopUserUtils#getIssueDate}
     * @param fileSystemsToAccess the file systems to obtain the probe tokens from
     * @return the smallest renewal interval, or empty when no matching token was obtained
     * @throws IOException if the current user cannot be determined
     * @throws FlinkRuntimeException if decoding or renewing a token fails
     */
    @VisibleForTesting
    Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> fileSystemsToAccess) throws IOException {
        String renewer = UserGroupInformation.getCurrentUser().getUserName();
        Credentials credentials = new Credentials();
        obtainDelegationTokens(renewer, fileSystemsToAccess, credentials);
        Optional<Long> result = credentials.getAllTokens().stream().filter(t -> {
            try {
                return t.decodeIdentifier() instanceof AbstractDelegationTokenIdentifier;
            }
            catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        }).map(t -> {
            try {
                long newExpiration = t.renew(hadoopConfiguration);
                AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) t.decodeIdentifier();
                String tokenKind = t.getKind().toString();
                long interval = newExpiration - HadoopUserUtils.getIssueDate(clock, tokenKind, identifier);
                LOG.debug("Renewal interval is {} for token {}", interval, tokenKind);
                return interval;
            }
            catch (Exception e) {
                throw new FlinkRuntimeException(e);
            }
        }).min(Long::compare);
        LOG.debug("Global renewal interval is {}", result);
        return result;
    }

    /**
     * Computes the earliest renewal date over all tokens in {@code credentials} whose identifier
     * is an {@link AbstractDelegationTokenIdentifier}; a token's renewal date is its issue date
     * plus {@code renewalInterval}.
     *
     * @param clock the clock handed to {@link HadoopUserUtils#getIssueDate}
     * @param credentials the credentials holding the tokens to inspect
     * @param renewalInterval the renewal interval to add to each issue date
     * @return the earliest renewal date, or empty when the interval is negative or no matching
     *     token exists
     * @throws FlinkRuntimeException if decoding a token identifier fails
     */
    @VisibleForTesting
    Optional<Long> getTokenRenewalDate(Clock clock, Credentials credentials, long renewalInterval) {
        if (renewalInterval < 0L) {
            LOG.debug("Negative renewal interval so no renewal date is calculated");
            return Optional.empty();
        }
        Optional<Long> result = credentials.getAllTokens().stream().filter(t -> {
            try {
                return t.decodeIdentifier() instanceof AbstractDelegationTokenIdentifier;
            }
            catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        }).map(t -> {
            try {
                AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) t.decodeIdentifier();
                String tokenKind = t.getKind().toString();
                long date = HadoopUserUtils.getIssueDate(clock, tokenKind, identifier) + renewalInterval;
                LOG.debug("Renewal date is {} for token {}", date, tokenKind);
                return date;
            }
            catch (Exception e) {
                throw new FlinkRuntimeException(e);
            }
        }).min(Long::compare);
        LOG.debug("Global renewal date is {}", result);
        return result;
    }
}

