Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for virtual thread support #1436

Merged
merged 6 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import com.typesafe.config.ConfigFactory

import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }

object VirtualThreadPoolDispatcherSpec {
val config = ConfigFactory.parseString("""
|virtual-thread-dispatcher {
| executor = virtual-thread-executor
|}
""".stripMargin)

class InnocentActor extends Actor {

override def receive = {
case "ping" =>
sender() ! "All fine"
}
}

}

class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
import VirtualThreadPoolDispatcherSpec._

val Iterations = 1000

"VirtualThreadPool support" must {

"handle simple dispatch" in {
val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher"))
innocentActor ! "ping"
expectMsg("All fine")
}

}
}
14 changes: 14 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ pekko {
# Valid options:
# - "default-executor" requires a "default-executor" section
# - "fork-join-executor" requires a "fork-join-executor" section
# - "virtual-thread-executor" requires a "virtual-thread-executor" section
# - "thread-pool-executor" requires a "thread-pool-executor" section
# - "affinity-pool-executor" requires an "affinity-pool-executor" section
# - A FQCN of a class extending ExecutorServiceConfigurator
Expand Down Expand Up @@ -539,6 +540,19 @@ pekko {
allow-core-timeout = on
}

# This will be used if you have set "executor = "virtual-thread-executor"
# This executor will execute the every task on a new virtual thread.
# Underlying thread pool implementation is java.util.concurrent.ForkJoinPool for JDK <= 22
# If the current runtime does not support virtual thread,
# then the executor configured in "fallback" will be used.
virtual-thread-executor {
#Please set the the underlying pool with system properties below:
#jdk.virtualThreadScheduler.parallelism
#jdk.virtualThreadScheduler.maxPoolSize
#jdk.virtualThreadScheduler.minRunnable
#jdk.unparker.maxPoolSize
fallback = "fork-join-executor"
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 1s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ package org.apache.pekko.dispatch
import java.{ util => ju }
import java.util.concurrent._

import scala.annotation.tailrec
import scala.annotation.{ nowarn, tailrec }
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.control.NonFatal

import scala.annotation.nowarn
import com.typesafe.config.Config

import org.apache.pekko
import pekko.actor._
import pekko.annotation.InternalStableApi
Expand All @@ -33,6 +30,8 @@ import pekko.event.EventStream
import pekko.event.Logging.{ Debug, Error, LogEventException }
import pekko.util.{ unused, Index, Unsafe }

import com.typesafe.config.Config

final case class Envelope private (message: Any, sender: ActorRef) {

def copy(message: Any = message, sender: ActorRef = sender) = {
Expand Down Expand Up @@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
def dispatcher(): MessageDispatcher

def configureExecutor(): ExecutorServiceConfigurator = {
@tailrec
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" =>
new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "virtual-thread-executor" =>
if (VirtualThreadSupport.isSupported) {
new VirtualThreadExecutorConfigurator(config.getConfig("virtual-thread-executor"), prerequisites)
} else {
configurator(config.getString("virtual-thread-executor.fallback"))
}
case "thread-pool-executor" =>
new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case "affinity-pool-executor" =>
Expand Down Expand Up @@ -401,6 +407,32 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
}
}

final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {

override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
import VirtualThreadSupport._
val tf: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
new ThreadFactory {
private val vtFactory = newVirtualThreadFactory(name)

override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
vt.setUncaughtExceptionHandler(exceptionHandler)
contextClassLoader.foreach(vt.setContextClassLoader)
vt
}
}
case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name);
}
new ExecutorServiceFactory {
import VirtualThreadSupport._
override def createExecutorService: ExecutorService = newThreadPerTaskExecutor(tf)
}
}
}

class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import org.apache.pekko.annotation.InternalApi
import org.apache.pekko.util.JavaVersion

import java.lang.invoke.{ MethodHandles, MethodType }
import java.util.concurrent.{ ExecutorService, ThreadFactory }
import scala.util.control.NonFatal

@InternalApi
private[dispatch] object VirtualThreadSupport {
private val lookup = MethodHandles.publicLookup()

/**
* Is virtual thread supported
*/
val isSupported: Boolean = JavaVersion.majorVersion >= 21

/**
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
* virtual thread.
*/
def newVirtualThreadFactory(prefix: String): ThreadFactory = {
require(isSupported, "Virtual thread is not supported.")
try {
val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass))
var builder = ofVirtualMethod.invoke()
val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
// TODO support replace scheduler when we drop Java 8 support
val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory]))
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L)
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
} catch {
case NonFatal(e) =>
// --add-opens java.base/java.lang=ALL-UNNAMED
throw new UnsupportedOperationException("Failed to create virtual thread factory", e)
}
}

def newThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = {
require(threadFactory != null, "threadFactory should not be null.")
try {
val executorsClazz = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors")
val newThreadPerTaskExecutorMethod = lookup.findStatic(
executorsClazz,
"newThreadPerTaskExecutor",
MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory]))
newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService]
} catch {
case NonFatal(e) =>
throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e)
}
}

}
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ lazy val actor = pekkoModule("actor")
.enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9)

lazy val actorTests = pekkoModule("actor-tests")
.configs(Jdk9.TestJdk9)
.dependsOn(testkit % "compile->compile;test->test", actor)
.settings(Dependencies.actorTests)
.enablePlugins(NoPublish)
.enablePlugins(NoPublish, Jdk9)
.disablePlugins(MimaPlugin)

lazy val pekkoScalaNightly = pekkoModule("scala-nightly")
Expand Down
Loading