From e5d40ebc21b4b5414fe7a5dc60d61471c1065be2 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Wed, 8 May 2024 21:45:19 +0200 Subject: [PATCH] Add cc to the rest of jvm impl --- jvm/src/main/scala/PosixLikeIO/PIO.scala | 2 ++ .../examples/clientAndServerUDP.scala | 6 ++-- .../examples/readAndWriteFile.scala | 2 ++ .../PosixLikeIO/examples/readWholeFile.scala | 2 ++ jvm/src/main/scala/async/DefaultSupport.scala | 2 ++ .../scala/measurements/measureTimes.scala | 28 +++++++++++-------- shared/src/main/scala/async/Async.scala | 2 +- 7 files changed, 29 insertions(+), 15 deletions(-) diff --git a/jvm/src/main/scala/PosixLikeIO/PIO.scala b/jvm/src/main/scala/PosixLikeIO/PIO.scala index c61eca85..b5560eab 100644 --- a/jvm/src/main/scala/PosixLikeIO/PIO.scala +++ b/jvm/src/main/scala/PosixLikeIO/PIO.scala @@ -1,5 +1,7 @@ package PosixLikeIO +import language.experimental.captureChecking + import gears.async.Scheduler import gears.async.default.given import gears.async.{Async, Future} diff --git a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala index a1c30b41..c4a8c5ea 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala @@ -1,5 +1,7 @@ package PosixLikeIO.examples +import language.experimental.captureChecking + import gears.async.AsyncOperations.* import gears.async.default.given import gears.async.{Async, Future} @@ -13,7 +15,7 @@ import PosixLikeIO.{PIOHelper, SocketUDP} @main def clientAndServerUDP(): Unit = given ExecutionContext = ExecutionContext.global - Async.blocking: + Async.blocking: spawn ?=> val server = Future: PIOHelper.withSocketUDP(8134): serverSocket => val got: DatagramPacket = serverSocket.receive().awaitResult.get @@ -22,7 +24,7 @@ import PosixLikeIO.{PIOHelper, SocketUDP} serverSocket.send(ByteBuffer.wrap(responseMessage), got.getAddress.toString.substring(1), got.getPort) sleep(50) - def client(value: Int): Future[Unit] = + def client(value: Int): Future[Unit]^{spawn} = Future: PIOHelper.withSocketUDP(): clientSocket => val data: Array[Byte] = value.toString.getBytes diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala index a2bf39a9..6a43dd57 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala @@ -1,5 +1,7 @@ package PosixLikeIO.examples +import language.experimental.captureChecking + import gears.async.Async import gears.async.default.given diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala index 7811122c..0294e67f 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala @@ -1,5 +1,7 @@ package PosixLikeIO.examples +import language.experimental.captureChecking + import gears.async.Async import gears.async.default.given diff --git a/jvm/src/main/scala/async/DefaultSupport.scala b/jvm/src/main/scala/async/DefaultSupport.scala index 7f12d2d4..7ce0f105 100644 --- a/jvm/src/main/scala/async/DefaultSupport.scala +++ b/jvm/src/main/scala/async/DefaultSupport.scala @@ -1,5 +1,7 @@ package gears.async.default +import language.experimental.captureChecking + import gears.async._ given AsyncOperations = JvmAsyncOperations diff --git a/jvm/src/main/scala/measurements/measureTimes.scala b/jvm/src/main/scala/measurements/measureTimes.scala index 1b3c4e6f..6724c7ae 100644 --- a/jvm/src/main/scala/measurements/measureTimes.scala +++ b/jvm/src/main/scala/measurements/measureTimes.scala @@ -1,5 +1,7 @@ package measurements +import language.experimental.captureChecking + import gears.async.default.given import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel} @@ -65,9 +67,12 @@ def measureIterations[T](action: () => T): Int = val c1: Double = measureIterations: () => Async.blocking: - Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }).await - Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }).await - Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }).await + val r1 = Async.race(Future { Thread.sleep(10) }, Future { Thread.sleep(100) }, Future { Thread.sleep(50) }) + r1.await + val r2 = Async.race(Future { Thread.sleep(50) }, Future { Thread.sleep(10) }, Future { Thread.sleep(100) }) + r2.await + val r3 = Async.race(Future { Thread.sleep(100) }, Future { Thread.sleep(50) }, Future { Thread.sleep(10) }) + r3.await val c2: Double = measureIterations: () => Async.blocking: @@ -95,11 +100,11 @@ def measureIterations[T](action: () => T): Int = println("Non-raced futures per second: " + c2_per_second_adjusted) println("Overhead: " + (c2_per_second_adjusted / c1_per_second_adjusted)) - /* Linux +/* Linux Raced futures awaited per second: 15.590345727332032 Non-raced futures per second: 15.597976831457009 Overhead: 1.0004894762604013 - */ + */ @main def measureRaceOverheadVsJava(): Unit = given ExecutionContext = ExecutionContext.global @@ -148,11 +153,11 @@ def measureIterations[T](action: () => T): Int = println("Java threads awaited per second: " + c2_per_second_adjusted) println("Overhead: " + (c2_per_second_adjusted / c1_per_second_adjusted)) - /* Linux +/* Linux Raced futures awaited per second: 15.411487529449996 Java threads awaited per second: 15.671210243700953 Overhead: 1.0168525402726147 - */ + */ @main def channelsVsJava(): Unit = given ExecutionContext = ExecutionContext.global @@ -305,13 +310,13 @@ def measureIterations[T](action: () => T): Int = Thread.sleep(500) println("ChannelMultiplexer over BufferedChannels sends per second: " + cmOverBufferedSendsPerSecond) - /* Linux +/* Linux Java "channel" sends per second: 8691652 SyncChannel sends per second: 319371.0 BufferedChannel sends per second: 308286.0 ChannelMultiplexer over SyncChannels sends per second: 155737.0 ChannelMultiplexer over BufferedChannels sends per second: 151995.0 - */ + */ /** Warmup for 10 seconds and benchmark for 60 seconds. */ @@ -490,7 +495,7 @@ def measureRunTimes[T](action: () => T): TimeMeasurementResult = dataAlmostJson.append("}") println(dataAlmostJson.toString) - /* Linux +/* Linux { "File writing": { @@ -506,5 +511,4 @@ def measureRunTimes[T](action: () => T): TimeMeasurementResult = "Java Files.write": [18.96376850600001, 0.20493288428568684], }, }, - } - */ + } */ diff --git a/shared/src/main/scala/async/Async.scala b/shared/src/main/scala/async/Async.scala index c853c16a..40715a4c 100644 --- a/shared/src/main/scala/async/Async.scala +++ b/shared/src/main/scala/async/Async.scala @@ -191,7 +191,7 @@ object Async: * @see * [[Source!.awaitResult awaitResult]] for non-unwrapping await. */ - inline def await(using Async): T = src.awaitResult.get + def await(using Async): T = src.awaitResult.get extension [E, T](src: Source[Either[E, T]]^) /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns. * @see