Skip to content

Commit

Permalink
alsoTo - propagate error between sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
micossow committed Jul 1, 2024
1 parent 380a647 commit 6afb0f2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
16 changes: 8 additions & 8 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1046,14 +1046,14 @@ trait SourceOps[+T] { outer: Source[T] =>
other.errorOrClosed(r)
false
case t: T @unchecked =>
if c2.sendOrClosed(t).isValue then
if other.sendOrClosed(t).isValue then true
else
c2.doneOrClosed().discard
false
else
other.doneOrClosed().discard
false
c2.sendOrClosed(t) match
case ChannelClosed.Done => other.doneOrClosed().discard; false
case ChannelClosed.Error(r) => other.errorOrClosed(r).discard; false
case _ =>
other.sendOrClosed(t) match
case ChannelClosed.Done => c2.doneOrClosed().discard; false
case ChannelClosed.Error(r) => c2.errorOrClosed(r).discard; false
case _ => true
}
}
c2
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsAlsoToTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import scala.util.{Failure, Try}

class SourceOpsAlsoToTest extends AnyFlatSpec with Matchers {

behavior of "Source.alsoTo"
Expand Down Expand Up @@ -36,6 +38,19 @@ class SourceOpsAlsoToTest extends AnyFlatSpec with Matchers {
f.join() shouldBe List(1, 2, 3)
}

it should "close main channel with error when other errors" in supervised {
val c = Channel.withCapacity[Int](1)
val f = fork {
c.receiveOrClosed()
c.receiveOrClosed()
c.receiveOrClosed()
c.errorOrClosed(new RuntimeException("stop!"))
}

Try(Source.fromIterable(1 to 100).alsoTo(c).toList) shouldBe a[Failure[RuntimeException]]
f.join()
}

it should "close other channel when main closes" in supervised {
val other = Channel.rendezvous[Int]
val forkOther = fork {
Expand All @@ -50,4 +65,19 @@ class SourceOpsAlsoToTest extends AnyFlatSpec with Matchers {
// the channel's buffer is resized internally when it closes, see `com.softwaremill.jox.Channel.closeOrClosed`
forkOther.join().size should be < 25
}

it should "close other channel with error when main errors" in supervised {
val other = Channel.rendezvous[Int]
val forkOther = fork {
Try(other.toList)
}
val main = Source.fromIterable(1 to 100).alsoTo(other).asInstanceOf[Channel[Int]]

main.receiveOrClosed()
main.receiveOrClosed()
main.receiveOrClosed()
main.errorOrClosed(new RuntimeException("stop!"))

forkOther.join() shouldBe a[Failure[RuntimeException]]
}
}

0 comments on commit 6afb0f2

Please sign in to comment.