Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tap and tapAsView operators and document how to use them for logging #170

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ trait SourceOps[+T] { outer: Source[T] =>
override val delegate: JSource[Any] = outer.delegate.asInstanceOf[JSource[T]].collectAsView(t => f(t))
}

/** Lazily-evaluated tap: creates a view of this source, where the results of [[receive]] will be applied to the given function `f` on the
* consumer's thread. Useful for side-effects without result values, like logging and debugging. For an eager, asynchronous version, see
* [[tap]].
*
* The same logic applies to receive clauses created using this source, which can be used in [[select]].
*
* @param f
* The consumer function.
* @return
* A source which is a view of this source, with the consumer function applied.
*/
def tapAsView(f: T => Unit): Source[T] = mapAsView(t => { f(t); t })

/** Lazily-evaluated filter: Creates a view of this source, where the results of [[receive]] will be filtered on the consumer's thread
* using the given predicate `p`. For an eager, asynchronous version, see [[filter]].
*
Expand Down Expand Up @@ -70,7 +83,7 @@ trait SourceOps[+T] { outer: Source[T] =>
/** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel.
*
* Errors from this channel are propagated to the returned channel. Any exceptions that occur when invoking `f` are propagated as errors
* to the returned channel as wel.
* to the returned channel as well.
*
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
Expand Down Expand Up @@ -101,6 +114,23 @@ trait SourceOps[+T] { outer: Source[T] =>
}
c2

/** Applies the given consumer function `f` to each element received from this source.
*
* Errors from this channel are propagated to the returned channel. Any exceptions that occur when invoking `f` are propagated as errors
* to the returned channel as well.
*
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
*
* Useful for side-effects without result values, like logging and debugging. For a lazily-evaluated version, see [[tapAsView]].
*
* @param f
* The consumer function.
* @return
* A source, which the elements from the input source are passed to.
*/
def tap(f: T => Unit)(using Ox, StageCapacity): Source[T] = map(t => { f(t); t })

/** Intersperses this source with provided element and forwards it to the returned channel.
*
* @param inject
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsAsViewTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters.*
import scala.util.{Failure, Try}

Expand Down Expand Up @@ -111,6 +112,27 @@ class SourceOpsAsViewTest extends AnyFlatSpec with Matchers with Eventually {
}
}

it should "tap over a source as a view" in {
val c: Channel[Int] = Channel.rendezvous
val sum = new AtomicInteger()

supervised {
fork {
c.send(1)
c.send(2)
c.send(3)
c.done()
}

val s2 = c.tapAsView(v => sum.addAndGet(v).discard)
s2.receive() shouldBe 1
s2.receive() shouldBe 2
s2.receive() shouldBe 3
s2.receiveOrClosed() shouldBe ChannelClosed.Done
sum.get() shouldBe 6
}
}

it should "propagate exceptions to the calling select" in {
val c: Channel[Int] = Channel.rendezvous

Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.DurationInt

class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually {

it should "timeout" in {
supervised {
val c = Source.timeout(100.millis)
Expand Down Expand Up @@ -65,4 +67,12 @@ class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually {
s.toList shouldBe List("a", "b", "c", "d", "e", "f", "g", "h", "i")
}
}

it should "tap over a source" in {
supervised {
val sum = new AtomicInteger()
Source.fromValues(1, 2, 3).tap(v => sum.addAndGet(v).discard).toList shouldBe List(1, 2, 3)
sum.get() shouldBe 6
}
}
}
20 changes: 20 additions & 0 deletions doc/channels/transforming-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,23 @@ When dealing with Sources with chunks of bytes or Strings, you can leverage foll
* `decodeStringUtf8` to decode a `Source[Chunk[Byte]]` into a `Source[String]`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.

Such operations may be useful when dealing with I/O like files, `InputStream`, etc.. See [examples here](io.md).

## Logging

Ox does not have any integrations with logging libraries, but it provides a simple way to log elements flowing through channels
using the `.tap` (eagerly evaluated) or `.tapAsView` (lazily evaluated) methods.

```scala mdoc:compile-only
import ox.supervised
import ox.channels.Source

supervised {
Source.fromValues(1, 2, 3)
.tap(n => println(s"Received: $n")) // prints as soon as the element is sent from the source
.toList

Source.fromValues(1, 2, 3)
.tapAsView(n => println(s"Received: $n")) // prints when the element is consumed by `toList`
.toList
}
```
Loading