Skip to content

Commit

Permalink
Merge branch 'zio:series/2.x' into series/2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
hearnadam authored Jul 30, 2024
2 parents 4a2aa5a + 1110174 commit e863a88
Show file tree
Hide file tree
Showing 24 changed files with 453 additions and 239 deletions.
8 changes: 0 additions & 8 deletions benchmarks/src/main/scala/zio/AsyncResumptionBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,3 @@ class AsyncResumptionBenchmark {
}

}

object AsyncResumptionBenchmark {
def main(args: Array[String]): Unit = {
val that = new AsyncResumptionBenchmark
while (true)
that.zioAsyncResumptionBenchmark
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import java.util.concurrent.TimeUnit
* small queue to enforce back pressure mechanism is used.
*/
class QueueBackPressureBenchmark {
val queueSize = 2
@Param(Array("2", "8", "16"))
var queueSize = 2
val totalSize = 1000
val parallelism = 5

Expand Down
22 changes: 11 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
.settings(stdSettings("zio"))
.settings(crossProjectSettings)
.settings(buildInfoSettings("zio"))
.settings(libraryDependencies += "dev.zio" %%% "izumi-reflect" % "2.3.8")
.settings(libraryDependencies += "dev.zio" %%% "izumi-reflect" % "2.3.9")
.enablePlugins(BuildInfoPlugin)
.settings(macroDefinitionSettings)
.settings(scalacOptions += "-Wconf:msg=[zio.stacktracer.TracingImplicits.disableAutoTrace]:silent")
Expand All @@ -235,7 +235,7 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
.nativeSettings(
nativeSettings,
libraryDependencies ++= Seq(
"com.github.lolgab" %%% "native-loop-core" % "0.2.1"
"com.github.lolgab" %%% "native-loop-core" % "0.3.0"
)
)

Expand Down Expand Up @@ -395,7 +395,7 @@ lazy val tests = crossProject(JSPlatform, JVMPlatform, NativePlatform)
.settings(macroExpansionSettings)
.settings(
libraryDependencies ++= Seq(
("org.portable-scala" %%% "portable-scala-reflect" % "1.1.2")
("org.portable-scala" %%% "portable-scala-reflect" % "1.1.3")
.cross(CrossVersion.for3Use2_13)
)
)
Expand All @@ -405,16 +405,16 @@ lazy val tests = crossProject(JSPlatform, JVMPlatform, NativePlatform)
.jsSettings(
jsSettings,
libraryDependencies ++= List(
"io.github.cquiroz" %%% "scala-java-time" % "2.5.0",
"io.github.cquiroz" %%% "scala-java-time-tzdb" % "2.5.0"
"io.github.cquiroz" %%% "scala-java-time" % "2.6.0",
"io.github.cquiroz" %%% "scala-java-time-tzdb" % "2.6.0"
)
)
.nativeSettings(
nativeSettings,
libraryDependencies ++= List(
"io.github.cquiroz" %%% "scala-java-time" % "2.5.0",
"io.github.cquiroz" %%% "scala-java-time-tzdb" % "2.5.0",
"com.github.lolgab" %%% "scala-native-crypto" % "0.0.4"
"io.github.cquiroz" %%% "scala-java-time" % "2.6.0",
"io.github.cquiroz" %%% "scala-java-time-tzdb" % "2.6.0",
"com.github.lolgab" %%% "scala-native-crypto" % "0.1.0"
)
)

Expand Down Expand Up @@ -499,7 +499,7 @@ lazy val testScalaCheck = crossProject(JSPlatform, JVMPlatform, NativePlatform)
.settings(crossProjectSettings)
.settings(
libraryDependencies ++= Seq(
("org.scalacheck" %%% "scalacheck" % "1.17.1")
("org.scalacheck" %%% "scalacheck" % "1.18.0")
)
)
.jsSettings(jsSettings)
Expand Down Expand Up @@ -716,7 +716,7 @@ lazy val scalafixRules = project.module
.settings(
scalafixSettings,
semanticdbEnabled := true, // enable SemanticDB
libraryDependencies += "ch.epfl.scala" %% "scalafix-core" % "0.10.4"
libraryDependencies += "ch.epfl.scala" %% "scalafix-core" % "0.12.1"
)

val zio1Version = "1.0.18"
Expand Down Expand Up @@ -744,7 +744,7 @@ lazy val scalafixTests = project
.settings(
scalafixSettings,
publish / skip := true,
libraryDependencies += "ch.epfl.scala" % "scalafix-testkit" % "0.12.0" % Test cross CrossVersion.full,
libraryDependencies += "ch.epfl.scala" % "scalafix-testkit" % "0.12.1" % Test cross CrossVersion.full,
Compile / compile :=
(Compile / compile).dependsOn(scalafixInput / Compile / compile).value,
scalafixTestkitOutputSourceDirectories :=
Expand Down
14 changes: 10 additions & 4 deletions core-tests/shared/src/test/scala/zio/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import zio.test.Assertion._
import zio.test.TestAspect.{jvm, nonFlaky}
import zio.test._

import scala.collection.immutable.Range

object QueueSpec extends ZIOBaseSpec {
import ZIOTag._

Expand Down Expand Up @@ -767,7 +765,7 @@ object QueueSpec extends ZIOBaseSpec {
_ <- ZIO.foreach(takers)(_.join)
} yield assertCompletes
}
},
} @@ jvm(nonFlaky(100)),
test("isEmpty") {
for {
queue <- Queue.bounded[Int](2)
Expand All @@ -783,7 +781,15 @@ object QueueSpec extends ZIOBaseSpec {
_ <- waitForSize(queue, 3)
full <- queue.isFull
} yield assertTrue(full)
}
},
test("bounded queue preserves ordering") {
for {
queue <- Queue.bounded[Int](16)
expected = Chunk.fromIterable(0 until 100)
_ <- queue.offerAll(expected).fork
actual <- queue.take.replicateZIO(100).map(Chunk.fromIterable)
} yield assertTrue(actual == expected)
} @@ jvm(nonFlaky(1000))
)
}

