diff --git a/core/src/main/scala/ox/channels/SourceOfSourceOps.scala b/core/src/main/scala/ox/channels/SourceOfSourceOps.scala deleted file mode 100644 index 7b98a7fc..00000000 --- a/core/src/main/scala/ox/channels/SourceOfSourceOps.scala +++ /dev/null @@ -1,40 +0,0 @@ -package ox.channels - -import ox.* -import ox.channels.* -import ox.channels.ChannelClosedUnion.isValue - -extension [U](parentSource: Source[Source[U]]) { - - /** Pipes the elements of child sources into the output source. If the parent source or any of the child sources emit an error, the - * pulling stops and the output source emits the error. - */ - def flatten(using Ox, StageCapacity): Source[U] = { - val c2 = StageCapacity.newChannel[U] - - forkPropagate(c2) { - var pool = List[Source[Source[U]] | Source[U]](parentSource) - repeatWhile { - selectOrClosed(pool) match { - case ChannelClosed.Done => - // TODO: best to remove the specific channel that signalled to be Done - pool = pool.filterNot(_.isClosedForReceiveDetail.contains(ChannelClosed.Done)) - if pool.isEmpty then - c2.doneOrClosed() - false - else true - case ChannelClosed.Error(e) => - c2.errorOrClosed(e) - false - case t: Source[U] @unchecked => - pool = t :: pool - true - case r: U @unchecked => - c2.sendOrClosed(r).isValue - } - } - } - - c2 - } -} diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 2835a3e6..87e2395f 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -393,6 +393,38 @@ trait SourceOps[+T] { outer: Source[T] => } c + /** Pipes the elements of child sources into the output source. If the parent source or any of the child sources emit an error, the + * pulling stops and the output source emits the error. + */ + def flatten[U](using Ox, StageCapacity, T <:< Source[U]): Source[U] = { + val c2 = StageCapacity.newChannel[U] + + forkPropagate(c2) { + var pool = List[Source[T] | Source[U]](this) + repeatWhile { + selectOrClosed(pool) match { + case ChannelClosed.Done => + // TODO: best to remove the specific channel that signalled to be Done + pool = pool.filterNot(_.isClosedForReceiveDetail.contains(ChannelClosed.Done)) + if pool.isEmpty then + c2.doneOrClosed() + false + else true + case ChannelClosed.Error(e) => + c2.errorOrClosed(e) + false + case t: Source[T] @unchecked => + pool = t :: pool + true + case r: U @unchecked => + c2.sendOrClosed(r).isValue + } + } + } + + c2 + } + /** Concatenates this source with the `other` source. The resulting source will emit elements from this source first, and then from the * `other` source. * diff --git a/core/src/test/scala/ox/channels/SourceOfSourceOpsTest.scala b/core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala similarity index 98% rename from core/src/test/scala/ox/channels/SourceOfSourceOpsTest.scala rename to core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala index 5f69a3df..a77431c8 100644 --- a/core/src/test/scala/ox/channels/SourceOfSourceOpsTest.scala +++ b/core/src/test/scala/ox/channels/SourceOpsFlattenTest.scala @@ -8,7 +8,7 @@ import ox.* import java.util.concurrent.CountDownLatch import scala.collection.mutable.ListBuffer -class SourceOfSourceOpsTest extends AnyFlatSpec with Matchers with OptionValues { +class SourceOpsFlattenTest extends AnyFlatSpec with Matchers with OptionValues { "flatten" should "pipe all elements of the child sources into the output source" in { supervised {