Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-44860][SQL] Add SESSION_USER function #42549

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ datetimeUnit
;

primaryExpression
: name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER) #currentLike
: name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER) #currentLike
| name=(TIMESTAMPADD | DATEADD | DATE_ADD) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA unitsAmount=valueExpression COMMA timestamp=valueExpression RIGHT_PAREN #timestampadd
| name=(TIMESTAMPDIFF | DATEDIFF | DATE_DIFF) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA startTimestamp=valueExpression COMMA endTimestamp=valueExpression RIGHT_PAREN #timestampdiff
| CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ trait ColumnResolutionHelper extends Logging {
}
}

// support CURRENT_DATE, CURRENT_TIMESTAMP, and grouping__id
// support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_USER, SESSION_USER and grouping__id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_USER, SESSION_USER and grouping__id
// support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_USER, USER, SESSION_USER and grouping__id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Table of functions handled by name. Each entry is a triple:
//   1. the name as a String (an expression's prettyName, or a fixed literal),
//   2. a zero-argument factory that builds a fresh expression instance,
//   3. a function mapping that expression to a String (toPrettySQL for most entries,
//      the fixed hive grouping-id name for the last one).
// NOTE(review): presumably consulted when a bare identifier such as `current_user`
// is resolved as a column; the lookup code is outside this excerpt, so confirm.
private val literalFunctions: Seq[(String, () => Expression, Expression => String)] = Seq(
(CurrentDate().prettyName, () => CurrentDate(), toPrettySQL(_)),
(CurrentTimestamp().prettyName, () => CurrentTimestamp(), toPrettySQL(_)),
(CurrentUser().prettyName, () => CurrentUser(), toPrettySQL),
// "user" is listed under its own name but builds the same CurrentUser expression.
("user", () => CurrentUser(), toPrettySQL),
// SessionUser is registered under its own prettyName.
(SessionUser().prettyName, () => SessionUser(), toPrettySQL),
(VirtualColumn.hiveGroupingIdName, () => GroupingID(Nil), _ => VirtualColumn.hiveGroupingIdName)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ object FunctionRegistry {
expression[CurrentCatalog]("current_catalog"),
expression[CurrentUser]("current_user"),
expression[CurrentUser]("user", setAlias = true),
expression[SessionUser]("session_user"),
expression[CallMethodViaReflection]("reflect"),
expression[CallMethodViaReflection]("java_method", true),
expression[SparkVersion]("version"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,24 @@ case class CurrentUser() extends LeafExpression with Unevaluable {
final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """_FUNC_() - connected user name.""",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both this comment and the existing comment for current_user on L289 above could use some more information, especially now that we have both current_user and session_user as separate functions. Even if this PR implements both functions as the same implementation, could we at least (1) dig into how the current_user function is currently implemented and describe all the cases there, and (2) mention for this session_user function that this should always refer to the identity of the invoker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting to an alias, since the session_user change is not relevant for non-external functions.

examples = """
Examples:
> SELECT _FUNC_();
mockingjay
""",
since = "3.5.0",
group = "misc_funcs")
// scalastyle:on line.size.limit
case class SessionUser() extends LeafExpression with Unevaluable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not register it as an alias of CurrentUser?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thank you!

override def nullable: Boolean = false
override def dataType: DataType = StringType
override def prettyName: String = "session_user"
final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)
}

/**
* A function that encrypts input using AES. Key lengths of 128, 192 or 256 bits can be used.
* For versions prior to JDK 8u161, 192 and 256 bits keys can be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[Logic
Literal.create(currentCatalog, StringType)
case CurrentUser() =>
Literal.create(currentUser, StringType)
case SessionUser() =>
Literal.create(currentUser, StringType)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2101,6 +2101,8 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
CurrentTimestamp()
case SqlBaseParser.CURRENT_USER | SqlBaseParser.USER =>
CurrentUser()
case SqlBaseParser.SESSION_USER =>
SessionUser()
}
} else {
// If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5878,11 +5878,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.selectExpr("CURRENT_SCHEMA()"), df.select(current_schema()))
}

test("function current_user, user") {
test("function current_user, user, session_user") {
val df = Seq((1, 2), (3, 1)).toDF("a", "b")

checkAnswer(df.selectExpr("CURRENT_USER()"), df.select(current_user()))
checkAnswer(df.selectExpr("USER()"), df.select(user()))
checkAnswer(df.selectExpr("SESSION_USER()"), df.select(session_user()))
}

test("named_struct function") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,16 @@ class MiscFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.selectExpr("version()"), df.select(version()))
}

test("SPARK-21957: get current_user in normal spark apps") {
test("SPARK-21957, SPARK-44860: get current_user, session_user in normal spark apps") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also update the test "SPARK-21957: get current_user through thrift server"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

val user = spark.sparkContext.sparkUser
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
val df = sql("select current_user(), current_user, user, user()")
checkAnswer(df, Row(user, user, user, user))
val df =
sql("select current_user(), current_user, user, user(), session_user(), session_user")
checkAnswer(df, Row(user, user, user, user, user, user))
}
withSQLConf(SQLConf.ANSI_ENABLED.key -> "true",
SQLConf.ENFORCE_RESERVED_KEYWORDS.key -> "true") {
Seq("user", "current_user").foreach { func =>
Seq("user", "current_user", "session_user").foreach { func =>
checkAnswer(sql(s"select $func"), Row(user))
checkError(
exception = intercept[ParseException](sql(s"select $func()")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.ErrorMessageFormat.MINIMAL
import org.apache.spark.SparkThrowableHelper.getMessage
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, CurrentUser, Literal}
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, CurrentUser, Literal, SessionUser}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.HiveResult.hiveResultString
Expand Down Expand Up @@ -81,6 +81,9 @@ trait SQLQueryTestHelper extends Logging {
case expr: CurrentUser =>
deterministic = false
expr
case expr: SessionUser =>
deterministic = false
expr
case expr: Literal if expr.dataType == DateType || expr.dataType == TimestampType =>
deterministic = false
expr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
// Throws an error
"org.apache.spark.sql.catalyst.expressions.RaiseError",
classOf[CurrentUser].getName,
classOf[SessionUser].getName,
// The encrypt expression includes a random initialization vector to its encrypted result
classOf[AesEncrypt].getName)

Expand Down