From d49ed8774ac18b2dd0bde49c1f57024e46ecf82e Mon Sep 17 00:00:00 2001 From: Charlene Lyu Date: Fri, 1 Sep 2023 10:29:49 -0700 Subject: [PATCH] unit test --- .../sharing/CachedTableManagerSuite.scala | 97 ++++++++++++++----- 1 file changed, 73 insertions(+), 24 deletions(-) diff --git a/client/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala b/client/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala index 1d2793892..094c8fede 100644 --- a/client/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala +++ b/client/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala @@ -47,9 +47,10 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { - (Map("id1" -> "url1", "id2" -> "url2"), None) - }) + _ => { + (Map("id1" -> "url1", "id2" -> "url2"), None, None) + }, + refreshToken = None) assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"), "id1")._1 == "url1") assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"), @@ -60,9 +61,10 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { - (Map("id1" -> "url3", "id2" -> "url4"), None) - }) + _ => { + (Map("id1" -> "url3", "id2" -> "url4"), None, None) + }, + refreshToken = None) // We should get the new urls eventually eventually(timeout(10.seconds)) { assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path2"), @@ -76,9 +78,10 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(new AnyRef)), provider, - () => { - (Map("id1" -> "url3", "id2" -> "url4"), None) - }) + _ => { + (Map("id1" -> "url3", "id2" -> "url4"), None, None) + }, + refreshToken = None) // We should remove the cached table eventually eventually(timeout(10.seconds)) { System.gc() @@ -93,10 +96,11 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { - (Map("id1" -> "url3", "id2" -> "url4"), None) + _ => { + (Map("id1" -> "url3", "id2" -> "url4"), None, None) }, - -1 + -1, + refreshToken = None ) // We should get new urls immediately because it's refreshed upon register assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path4"), @@ -124,14 +128,16 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { + _ => { refreshTime += 1 ( Map("id1" -> ("url" + refreshTime.toString), "id2" -> "url4"), - Some(System.currentTimeMillis() + 1900) + Some(System.currentTimeMillis() + 1900), + None ) }, - System.currentTimeMillis() + 1900 + System.currentTimeMillis() + 1900, + None ) // We should refresh at least 5 times within 10 seconds based on // (System.currentTimeMillis() + 1900). @@ -148,14 +154,16 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { + _ => { refreshTime2 += 1 ( Map("id1" -> ("url" + refreshTime2.toString), "id2" -> "url4"), - Some(System.currentTimeMillis() + 4900) + Some(System.currentTimeMillis() + 4900), + None ) }, - System.currentTimeMillis() + 4900 + System.currentTimeMillis() + 4900, + None ) // We should refresh 2 times within 10 seconds based on (System.currentTimeMillis() + 4900). eventually(timeout(10.seconds)) { @@ -171,14 +179,16 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { + _ => { refreshTime3 += 1 ( Map("id1" -> ("url" + refreshTime3.toString), "id2" -> "url4"), - Some(System.currentTimeMillis() - 4900) + Some(System.currentTimeMillis() - 4900), + None ) }, - System.currentTimeMillis() + 6000 + System.currentTimeMillis() + 6000, + None ) // We should refresh 1 times within 10 seconds based on (preSignedUrlExpirationMs = 6000). try { @@ -197,6 +207,44 @@ class CachedTableManagerSuite extends SparkFunSuite { } } + test("refresh using refresh token") { + val manager = new CachedTableManager( + preSignedUrlExpirationMs = 10, + refreshCheckIntervalMs = 10, + refreshThresholdMs = 10, + expireAfterAccessMs = 60000 + ) + try { + val ref = new AnyRef + val provider = new TestDeltaSharingProfileProvider + manager.register( + "test-table-path", + Map("id1" -> "url1", "id2" -> "url2"), + Seq(new WeakReference(ref)), + provider, + refreshToken => { + if (refreshToken.contains("refresh-token-1")) { + (Map("id1" -> "url3", "id2" -> "url4"), None, Some("refresh-token-2")) + } else if (refreshToken.contains("refresh-token-2")) { + (Map("id1" -> "url5", "id2" -> "url6"), None, Some("refresh-token-2")) + } else { + fail("Expecting to refresh with a refresh token") + } + }, + refreshToken = Some("refresh-token-1") + ) + // We should get url5 and url6 eventually. + eventually(timeout(10.seconds)) { + assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"), + "id1")._1 == "url5") + assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path"), + "id2")._1 == "url6") + } + } finally { + manager.stop() + } + } + test("expireAfterAccessMs") { val manager = new CachedTableManager( preSignedUrlExpirationMs = 10, @@ -213,9 +261,10 @@ class CachedTableManagerSuite extends SparkFunSuite { Map("id1" -> "url1", "id2" -> "url2"), Seq(new WeakReference(ref)), provider, - () => { - (Map("id1" -> "url1", "id2" -> "url2"), None) - }) + _ => { + (Map("id1" -> "url1", "id2" -> "url2"), None, None) + }, + refreshToken = None) Thread.sleep(1000) // We should remove the cached table when it's not accessed intercept[IllegalStateException](manager.getPreSignedUrl(