-
Notifications
You must be signed in to change notification settings - Fork 357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prefetch a chunk of result for stream operation #2137
Conversation
modules/hikari/src/test/scala/doobie/postgres/PGConcurrentSuite.scala
Outdated
Show resolved
Hide resolved
|
||
// if buffer is less than result set, it will be still block new connection since the result set is not drained | ||
// use sleep to test the result set can be drained | ||
val streamSmallerBuffer = fr"select * from stream_cancel_test".query[Int].stream.transactBuffer(xa, 50) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm sorry I can't quite understand how this is testing that the connection is closed before the stream is completed 🤔
An alternative way I can think of to test this is to use some sort of atomic boolean and set it to true when the transactor finishes the transaction, and then you can assert that this was set to true before you process the last chunk of the stream.
import doobie.util.transactor.Strategy
import doobie.FC
import java.util.concurrent.atomic.AtomicBoolean
val hasClosed = new AtomicBoolean(false)
xa.copy(
strategy0 = Strategy.default.copy(
always = Strategy.default.always.flatMap(_ => FC.delay(hasClosed.set(true)))
)
)
It's probably possible to use Deferred
instead of AtomicBoolean but there will be more ceremony to instantiates a Deferred
inside ConnectionIO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the test to assert if it's closed
dc077b6
to
267dfbe
Compare
66da6e4
to
2bd4a8d
Compare
Resolves typelevel#2132. Use fs2 Stream's `prefetchN` to buffer query results so that slow downstream operations don't slow down transaction commit. The buffered result size is `chunkSize` (equals to `fetchSize`) by default. If want a different buffer size, can use `transactNoPrefetc` to get a stream without `prefetchN` and append any prefetch operations wanted.
Hi @jatcwang just follow up on this PR. Please let me know if the updated code addressed your comments or not. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your patience :)
Resolves #2132.
Use fs2 Stream's
prefetchN
to buffer query results so that slow downstream operations don't slow down transaction commit.The buffered result size is
chunkSize
(equals tofetchSize
) by default. If want a different buffer size, can usetransactNoPrefetc
to get a stream withoutprefetchN
and append any prefetch operations wanted.