From 65a9c90cc6c70dbcf7cbec69168b28bcb36e6cd8 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 27 Apr 2024 17:49:18 +0800 Subject: [PATCH 1/6] feat: Add virtual thread support --- actor/src/main/resources/reference.conf | 14 ++++ .../pekko/dispatch/AbstractDispatcher.scala | 40 +++++++++- .../pekko/dispatch/VirtualThreadSupport.scala | 75 +++++++++++++++++++ 3 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 9459cda1418..7ef70b85606 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -376,6 +376,7 @@ pekko { # Valid options: # - "default-executor" requires a "default-executor" section # - "fork-join-executor" requires a "fork-join-executor" section + # - "virtual-thread-executor" requires a "virtual-thread-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section # - "affinity-pool-executor" requires an "affinity-pool-executor" section # - A FQCN of a class extending ExecutorServiceConfigurator @@ -539,6 +540,19 @@ pekko { allow-core-timeout = on } + # This will be used if you have set "executor = "virtual-thread-executor" + # This executor will execute the every task on a new virtual thread. + # Underlying thread pool implementation is java.util.concurrent.ForkJoinPool for JDK <= 22 + # If the current runtime does not support virtual thread, + # then the executor configured in "fallback" will be used. + virtual-thread-executor { + #Please set the the underlying pool with system properties below: + #jdk.virtualThreadScheduler.parallelism + #jdk.virtualThreadScheduler.maxPoolSize + #jdk.virtualThreadScheduler.minRunnable + #jdk.unparker.maxPoolSize + fallback = "fork-join-executor" + } # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 1s diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index f4fb6a73c48..f163f01960d 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -16,14 +16,11 @@ package org.apache.pekko.dispatch import java.{ util => ju } import java.util.concurrent._ -import scala.annotation.tailrec +import scala.annotation.{ nowarn, tailrec } import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.control.NonFatal -import scala.annotation.nowarn -import com.typesafe.config.Config - import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalStableApi @@ -33,6 +30,8 @@ import pekko.event.EventStream import pekko.event.Logging.{ Debug, Error, LogEventException } import pekko.util.{ unused, Index, Unsafe } +import com.typesafe.config.Config + final case class Envelope private (message: Any, sender: ActorRef) { def copy(message: Any = message, sender: ActorRef = sender) = { @@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: def dispatcher(): MessageDispatcher def configureExecutor(): ExecutorServiceConfigurator = { + @tailrec def configurator(executor: String): ExecutorServiceConfigurator = executor match { case null | "" | "fork-join-executor" => new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case "virtual-thread-executor" => + if (VirtualThreadSupport.isSupported) { + new VirtualThreadExecutorConfigurator(config.getConfig("virtual-thread-executor"), prerequisites) + } else { + configurator(config.getString("virtual-thread-executor.fallback")) + } case "thread-pool-executor" => new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case "affinity-pool-executor" => @@ -401,6 +407,32 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: } } +final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ExecutorServiceConfigurator(config, prerequisites) { + + override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + import VirtualThreadSupport._ + val tf: ThreadFactory = threadFactory match { + case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => + new ThreadFactory { + private val vtFactory = newVirtualThreadFactory(name) + + override def newThread(r: Runnable): Thread = { + val vt = vtFactory.newThread(r) + vt.setUncaughtExceptionHandler(exceptionHandler) + contextClassLoader.foreach(vt.setContextClassLoader) + vt + } + } + case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name); + } + new ExecutorServiceFactory { + import VirtualThreadSupport._ + override def createExecutorService: ExecutorService = newThreadPerTaskExecutor(tf) + } + } +} + class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala new file mode 100644 index 00000000000..ae9471e59bd --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.util.JavaVersion + +import java.lang.invoke.{ MethodHandles, MethodType } +import java.util.concurrent.{ ExecutorService, ThreadFactory } +import scala.util.control.NonFatal + +@InternalApi +private[dispatch] object VirtualThreadSupport { + private val lookup = MethodHandles.lookup + + /** + * Is virtual thread supported + */ + val isSupported: Boolean = JavaVersion.majorVersion >= 21 + + /** + * Create a virtual thread factory with a executor, the executor will be used as the scheduler of + * virtual thread. + */ + def newVirtualThreadFactory(prefix: String): ThreadFactory = { + require(isSupported, "Virtual thread is not supported.") + try { + val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder") + val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual") + val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass)) + var builder = ofVirtualMethod.invoke() + val nameMethod = lookup.findVirtual(ofVirtualClass, "name", + MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long])) + // TODO support replace scheduler when we drop Java 8 support + val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory])) + builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L) + factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] + } catch { + case NonFatal(e) => + // --add-opens java.base/java.lang=ALL-UNNAMED + throw new UnsupportedOperationException("Failed to create virtual thread factory", e) + } + } + + def newThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = { + require(threadFactory != null, "threadFactory should not be null.") + try { + val executorsClazz = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors") + val newThreadPerTaskExecutorMethod = lookup.findStatic( + executorsClazz, + "newThreadPerTaskExecutor", + MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory])) + newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService] + } catch { + case NonFatal(e) => + throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e) + } + } + +} From 4e2cc31daea42bcee71eaa8a93cfda698bde0a3e Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 8 Aug 2024 20:15:06 +0100 Subject: [PATCH 2/6] use public lookup --- .../scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala index ae9471e59bd..3a777891559 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -26,7 +26,7 @@ import scala.util.control.NonFatal @InternalApi private[dispatch] object VirtualThreadSupport { - private val lookup = MethodHandles.lookup + private val lookup = MethodHandles.publicLookup() /** * Is virtual thread supported From c541ec11440e5cc0bb93642f3e8a5e7f3e5f8d95 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 8 Aug 2024 21:41:44 +0100 Subject: [PATCH 3/6] try to add test --- .../VirtualThreadPoolDispatcherSpec.scala | 62 +++++++++++++++++++ build.sbt | 3 +- 2 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala new file mode 100644 index 00000000000..5af2bd48c86 --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import com.typesafe.config.ConfigFactory + +import org.apache.pekko +import pekko.actor.{ Actor, Props } +import pekko.testkit.PekkoSpec + +object VirtualThreadPoolDispatcherSpec { + val config = ConfigFactory.parseString(""" + |virtual-thread-pool { + | task-dispatcher { + | throughput = 5 + | thread-pool-executor { + | + | } + | } + |} + """.stripMargin) + + class InnocentActor extends Actor { + + override def receive = { + case "ping" => + sender() ! "All fine" + } + } + +} + +class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender { + import VirtualThreadPoolDispatcherSpec._ + + val Iterations = 1000 + + "VirtualThreadPool support" must { + + "handle simple dispatch" in { + val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-pool.task-dispatcher")) + innocentActor ! "ping" + expectMsg("All fine") + } + + } +} diff --git a/build.sbt b/build.sbt index a57f790104d..d93f55ee67c 100644 --- a/build.sbt +++ b/build.sbt @@ -124,9 +124,10 @@ lazy val actor = pekkoModule("actor") .enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9) lazy val actorTests = pekkoModule("actor-tests") + .configs(Jdk9.TestJdk9) .dependsOn(testkit % "compile->compile;test->test", actor) .settings(Dependencies.actorTests) - .enablePlugins(NoPublish) + .enablePlugins(NoPublish, Jdk9) .disablePlugins(MimaPlugin) lazy val pekkoScalaNightly = pekkoModule("scala-nightly") From ec85fdbbdfc58c8c08de37c59243ea16d2a9d3e1 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 8 Aug 2024 22:07:08 +0100 Subject: [PATCH 4/6] Update VirtualThreadPoolDispatcherSpec.scala --- .../apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index 5af2bd48c86..12b73767396 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory import org.apache.pekko import pekko.actor.{ Actor, Props } -import pekko.testkit.PekkoSpec +import pekko.testkit.{ ImplicitSender, PekkoSpec } object VirtualThreadPoolDispatcherSpec { val config = ConfigFactory.parseString(""" From 23a7b8b7eeb4e5d7d978a4e8348f6523c8118f17 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 9 Aug 2024 18:04:48 +0100 Subject: [PATCH 5/6] Update VirtualThreadPoolDispatcherSpec.scala --- .../pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index 12b73767396..fe8da9cb58e 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -27,10 +27,7 @@ object VirtualThreadPoolDispatcherSpec { val config = ConfigFactory.parseString(""" |virtual-thread-pool { | task-dispatcher { - | throughput = 5 - | thread-pool-executor { - | - | } + | executor = virtual-thread-executor | } |} """.stripMargin) From d083bbadb8afdd22c030ad985fa6269d0aaa5acd Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 14 Aug 2024 17:42:46 +0100 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Andy(Jingzhang)Chen --- .../pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index fe8da9cb58e..739bf26a5e3 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -25,10 +25,8 @@ import pekko.testkit.{ ImplicitSender, PekkoSpec } object VirtualThreadPoolDispatcherSpec { val config = ConfigFactory.parseString(""" - |virtual-thread-pool { - | task-dispatcher { - | executor = virtual-thread-executor - | } + |virtual-thread-dispatcher { + | executor = virtual-thread-executor |} """.stripMargin) @@ -50,7 +48,7 @@ class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatc "VirtualThreadPool support" must { "handle simple dispatch" in { - val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-pool.task-dispatcher")) + val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher")) innocentActor ! "ping" expectMsg("All fine") }