Expand Down
6 changes: 6 additions & 0 deletions core/js-native/src/main/scala/zio/QueuePlatformSpecific.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package zio

private[zio] trait QueuePlatformSpecific {
// java.util.concurrent.ConcurrentLinkedDeque is not available in ScalaJS or Scala Native, so we use an ArrayDeque instead
type ConcurrentDeque[A] = java.util.ArrayDeque[A]
}
5 changes: 5 additions & 0 deletions core/jvm/src/main/scala/zio/QueuePlatformSpecific.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package zio

private[zio] trait QueuePlatformSpecific {
type ConcurrentDeque[A] = java.util.concurrent.ConcurrentLinkedDeque[A]
}
39 changes: 17 additions & 22 deletions core/jvm/src/main/scala/zio/internal/ZScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import scala.collection.mutable
* Lerche. [[https://tokio.rs/blog/2019-10-scheduler]]
*/
private final class ZScheduler(autoBlocking: Boolean) extends Executor {
import ZScheduler.workerOrNull

private[this] val poolSize = java.lang.Runtime.getRuntime.availableProcessors
import Trace.{empty => emptyTrace}
import ZScheduler.{poolSize, workerOrNull}

private[this] val globalQueue = new PartitionedLinkedQueue[Runnable](poolSize * 4)
private[this] val cache = new ConcurrentLinkedQueue[ZScheduler.Worker]()
private[this] val idle = new ConcurrentLinkedQueue[ZScheduler.Worker]()
private[this] val globalLocations = makeLocations()
private[this] val state = new AtomicInteger(poolSize << 16)
private[this] val workers = Array.ofDim[ZScheduler.Worker](poolSize)
private[this] val emptyTrace = Trace.empty

@volatile private[this] var blockingLocations: Set[Trace] = Set.empty

Expand Down Expand Up @@ -276,31 +276,21 @@ private final class ZScheduler(autoBlocking: Boolean) extends Executor {
var searching = false
while (!isInterrupted) {
currentBlocking = blocking
if (currentBlocking) {
if (nextRunnable ne null) {
runnable = nextRunnable
nextRunnable = null
}
val currentNextRunnable = nextRunnable
if (currentBlocking) ()
else if (currentNextRunnable ne null) {
runnable = currentNextRunnable
nextRunnable = null
} else {
if ((currentOpCount & 63) == 0) {
runnable = globalQueue.poll(random)
if (runnable eq null) {
if (nextRunnable ne null) {
runnable = nextRunnable
nextRunnable = null
} else {
runnable = localQueue.poll(null)
}
runnable = localQueue.poll(null)
}
} else {
if (nextRunnable ne null) {
runnable = nextRunnable
nextRunnable = null
} else {
runnable = localQueue.poll(null)
if (runnable eq null) {
runnable = globalQueue.poll(random)
}
runnable = localQueue.poll(null)
if (runnable eq null) {
runnable = globalQueue.poll(random)
}
}
if (runnable eq null) {
Expand Down Expand Up @@ -405,6 +395,10 @@ private final class ZScheduler(autoBlocking: Boolean) extends Executor {
val idx = workers.indexOf(self)
if (idx >= 0) {
val runnables = self.localQueue.pollUpTo(256)
if (nextRunnable ne null) {
globalQueue.offer(nextRunnable)
nextRunnable = null
}
globalQueue.offerAll(runnables)
val worker = cache.poll()
if (worker eq null) {
Expand Down Expand Up @@ -444,6 +438,7 @@ private final class ZScheduler(autoBlocking: Boolean) extends Executor {
}

private object ZScheduler {
private val poolSize = java.lang.Runtime.getRuntime.availableProcessors

def markCurrentWorkerAsBlocking(): Unit = {
val worker = workerOrNull()
Expand Down
106 changes: 105 additions & 1 deletion core/native/src/main/scala/zio/ClockPlatformSpecific.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ import zio.{DurationSyntax => _}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.scalanative.loop._

private[zio] trait ClockPlatformSpecific {
import ClockPlatformSpecific.Timer
private[zio] val globalScheduler = new Scheduler {
import Scheduler.CancelToken

private[this] val ConstTrue = () => false
private[this] val ConstFalse = () => false

override def schedule(task: Runnable, duration: Duration)(implicit unsafe: Unsafe): CancelToken =
(duration: @unchecked) match {
case zio.Duration.Zero =>
task.run()
ConstTrue
case zio.Duration.Infinity => ConstFalse
case zio.Duration.Finite(nanos) =>
var completed = false
Expand All @@ -47,3 +51,103 @@ private[zio] trait ClockPlatformSpecific {
}
}
}

/**
* Copy-pasted from scala.scalanative.loop
*
* This is temporary until we figure out why using it directly raises errors
*/
private object ClockPlatformSpecific {
import scala.collection.mutable
import scala.concurrent.Future
import scala.scalanative.annotation.alwaysinline
import scala.scalanative.libc.stdlib
import scala.scalanative.loop.EventLoop
import scala.scalanative.loop.LibUV._
import scala.scalanative.loop.LibUVConstants._
import scala.scalanative.runtime.Intrinsics._
import scala.scalanative.runtime._
import scala.scalanative.unsafe.Ptr

private object HandleUtils {
private val references = mutable.Map.empty[Object, Int]

@alwaysinline def getData[T <: Object](handle: Ptr[Byte]): T = {
// data is the first member of uv_loop_t
val ptrOfPtr = handle.asInstanceOf[Ptr[Ptr[Byte]]]
val dataPtr = !ptrOfPtr
if (dataPtr == null) null.asInstanceOf[T]
else {
val rawptr = toRawPtr(dataPtr)
castRawPtrToObject(rawptr).asInstanceOf[T]
}
}
@alwaysinline def setData(handle: Ptr[Byte], obj: Object): Unit = {
// data is the first member of uv_loop_t
val ptrOfPtr = handle.asInstanceOf[Ptr[Ptr[Byte]]]
if (obj != null) {
if (references.contains(obj)) references(obj) += 1
else references(obj) = 1
val rawptr = castObjectToRawPtr(obj)
!ptrOfPtr = fromRawPtr[Byte](rawptr)
} else {
!ptrOfPtr = null
}
}
private val onCloseCB: CloseCB = (handle: UVHandle) => {
stdlib.free(handle)
}

@alwaysinline def close(handle: Ptr[Byte]): Unit =
if (getData(handle) != null) {
uv_close(handle, onCloseCB)
val data = getData[Object](handle)
val current = references(data)
if (current > 1) references(data) -= 1
else references.remove(data)
setData(handle, null)
}
}

@alwaysinline final class Timer private (private val ptr: Ptr[Byte]) extends AnyVal {
def clear(): Unit = {
uv_timer_stop(ptr)
HandleUtils.close(ptr)
}
}

object Timer {
import scala.scalanative.loop.LibUV._
private val timeoutCB: TimerCB = (handle: TimerHandle) => {
val callback = HandleUtils.getData[() => Unit](handle)
callback.apply()
}

@alwaysinline private def startTimer(
timeout: Long,
repeat: Long,
callback: () => Unit
): Timer = {
val timerHandle = stdlib.malloc(uv_handle_size(UV_TIMER_T))
uv_timer_init(EventLoop.loop, timerHandle)
HandleUtils.setData(timerHandle, callback)
val timer = new Timer(timerHandle)
uv_timer_start(timerHandle, timeoutCB, timeout, repeat)
timer
}

def delay(duration: FiniteDuration): Future[Unit] = {
val promise = scala.concurrent.Promise[Unit]()
timeout(duration)(() => promise.success(()))
promise.future
}

def timeout(duration: FiniteDuration)(callback: () => Unit): Timer =
startTimer(duration.toMillis, 0L, callback)

def repeat(duration: FiniteDuration)(callback: () => Unit): Timer = {
val millis = duration.toMillis
startTimer(millis, millis, callback)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private[zio] object CleanCodePrinter {

def loop(expr: c.Tree): c.Tree =
expr match {
case Apply(t, args) if args.exists(t => t.tpe <:< tracerType || t.tpe <:< tagType) =>
case Apply(t, args) if args.exists(t => t.tpe != null && (t.tpe <:< tracerType || t.tpe <:< tagType)) =>
loop(t)
case Apply(t, args) =>
Apply(loop(t), args.map(t => loop(t)))
Expand Down
Loading

0 comments on commit e863a88

Please sign in to comment.