Skip to content

Commit

Permalink
[SPARK-49539][SS] Update internal col families start identifier to a …
Browse files Browse the repository at this point in the history
…different one

### What changes were proposed in this pull request?
Update the reserved start character that identifies internal column families from `_` to `$`.

### Why are the changes needed?
Some users might choose to use `_` as the first character of their state variable names or arguments. Hence we change the start identifier for internal column families to the `$` character. Scala [lexical identifiers](https://www.scala-lang.org/files/archive/spec/2.13/01-lexical-syntax.html) follow the same rule, and Python also doesn't allow [variables](https://gawron.sdsu.edu/python_for_ss/course_core/book_draft/Python_introduction/variables.html) to start with this character.

### Does this PR introduce _any_ user-facing change?
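Yes. Column family names that start with `_` are no longer rejected as reserved for non-internal column families; names that start with `$` are now rejected instead.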

### How was this patch tested?
Modified unit tests

```
[info] Run completed in 3 seconds, 442 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48030 from anishshri-db/task/SPARK-49539.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Sep 10, 2024
1 parent 8732528 commit 3633cfb
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class SingleKeyTTLStateImpl(

import org.apache.spark.sql.execution.streaming.StateTTLSchema._

private val ttlColumnFamilyName = s"_ttl_$stateName"
private val ttlColumnFamilyName = "$ttl_" + stateName
private val keySchema = getSingleKeyTTLRowSchema(keyExprEnc.schema)
private val keyTTLRowEncoder = new SingleKeyTTLEncoder(keyExprEnc)

Expand Down Expand Up @@ -205,7 +205,7 @@ abstract class CompositeKeyTTLStateImpl[K](

import org.apache.spark.sql.execution.streaming.StateTTLSchema._

private val ttlColumnFamilyName = s"_ttl_$stateName"
private val ttlColumnFamilyName = "$ttl_" + stateName
private val keySchema = getCompositeKeyTTLRowSchema(
keyExprEnc.schema, userKeyEncoder.schema
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.apache.spark.util.NextIterator
* Singleton utils class used primarily while interacting with TimerState
*/
object TimerStateUtils {
val PROC_TIMERS_STATE_NAME = "_procTimers"
val EVENT_TIMERS_STATE_NAME = "_eventTimers"
val PROC_TIMERS_STATE_NAME = "$procTimers"
val EVENT_TIMERS_STATE_NAME = "$eventTimers"
val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ private[sql] class RocksDBStateStoreProvider
}

// if the column family is not internal and uses reserved characters, throw an exception
if (!isInternal && colFamilyName.charAt(0) == '_') {
if (!isInternal && colFamilyName.charAt(0) == '$') {
throw StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
newStoreProvider(useColumnFamilies = colFamiliesEnabled)) { provider =>
val store = provider.getStore(0)

Seq("_internal", "_test", "_test123", "__12345").foreach { colFamilyName =>
Seq("$internal", "$test", "$test123", "$_12345", "$$$235").foreach { colFamilyName =>
val ex = intercept[SparkUnsupportedOperationException] {
store.createColFamilyIfAbsent(colFamilyName,
keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema))
Expand All @@ -985,7 +985,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
parameters = Map(
"colFamilyName" -> colFamilyName
),
matchPVals = true
matchPVals = false
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())

val cfName = "_testState"
val cfName = "$testState"
val ex = intercept[SparkUnsupportedOperationException] {
handle.getValueState[Long](cfName, Encoders.scalaLong)
}
Expand All @@ -176,7 +176,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
parameters = Map(
"colFamilyName" -> cfName
),
matchPVals = true
matchPVals = false
)
}
}
Expand Down

0 comments on commit 3633cfb

Please sign in to comment.