Skip to content

Commit

Permalink
unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
charlenelyu-db committed Sep 1, 2023
1 parent 7b8c8f5 commit d49ed87
Showing 1 changed file with 73 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
(Map("id1" -> "url1", "id2" -> "url2"), None)
})
_ => {
(Map("id1" -> "url1", "id2" -> "url2"), None, None)
},
refreshToken = None)
assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"),
"id1")._1 == "url1")
assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"),
Expand All @@ -60,9 +61,10 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
(Map("id1" -> "url3", "id2" -> "url4"), None)
})
_ => {
(Map("id1" -> "url3", "id2" -> "url4"), None, None)
},
refreshToken = None)
// We should get the new urls eventually
eventually(timeout(10.seconds)) {
assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path2"),
Expand All @@ -76,9 +78,10 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(new AnyRef)),
provider,
() => {
(Map("id1" -> "url3", "id2" -> "url4"), None)
})
_ => {
(Map("id1" -> "url3", "id2" -> "url4"), None, None)
},
refreshToken = None)
// We should remove the cached table eventually
eventually(timeout(10.seconds)) {
System.gc()
Expand All @@ -93,10 +96,11 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
(Map("id1" -> "url3", "id2" -> "url4"), None)
_ => {
(Map("id1" -> "url3", "id2" -> "url4"), None, None)
},
-1
-1,
refreshToken = None
)
// We should get new urls immediately because it's refreshed upon register
assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path4"),
Expand Down Expand Up @@ -124,14 +128,16 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
_ => {
refreshTime += 1
(
Map("id1" -> ("url" + refreshTime.toString), "id2" -> "url4"),
Some(System.currentTimeMillis() + 1900)
Some(System.currentTimeMillis() + 1900),
None
)
},
System.currentTimeMillis() + 1900
System.currentTimeMillis() + 1900,
None
)
// We should refresh at least 5 times within 10 seconds based on
// (System.currentTimeMillis() + 1900).
Expand All @@ -148,14 +154,16 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
_ => {
refreshTime2 += 1
(
Map("id1" -> ("url" + refreshTime2.toString), "id2" -> "url4"),
Some(System.currentTimeMillis() + 4900)
Some(System.currentTimeMillis() + 4900),
None
)
},
System.currentTimeMillis() + 4900
System.currentTimeMillis() + 4900,
None
)
// We should refresh 2 times within 10 seconds based on (System.currentTimeMillis() + 4900).
eventually(timeout(10.seconds)) {
Expand All @@ -171,14 +179,16 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
_ => {
refreshTime3 += 1
(
Map("id1" -> ("url" + refreshTime3.toString), "id2" -> "url4"),
Some(System.currentTimeMillis() - 4900)
Some(System.currentTimeMillis() - 4900),
None
)
},
System.currentTimeMillis() + 6000
System.currentTimeMillis() + 6000,
None
)
// We should refresh 1 times within 10 seconds based on (preSignedUrlExpirationMs = 6000).
try {
Expand All @@ -197,6 +207,44 @@ class CachedTableManagerSuite extends SparkFunSuite {
}
}

test("refresh using refresh token") {
val manager = new CachedTableManager(
preSignedUrlExpirationMs = 10,
refreshCheckIntervalMs = 10,
refreshThresholdMs = 10,
expireAfterAccessMs = 60000
)
try {
val ref = new AnyRef
val provider = new TestDeltaSharingProfileProvider
manager.register(
"test-table-path",
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
refreshToken => {
if (refreshToken.contains("refresh-token-1")) {
(Map("id1" -> "url3", "id2" -> "url4"), None, Some("refresh-token-2"))
} else if (refreshToken.contains("refresh-token-2")) {
(Map("id1" -> "url5", "id2" -> "url6"), None, Some("refresh-token-2"))
} else {
fail("Expecting to refresh with a refresh token")
}
},
refreshToken = Some("refresh-token-1")
)
// We should get url5 and url6 eventually.
eventually(timeout(10.seconds)) {
assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"),
"id1")._1 == "url5")
assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"),
"id2")._1 == "url6")
}
} finally {
manager.stop()
}
}

test("expireAfterAccessMs") {
val manager = new CachedTableManager(
preSignedUrlExpirationMs = 10,
Expand All @@ -213,9 +261,10 @@ class CachedTableManagerSuite extends SparkFunSuite {
Map("id1" -> "url1", "id2" -> "url2"),
Seq(new WeakReference(ref)),
provider,
() => {
(Map("id1" -> "url1", "id2" -> "url2"), None)
})
_ => {
(Map("id1" -> "url1", "id2" -> "url2"), None, None)
},
refreshToken = None)
Thread.sleep(1000)
// We should remove the cached table when it's not accessed
intercept[IllegalStateException](manager.getPreSignedUrl(
Expand Down

0 comments on commit d49ed87

Please sign in to comment.