[SPARK-49760][YARN] Correct handling of SPARK_USER env variable override in app master

### What changes were proposed in this pull request?

This patch corrects the handling of a user-supplied `SPARK_USER` environment variable in the YARN app master. Currently, the user-supplied value is appended to the default value as if it were a classpath entry. The patch fixes this by using only the user-supplied value.
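For reference, the core of the fix (shown in full in the diff below) is to set the default only after processing `spark.yarn.appMasterEnv.*`, and only when no override was supplied:

```
if (!env.contains("SPARK_USER")) {
  env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
}
```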

### Why are the changes needed?

Overriding the `SPARK_USER` environment variable in the YARN app master via the configuration property `spark.yarn.appMasterEnv.SPARK_USER` currently results in an incorrect value. `Client#setupLaunchEnv` first sets a default in the environment map using the Hadoop user. After that, `YarnSparkHadoopUtil.addPathToEnvironment` sees the existing value in the map and treats the user-supplied value as something to append, as if it were a classpath entry. The end result is the Hadoop user joined to the user-supplied value by the classpath delimiter, e.g. `cnauroth:overrideuser`.
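The failure mode can be reproduced outside of YARN with a minimal, self-contained sketch of the append semantics (the helper below is a stand-in for `YarnSparkHadoopUtil.addPathToEnvironment`, not the actual Spark code):

```
import java.io.File
import scala.collection.mutable.HashMap

object SparkUserAppendSketch {
  // Stand-in for the append-if-present behavior described above: an existing
  // value is joined to the new one with the path separator.
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
    env(key) = if (env.contains(key)) env(key) + File.pathSeparator + value else value
  }

  def main(args: Array[String]): Unit = {
    val env = new HashMap[String, String]()
    env("SPARK_USER") = "cnauroth" // default from the Hadoop user, set first
    addPathToEnvironment(env, "SPARK_USER", "overrideuser") // user-supplied override
    println(env("SPARK_USER")) // prints "cnauroth:overrideuser" on Linux
  }
}
```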

### Does this PR introduce _any_ user-facing change?

Yes, the app master now uses the user-supplied `SPARK_USER` if specified. (The default is still the Hadoop user.)

### How was this patch tested?

* Existing unit tests pass.
* Added new unit tests covering default and overridden `SPARK_USER` for the app master. The override test fails without this patch, and then passes after the patch is applied.
* Manually tested in a live YARN cluster as shown below.

Manual testing used the `DFSReadWriteTest` job with overrides of `SPARK_USER`:

```
spark-submit \
    --deploy-mode cluster \
    --files all-lines.txt \
    --class org.apache.spark.examples.DFSReadWriteTest \
    --conf spark.yarn.appMasterEnv.SPARK_USER=sparkuser_appMaster \
    --conf spark.driverEnv.SPARK_USER=sparkuser_driver \
    --conf spark.executorEnv.SPARK_USER=sparkuser_executor \
    /usr/lib/spark/examples/jars/spark-examples.jar \
    all-lines.txt /tmp/DFSReadWriteTest
```

Before the patch, the mishandled app master `SPARK_USER` is visible as the owner of the output directory and `_SUCCESS` file in HDFS:

```
hdfs dfs -ls -R /tmp/DFSReadWriteTest

drwxr-xr-x   - cnauroth:sparkuser_appMaster hadoop          0 2024-09-20 23:35 /tmp/DFSReadWriteTest/dfs_read_write_test
-rw-r--r--   1 cnauroth:sparkuser_appMaster hadoop          0 2024-09-20 23:35 /tmp/DFSReadWriteTest/dfs_read_write_test/_SUCCESS
-rw-r--r--   1 sparkuser_executor                      hadoop    2295080 2024-09-20 23:35 /tmp/DFSReadWriteTest/dfs_read_write_test/part-00000
-rw-r--r--   1 sparkuser_executor                      hadoop    2288718 2024-09-20 23:35 /tmp/DFSReadWriteTest/dfs_read_write_test/part-00001
```

After the patch, we can see it working correctly:

```
hdfs dfs -ls -R /tmp/DFSReadWriteTest
drwxr-xr-x   - sparkuser_appMaster hadoop          0 2024-09-23 17:13 /tmp/DFSReadWriteTest/dfs_read_write_test
-rw-r--r--   1 sparkuser_appMaster hadoop          0 2024-09-23 17:13 /tmp/DFSReadWriteTest/dfs_read_write_test/_SUCCESS
-rw-r--r--   1 sparkuser_executor  hadoop    2295080 2024-09-23 17:13 /tmp/DFSReadWriteTest/dfs_read_write_test/part-00000
-rw-r--r--   1 sparkuser_executor  hadoop    2288718 2024-09-23 17:13 /tmp/DFSReadWriteTest/dfs_read_write_test/part-00001
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48214 from cnauroth/SPARK-49760.

Authored-by: Chris Nauroth <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
cnauroth authored and dongjoon-hyun committed Sep 24, 2024
1 parent 742265e commit 35e5d29
Showing 2 changed files with 21 additions and 2 deletions.
```
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -960,14 +960,13 @@ private[spark] class Client(
   /**
    * Set up the environment for launching our ApplicationMaster container.
    */
-  private def setupLaunchEnv(
+  private[yarn] def setupLaunchEnv(
       stagingDirPath: Path,
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
-    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
     env("SPARK_PREFER_IPV6") = Utils.preferIPv6.toString
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
@@ -977,6 +976,10 @@
       .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
       .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
 
+    if (!env.contains("SPARK_USER")) {
+      env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    }
+
     // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
     // of the container processes too. Add all non-.py files directly to PYTHONPATH.
     //
```
```
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -29,6 +29,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.{GetNewApplicationResponse, SubmitApplicationRequest}
 import org.apache.hadoop.yarn.api.records._
@@ -739,6 +740,21 @@ class ClientSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-49760: default app master SPARK_USER") {
+    val sparkConf = new SparkConf()
+    val client = createClient(sparkConf)
+    val env = client.setupLaunchEnv(new Path("/staging/dir/path"), Seq())
+    env("SPARK_USER") should be (UserGroupInformation.getCurrentUser().getShortUserName())
+  }
+
+  test("SPARK-49760: override app master SPARK_USER") {
+    val sparkConf = new SparkConf()
+      .set("spark.yarn.appMasterEnv.SPARK_USER", "overrideuser")
+    val client = createClient(sparkConf)
+    val env = client.setupLaunchEnv(new Path("/staging/dir/path"), Seq())
+    env("SPARK_USER") should be ("overrideuser")
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
```
