-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add virtual thread support #1299
Conversation
40af8d8
to
a25b239
Compare
actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
Outdated
Show resolved
Hide resolved
actor/src/main/scala/org/apache/pekko/dispatch/NewVirtualThreadPerTaskExecutor.scala
Outdated
Show resolved
Hide resolved
actor/src/main/scala/org/apache/pekko/dispatch/NewVirtualThreadPerTaskExecutor.scala
Outdated
Show resolved
Hide resolved
This is probably useful but I would appreciate if we don't merge it until after 1.1.0-M1 is released. I think there are some changes that we need in the implementation and we have to work out how to test it. I don't think this dispatcher should be the default, even if you use JDK 21+. Users should only be able to opt in to using it. Dispatchers are pluggable so users can create their own dispatcher for this while they are waiting for us to release one. |
Yes, I would like to make it in 1.1.0 M2 or M3 , Need more time to polish it. |
actor/src/main/scala/org/apache/pekko/dispatch/NewVirtualThreadPerTaskExecutor.scala
Outdated
Show resolved
Hide resolved
actor/src/main/scala/org/apache/pekko/dispatch/NewVirtualThreadPerTaskExecutor.scala
Outdated
Show resolved
Hide resolved
@pjfanning @alexandru @Roiocam @alexandru What do you think about the current way to support virtual thread in pekko? |
var builder = ofVirtualMethod.invoke() | ||
val nameMethod = lookup.findVirtual(ofVirtualClass, "name", | ||
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long])) | ||
// TODO support replace scheduler when we drop Java 8 support |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (executor != null) {
val clazz = builder.getClass
val privateLookup = MethodHandles.privateLookupIn(
clazz,
lookup
)
val schedulerFieldSetter = privateLookup
.findSetter(clazz, "scheduler", classOf[Executor])
schedulerFieldSetter.invoke(builder, executor)
}
|
||
@InternalApi | ||
private[dispatch] object VirtualThreadSupport { | ||
private val lookup = MethodHandles.lookup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you use public lookup?
# then the executor configured in "fallback" will be used. | ||
virtual-thread-executor { | ||
#Please set the the underlying pool with system properties below: | ||
#jdk.virtualThreadScheduler.parallelism |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can these be supported as configs as opposed to relying on system properties?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but that will need to create a dedicated fork-join pool with a new factory for the CarrierThread
( but which lives in java.internal.misc`... we have that internal but not sure if that's a good idea for an opensource project.
Could you add a basic unit test? |
@He-Pin would you have time to add a test that runs when we build with java 21+? And there is my comment about using MethodHandles publicLookup instead of lookup. Generally, I support adding this. I'm hoping to try to get the final changes merged so that we can press ahead with Pekko 1.1.0 or 1.1.0-M2 release. |
I'm a little busy at work but I would like to address this this weekend night, at day I need go to the apache coc in HangZhou. |
#1436 was merged |
Motivation:
Add virtual thread support
Modification:
ThreadPerTaskExecutor
.Result:
Virtual thread support was added.
Note:
As
Mailbox
extends onForkJoinTask
and when we run onThreadPerTaskExecutor
, it's pointless.