diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b2c4d97bc7b07..8b621e82afe28 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -960,14 +960,13 @@ private[spark] class Client( /** * Set up the environment for launching our ApplicationMaster container. */ - private def setupLaunchEnv( + private[yarn] def setupLaunchEnv( stagingDirPath: Path, pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString - env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() env("SPARK_PREFER_IPV6") = Utils.preferIPv6.toString // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* @@ -977,6 +976,10 @@ private[spark] class Client( .map { case (k, v) => (k.substring(amEnvPrefix.length), v) } .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) } + if (!env.contains("SPARK_USER")) { + env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() + } + // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH // of the container processes too. Add all non-.py files directly to PYTHONPATH. 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 78e84690900e1..93d6cc474d20f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -29,6 +29,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.MRJobConfig +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords.{GetNewApplicationResponse, SubmitApplicationRequest} import org.apache.hadoop.yarn.api.records._ @@ -739,6 +740,21 @@ class ClientSuite extends SparkFunSuite } } + test("SPARK-49760: default app master SPARK_USER") { + val sparkConf = new SparkConf() + val client = createClient(sparkConf) + val env = client.setupLaunchEnv(new Path("/staging/dir/path"), Seq()) + env("SPARK_USER") should be (UserGroupInformation.getCurrentUser().getShortUserName()) + } + + test("SPARK-49760: override app master SPARK_USER") { + val sparkConf = new SparkConf() + .set("spark.yarn.appMasterEnv.SPARK_USER", "overrideuser") + val client = createClient(sparkConf) + val env = client.setupLaunchEnv(new Path("/staging/dir/path"), Seq()) + env("SPARK_USER") should be ("overrideuser") + } + private val matching = Seq( ("files URI match test1", "file:///file1", "file:///file2"), ("files URI match test2", "file:///c:file1", "file://c:file2"),