From f3fa30115ee374b83867cae9f37e4a2d54fff7bd Mon Sep 17 00:00:00 2001 From: David Rogers Date: Fri, 24 Mar 2023 21:29:27 -0500 Subject: [PATCH 1/3] support synchronous `put` operation requires that cf code pass a `{...,synchronous:true}` param in its config to the plugin --- .../io/cache/redis/NearCacheEntry.java | 15 +++++ .../extension/io/cache/redis/RedisCache.java | 55 +++++++++++++++++-- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java b/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java index c1066c0..b87c587 100644 --- a/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java +++ b/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java @@ -17,12 +17,27 @@ public class NearCacheEntry implements CacheEntry { private byte[] serialized; private long count; + /** + * Some optional callback to be performed immediately after the write to cache is complete. + * Can be null, meaning "so such action". + */ + public final Runnable onComplete; + public NearCacheEntry(byte[] key, Object val, int exp, long count) { this.key = key; this.val = val; this.exp = exp; this.created = System.currentTimeMillis(); this.count = count; + this.onComplete = null; + } + public NearCacheEntry(byte[] key, Object val, int exp, long count, Runnable onComplete) { + this.key = key; + this.val = val; + this.exp = exp; + this.created = System.currentTimeMillis(); + this.count = count; + this.onComplete = onComplete; } @Override diff --git a/source/java/src/lucee/extension/io/cache/redis/RedisCache.java b/source/java/src/lucee/extension/io/cache/redis/RedisCache.java index a0136df..a8ba15a 100644 --- a/source/java/src/lucee/extension/io/cache/redis/RedisCache.java +++ b/source/java/src/lucee/extension/io/cache/redis/RedisCache.java @@ -62,7 +62,7 @@ public class RedisCache extends CacheSupport implements Command { private int port; private String username; - private final boolean async = true; + private boolean async = true; private Storage storage = new Storage(this); @@ -144,6 +144,15 @@ public void init(Config config, Struct arguments) throws IOException { defaultExpire = caster.toIntValue(arguments.get("timeToLiveSeconds", null), 0); databaseIndex = caster.toIntValue(arguments.get("databaseIndex", null), -1); + + // + // "default" behavior is async + // If config's "synchronous" value is missing or is explicitly false, we'll get async=true; + // otherwise, "synchronous" was passed and was cftruthy, and we'll get async=false + // + final boolean synchronous = caster.toBooleanValue(arguments.get("synchronous", null), false); + async = !synchronous; + String logName = caster.toString(arguments.get("log", null), null); if (!Util.isEmpty(logName, true) && config != null) { logName = logName.trim(); @@ -938,10 +947,43 @@ public void doJoin(long count, boolean one) { } } - public void put(byte[] bkey, Object val, int exp, long count) { - entries.add(new NearCacheEntry(bkey, val, exp, count)); - synchronized (tokenAddToNear) { - tokenAddToNear.notifyAll(); + /** + * If the extension is configured for asynchronous behavior (the default) + * then this method can return prior to the write to cache completing. + * In synchronous mode, we block until the write completes. + */ + public void put(byte[] bkey, Object val, int exp, long count) throws IOException { + if (cache.async) { + entries.add(new NearCacheEntry(bkey, val, exp, count)); + synchronized (tokenAddToNear) { + tokenAddToNear.notifyAll(); + } + } + else { + final Object sync = new Object(); + entries.add( + new NearCacheEntry( + bkey, val, exp, count, + new Runnable() { + public void run() { + synchronized(sync) { + sync.notifyAll(); + } + } + } + ) + ); + synchronized (tokenAddToNear) { + synchronized (sync) { + tokenAddToNear.notifyAll(); // wake up redis worker thread + try { + sync.wait(5000/*ms*/); // wait for worker thread to say "ok it's in cache" + } + catch (InterruptedException e) { + throw this.engine.getExceptionUtil().toIOException(e); + } + } + } } } @@ -955,6 +997,9 @@ public void run() { cache.put(entry.getByteKey(), entry.getValue(), entry.getExpires()); synchronized (tokenAddToCache) { tokenAddToCache.notifyAll(); + if (entry.onComplete != null) { + entry.onComplete.run(); + } } } synchronized (tokenAddToNear) { From 4cdba08541ab8c4bf46e66d4be9d8eb6464d7f62 Mon Sep 17 00:00:00 2001 From: David Rogers Date: Fri, 24 Mar 2023 21:49:51 -0500 Subject: [PATCH 2/3] more explicit name for config param --- .../java/src/lucee/extension/io/cache/redis/RedisCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/java/src/lucee/extension/io/cache/redis/RedisCache.java b/source/java/src/lucee/extension/io/cache/redis/RedisCache.java index a8ba15a..d595dbb 100644 --- a/source/java/src/lucee/extension/io/cache/redis/RedisCache.java +++ b/source/java/src/lucee/extension/io/cache/redis/RedisCache.java @@ -147,10 +147,10 @@ public void init(Config config, Struct arguments) throws IOException { // // "default" behavior is async - // If config's "synchronous" value is missing or is explicitly false, we'll get async=true; - // otherwise, "synchronous" was passed and was cftruthy, and we'll get async=false + // If config's "synchronousPut" value is missing or is explicitly false, we'll get async=true; + // otherwise, "synchronousPut" was passed and was cftruthy, and we'll get async=false // - final boolean synchronous = caster.toBooleanValue(arguments.get("synchronous", null), false); + final boolean synchronous = caster.toBooleanValue(arguments.get("synchronousPut", null), false); async = !synchronous; String logName = caster.toString(arguments.get("log", null), null); From db968deecacdbd3db67f1c2a6f7f4faf9ab675f8 Mon Sep 17 00:00:00 2001 From: David Rogers Date: Sat, 25 Mar 2023 08:41:15 -0500 Subject: [PATCH 3/3] comment tidy --- .../java/src/lucee/extension/io/cache/redis/NearCacheEntry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java b/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java index b87c587..f4e0a4a 100644 --- a/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java +++ b/source/java/src/lucee/extension/io/cache/redis/NearCacheEntry.java @@ -19,7 +19,7 @@ public class NearCacheEntry implements CacheEntry { /** * Some optional callback to be performed immediately after the write to cache is complete. - * Can be null, meaning "so such action". + * Can be null, meaning "there is no callback to run". */ public final Runnable onComplete;