Iterative consumption of flows #3274

Open
lowasser opened this issue May 5, 2022 · 15 comments

Comments

@lowasser
Contributor

lowasser commented May 5, 2022

This is the product of some library design work we've been doing at Google, and we wanted to take this upstream in case there was interest. The Flow API is almost exclusively "push"-based, not "pull" based. The main alternative approach is to adapt it to a Channel, but that often introduces inconvenient concurrency issues (arbitrary buffering, the risk of forgetting to cancel the channel).
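For reference, the channel-based adaptation mentioned above can be sketched roughly as follows. This is not the implementation being proposed here; `iterateViaChannel` is a hypothetical name, and the sketch assumes the existing `buffer`, `produceIn`, and `ChannelIterator` APIs from kotlinx.coroutines (some versions may report `produceIn` as a preview API).

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelIterator
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pull-style consumption built on a rendezvous channel.
suspend fun <T, R> Flow<T>.iterateViaChannel(
    block: suspend ChannelIterator<T>.() -> R
): R = coroutineScope {
    // The flow is collected in a separate producer coroutine inside this scope.
    val channel = buffer(Channel.RENDEZVOUS).produceIn(this)
    try {
        channel.iterator().block()
    } finally {
        // Forgetting this cancel would leave the producer suspended in send()
        // and keep coroutineScope from returning.
        channel.cancel()
    }
}

fun main() = runBlocking {
    val firstTwo = flowOf(1, 2, 3).iterateViaChannel {
        val out = mutableListOf<Int>()
        // ChannelIterator requires hasNext() before every next().
        while (out.size < 2 && hasNext()) out += next()
        out
    }
    println(firstTwo)
}
```

Even with a rendezvous channel, the producer is a separate coroutine, so the upstream may already be running toward its next emission while the block is still handling the current element.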

What we want is more or less a scoped iteration method. Here is a sketch of some examples that we think would be better with the proposed API than what Flow can do today.

suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean = iterate {
  while (hasNext()) {
    if (!predicate(next())) return@iterate false
  }
  true
}

fun <T> Flow<T>.pairElements(): Flow<Pair<T, T>> = flow {
  iterate {
    while (hasNext()) {
      val a = next()
      if (!hasNext()) break
      val b = next()
      emit(Pair(a, b))
    }
  }
}

// this API sort of generalizes e.g. https://github.com/cashapp/turbine
fun testStreamingRpcResponse() = runTest {
  val responses = stub.doRpc(request)
  responses.iterate {
    assertThat(next()).isEqualTo(response1)
    doStimulus(1)
    assertThat(next()).isEqualTo(response2)
    doStimulus(2)
    assertThat(hasNext()).isFalse()
  }
}

In short, what we're proposing is

suspend fun <T, R> Flow<T>.iterate(block: ??Iterator<T>.() -> R): R

It's not precisely clear what the ??Iterator should be: we might reuse ChannelIterator, but we're not particularly fond of the requirement to call hasNext() in the test case especially, and of course this isn't actually a Channel. For ourselves, we are inclined to introduce a separate FlowIterator type.

We have an implementation that does not introduce undue concurrency. To be a little more specific, it does not continue the backing Flow.collect until the next call to hasNext, etc.; it's a linear transformation in the same way as map and filter, unlike buffer or conflate. We can PR it if there's interest.

@lowasser
Contributor Author

Continuing conversation from #3278 (comment).

We don't mind if it's slow

I've mentioned this in other contexts, but perhaps not when you were present, so let me say this as the coroutines lead on Google's internal Kotlin team: we consider performance to be our lowest priority for coroutines, behind correctness/ease of writing correct code, clear API design, and testability. We think this API has advantages for all three of those.

We're not using flows (or coroutines) for the performance, we're using them because they're a clear and clean API. The 30x slowdown you estimate is a little on the high side for us, but only a little; we would be excited to use this API even if it represented a 30x slowdown, even in production, even if it's only to make code easier to follow. Coroutines just aren't a bottleneck, or even within 30x of being a bottleneck; outages and bugs are much more expensive.

Test use cases

The canonical example of the test case looks something like this. Here's a very simple test:

// with Truth, other assertion libraries will presumably be similar
assertThat(stub.rpc(flowOf(1,2,3)).toList()).containsExactly(2, 4, 5, 6).inOrder()

This is great. But what if we want to assert that the RPC doesn't wait until the requests flow completes, but emits responses as we go?

val requests = MutableSharedFlow<Int>()
stub.rpc(requests.take(3)).iterate {
  // this assumes that hasNext is not required as with ChannelIterator.
  requests.emit(1)
  assertThat(next()).isEqualTo(2)
  requests.emit(2)
  assertThat(next()).isEqualTo(4)
  assertThat(next()).isEqualTo(5)
  requests.emit(3)
  assertThat(next()).isEqualTo(6)
  assertThat(hasNext()).isFalse()
}

This makes different assertions at each request, which is a poor match for push-based APIs. I can imagine several push-based approaches to this problem, but they all separate stimulus and response, making the test harder to write, read, and debug. Most streaming RPCs need tests like this, and we have a lot of those.

https://github.com/cashapp/turbine is a preexisting library by @JakeWharton that addresses the specific use case of testing in this style; its README also discusses use cases in more detail. We've been hesitant to fully adopt Turbine because it dallies with UNDISPATCHED, which we've seen lead to impossible-to-debug issues. Its API is great for testing flows, but a more general operator applicable to production would be even better. I'm sure Jake could comment more, but he did thumbs-up the proposal above.

Flow operators and production use cases

For the production use case, pairElements from the original comment is a simplified version of our real use case, but it gets across the idea that we want to express. The push-based style looks like

fun <T> Flow<T>.pairElements(): Flow<Pair<T, T>> = flow {
  var even = true
  // lateinit is not allowed for a type parameter with a nullable upper bound
  var prev: T? = null
  collect {
    if (even) {
      prev = it
    } else {
      @Suppress("UNCHECKED_CAST")
      emit(Pair(prev as T, it))
    }
    even = !even
  }
}

This is a very simple state machine, but it's much clearer to write

fun <T> Flow<T>.pairElements() = flow {
  iterate {
    while (hasNext()) {
      emit(Pair(next(), next()))
    }
  }
}

Coroutines are all about capturing state machines from program control flow, and they do a great job here.

The use case being simplified is parsing streams of bytes from a Flow<ByteString> in the delimited proto format, in which the byte stream contains alternating varint-encoded message lengths, then messages of that length. These can be split arbitrarily across ByteStrings, which are simply chunks of unknown length read from files (in the cloud, via RPCs, thus the Flow). In this case, the algorithm is "consume byte strings until we have a length, parse it, consume byte strings until we have at least that length, emit that chunk, repeat until the stream ends." We started out with a push-based style to implement that, sure, but it was complex and hard to follow, because it had to manually manage a state machine that was much clearer in the pull style. Parsing the messages and juggling the RPC is much more expensive than the coroutine parts, so who cares if we slow down the flow consumption?
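The algorithm described above can be sketched in pull style. This is an illustration only, not the implementation discussed here: it substitutes the standard library's `Iterator<ByteArray>` and `sequence` builder for `Flow<ByteString>` and the proposed iterator, `delimitedMessages` is a hypothetical name, and lengths are assumed to fit in an `Int`.

```kotlin
// Pull-style parser for a length-delimited stream: a varint length, then that many bytes.
fun Iterator<ByteArray>.delimitedMessages(): Sequence<ByteArray> {
    val source = this
    return sequence {
        var chunk = ByteArray(0) // current input chunk
        var pos = 0              // next unread index in chunk

        // Returns the next byte, pulling more chunks as needed; null at end of input.
        fun nextByte(): Int? {
            while (pos == chunk.size) {
                if (!source.hasNext()) return null
                chunk = source.next()
                pos = 0
            }
            return chunk[pos++].toInt() and 0xFF
        }

        while (true) {
            // 1. Consume bytes until we have a complete varint length (low 7 bits first).
            var b = nextByte() ?: break // clean end of stream
            var length = 0
            var shift = 0
            while (true) {
                length = length or ((b and 0x7F) shl shift)
                if ((b and 0x80) == 0) break
                shift += 7
                b = nextByte() ?: error("stream ended inside a length prefix")
            }
            // 2. Consume bytes until we have the whole message, then emit it.
            val message = ByteArray(length) {
                (nextByte() ?: error("stream ended inside a message")).toByte()
            }
            yield(message)
        }
    }
}

fun main() {
    // Three messages ([1, 2, 3], [], [9]) split arbitrarily across chunks.
    val chunks = listOf(
        byteArrayOf(0x03, 0x01),
        byteArrayOf(0x02),
        byteArrayOf(),
        byteArrayOf(0x03, 0x00, 0x01),
        byteArrayOf(0x09),
    )
    println(chunks.iterator().delimitedMessages().map { it.toList() }.toList())
}
```

The parser state (whether we are inside a length or a message, and how much is left) lives entirely in the control flow, which is the property the pull style is meant to provide.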

Tests for this use case also ran, the hard way, into the race condition in channel-based solutions when parsing byte streams interrupted by exceptions.

Debugging

The desirable state when debugging is that, at a breakpoint at B(X), A is suspended at the point where it just emitted X and has not done any more work.

It looks like it has to be solved not on the operator level, but at the infrastructure/testing one. E.g. (optional) injectable dispatchers to the production source code and sequential testing primitives: kotlinx-coroutines-test or turbine.

A clever dispatcher doesn't solve this problem, though! #3278 (comment) shows the control flow diagrams that are the root of the issue, which is that two separate coroutines are both eligible to run at the same point in the program, as far as any dispatcher is concerned, but we don't want one of them to run (yet) when debugging -- and the dispatcher API means that it can't possibly know which one we want. A fully sequential, single-threaded dispatcher prevents "races" in the sense of JVM races, but doesn't solve the "race" in the sense that the dispatcher chooses which of two eligible coroutines get resumed first, and debugging requires the dispatcher to make a specific choice there that a dispatcher can't guess.

The fix is to prevent the race, where the only coroutine eligible to run is the right one to step forward, and that means something more like iterate that doesn't use channels (in the obvious way).

@lowasser
Contributor Author

(To clarify, in the "parsing streams of bytes" use case, we are returning a Flow of the parsed messages.)

@qwwdfsad
Collaborator

Hi, I'm still processing your input and trying to internalize all the things in order to further weigh the pros and cons of the solution. Let's discuss individual details in order to be on the same page.

We don't mind if it's slow
...
we would be excited to use this API even if it represented a 30x slowdown, even in production, even if it's only to make code easier to follow

It's good to hear that, meaning that we have a bit more space in our design. But it's important to remember that we have plenty of other consumers, including, but not limited to, Android developers, server-side developers, high-performance web frameworks, high-throughput backend apps and IDEs. Having two similar APIs that, at first glance, differ only in code-style preferences, with one being 30 times slower, will bring irreversible damage to the ecosystem in the long term, especially when both pull- and push-based APIs are equally appealing.

suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean = iterate {
  while (hasNext()) {
    if (!predicate(next())) return@iterate false
  }
  true
}

This is an example of what can bring potential harm. One can implement it via collectWhile using a push-based model, or via iterate, which is an order of magnitude slower. While that is acceptable in some cases, it will eventually split the code style and bring debates over code style and a lot of "Dos and don'ts for performance" articles (and the veil of mystery around the mythical abstract "performance" won't make it any easier for us).

#3278

Would it be okay for you to semi-finish the PR? It is a decent first peek at the API shape, but unfortunately, it doesn't compile and doesn't have any tests. It would be really nice to see API and tests that you already have written to give us a taste of how you are leveraging it. There are some examples, but they have their own pitfalls. Real-world use-cases are always better :)

This is a very simple state machine, but it's much clearer to write

fun <T> Flow<T>.pairElements() = flow {
  iterate {
    while (hasNext()) {
      emit(Pair(next(), next()))
    }
  }
}

This particular example looks really concise, works, and passes basic testing, as long as the original flow emits an even number of elements; otherwise, it throws NoSuchElementException.
Of course, channels' and collections' iterators have the same pitfall, but their use pattern is completely different -- their iterators are almost never expected to be used directly, but rather with the for-loop convention, the consume extension, or stdlib primitives. Here we explicitly delegate this responsibility to end users, potentially creating space for hard-to-spot bugs that the type system cannot prevent.

A fully sequential, single-threaded dispatcher prevents "races" in the sense of JVM races, but doesn't solve the "race" in the sense that the dispatcher chooses which of two eligible coroutines get resumed first, and debugging requires the dispatcher to make a specific choice there that a dispatcher can't guess.

Thanks for the control flow diagram, it really made the picture clear. I immediately started thinking about a more intrusive UNDISPATCHED primitive for this purpose and noticed your comment

We've been hesitant to fully adopt Turbine because it dallies with UNDISPATCHED, which we've seen lead to impossible-to-debug issues

Could you please elaborate on that and show a few examples of such issues?

I'm looking forward to seeing more examples (I suppose the parsing is the most interesting and hard-to-emulate one) to see if there are ways to improve the API and get around potential drawbacks

@qwwdfsad qwwdfsad self-assigned this May 20, 2022
@JakeWharton
Contributor

Do you mean Unconfined? Turbine's use of UNDISPATCHED merely starts the upstream flow collection synchronously.

@lowasser
Contributor Author

I don't have specific issues I can refer to about Unconfined or UNDISPATCHED, though both make us extremely nervous. Certainly in Java land we've had lots of issues caused by directExecutor. We specifically haven't seen any such issues with Turbine, but then, we have it on a tight allowlist just because we don't trust that there aren't issues with Unconfined and UNDISPATCHED too subtle for us to notice. Similarly, we would be unhappy about more intrusive primitives and their edge cases.

We care about performance, to be clear, but we think race conditions in common patterns that are subtle enough that they're not obvious to you or Roman at first glance are pretty damaging to the ecosystem themselves.

I've committed a working version with some of our tests to the PR, though it doesn't have as many tests as I'd like yet. I've also modified it to avoid the use of channels, instead just passing continuations back and forth. I'm less confident in its correctness, though.

(Possibly an advantage of the API in the current PR: it can't actually be for-eached over, which makes it more likely to be used in iterator-specific ways?)

@JakeWharton
Contributor

Turbine's use of Unconfined is an optimization to avoid double dispatch, similar to one of your goals here. It isn't strictly required, and runTest seems to have broken its behavior, so it's probably worth removing.

@jingibus

jingibus commented May 25, 2022

It's not just Turbine-type interactions that need "pull" consumption. We also need it for interaction with fakes.

Internally at Cash, we've been attacking this problem by using a "pull" consumption tool that Kotlin already has: Channel.

We facade Channel's API with one better suited to our test requirements. Here's a simplified sketch of that API:

class AwaitChannel<T> {
  private val channel = Channel<T>(capacity = UNLIMITED)

  fun add(value: T) { channel.trySend(value) }

  val isEmpty: Boolean get() { return channel.isEmpty }

  /**
   * Wait for a value on this channel for 1s, throwing a [CancellationException] if none arrives.
   */
  suspend fun awaitValue(): T = withTimeout(1000L) {
    channel.receive()
  }

  /**
   * Return a value, or null if empty.
   *
   * To be used from legacy, non-suspendful call sites. Will throw
   * when called from a suspending context.
   */
  fun takeValue(): T? {
    assertCallingContextIsNotSuspended()
    return channel.tryReceive().getOrNull()
  }
}

This is essentially the same thing Turbine provides, but in a form that works in our Fakes.

I think the "it's just a channel" idea can work in Turbine, too. I did some homework on this last week, and haven't yet found a major roadblock.

@lowasser
Contributor Author

Does that approach address the race issue discussed in the PR here?

@jingibus

We have only been concerned with races in test. Test has firm expectations of consistency: many immaterial changes in order may occur in multithreaded prod without harm, but immaterial changes in ordering under test will result in flaky test failures.

Since races are introduced by threading, not coroutines, we police dispatchers aggressively:

  • All dispatchers are injected as EmptyCoroutineContext in test
  • All Rx schedulers are injected as Schedulers.trampoline() in test
  • We stay on guard against problematic Kotlin APIs that silently introduce Dispatchers.Default (e.g. rx(Single|Maybe|Completable|Observable), dispatcherless CoroutineScope())
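The first rule above might look roughly like this in practice. This is a minimal sketch; `PriceLoader` and its `ioContext` parameter are hypothetical names used only for illustration, not code from this thread.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical production class: the context is injected rather than hard-coded.
class PriceLoader(private val ioContext: CoroutineContext = Dispatchers.IO) {
    suspend fun load(id: Int): String = withContext(ioContext) {
        "price-$id" // stand-in for real blocking work
    }
}

fun main() = runBlocking {
    // In a test, EmptyCoroutineContext adds no dispatcher, so the work
    // stays on the caller's (single-threaded) dispatcher.
    val loader = PriceLoader(ioContext = EmptyCoroutineContext)
    println(loader.load(1))
}
```

Because `withContext(EmptyCoroutineContext)` leaves the dispatcher unchanged, the test controls threading from the outside instead of the class picking a thread pool on its own.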

@lowasser
Contributor Author

There are plenty of races that can happen as a result of coroutines and not threading! #3278 (comment) is an example of code that nondeterministically throws or succeeds, even run on a single-threaded dispatcher.

@jingibus

jingibus commented Jul 7, 2022

#3278 (comment) is an example of code that nondeterministically throws or succeeds, even run on a single-threaded dispatcher.

I've stewed on this claim for a few weeks, and reviewing the discussion I think that it doesn't really hit at anything I really care about. And that is because I truly care about actual nondeterminism: identical code that yields different results on different invocations.

The following paragraph I think makes the different issues pretty clear:

A clever dispatcher doesn't solve this problem, though! #3278 (comment) shows the control flow diagrams that are the root of the issue, which is that two separate coroutines are both eligible to run at the same point in the program, as far as any dispatcher is concerned, but we don't want one of them to run (yet) when debugging -- and the dispatcher API means that it can't possibly know which one we want. A fully sequential, single-threaded dispatcher prevents "races" in the sense of JVM races, but doesn't solve the "race" in the sense that the dispatcher chooses which of two eligible coroutines get resumed first, and debugging requires the dispatcher to make a specific choice there that a dispatcher can't guess.

So we have two issues: JVM races, and dispatcher races.

I agree that these are both problems: as a programmer writing a test, I care quite a bit about dispatcher races. Dispatcher races make it difficult to understand what the outcome of my test code will be, and that makes test writing very finicky.

But: that is a different order of problem from a JVM race. As someone who tends to get pinged about test issues in my organization, I don't generally get out of bed for a dispatcher race, because it is a problem localized to that engineer.

If the engineer is experiencing a JVM race, on the other hand, that is a major issue. JVM races can expose every single engineer in the team to flaky CI tests. That's when I zoom in and apply the three rules given above to root cause the problem.

So: our language is letting us down here. If I say "race", I may be referring to a dispatcher race (which is a conceptual race), or a JVM race (which is an actual race). I admit to using "race" myself interchangeably for these two very different ideas. It's definitely a cause of discord: anytime you say "race condition," engineers run screaming.

And they should! But maybe sometimes faster than others.

@lowasser
Contributor Author

lowasser commented Jul 7, 2022

I agree that these are two different sorts of problem, but I'm not actually convinced of your relative ordering of them. Here are some counterpoints I'd like to make:

  • JVM races can be caught by instrumentation such as TSAN, but dispatcher races cannot.
  • Dispatcher races can go the other way whenever you attempt to upgrade coroutines -- which is constantly necessary, given the rate of bugfixes. This can result in large numbers of subtle test breakages that coroutines maintainers in our organization need to resolve before pushing upgrades.
  • Dispatcher races in production code can manifest in the tests of any team using that code.
  • Test-driven development is especially badly affected when one can't actually predict the result of such a race.

But finally, I think it comes down to the issue that a developer should be able to predict the deterministic behavior of this code in advance. There's no good reason for it to be hard to predict. Worse, it looks predictable and isn't.

@jingibus

jingibus commented Jul 8, 2022

Those arguments make sense to me. But even if I grant all of them, is the conclusion (dispatcher races are dangerous) actionable? Even with the proposed iterate?

E.g. consider an operation like combine:

fun <T> combine(sourceFlows: List<Flow<T>>) = flow {
  val items = Channel<T>()
  coroutineScope {
    sourceFlows.forEach { 
      launch { it.collect { item -> items.send(item) } }
    }
    items.consumeEach { emit(it) }
  }
}

For this flow, the order of items emitted depends on the order in which the sourceFlows collections are dispatched. So this is a dispatcher race. And a bread and butter one for us: our async business logic frequently collates data from multiple sources.

How do we get around that? A tool like iterate doesn't actually address this race, right? iterate punctures the suspend fun collect black box, but there are boxes within boxes.

Here's my POV: there is no silver bullet to make any coroutines code conceptually "deterministic," because there's none for concurrent code in general. Programming coroutines code necessarily means programming concurrently, and thus programming in a world where dispatchers will race. This is why the whole coroutines/channels/jobs toolkit exists: to provide a toolset for connecting concurrent processes with one another that can be reasoned about, even in the presence of actual nondeterminism.

But actual nondeterminism is a huge problem under test. (I know; I've tried it, intrepid fool that I am.) We have to rely on the JVM determinism of a single threaded dispatcher to patch things over that last bit, and eat the issues with coroutines upgrades. (We're a lot smaller than you; I shudder to think of the pain you're experiencing there)

So my approach has been to say, "Coroutines tests are just coroutines code. That means they're concurrent. That means learning new techniques to govern the concurrent flow of data and execution in your tests (e.g. use channels to build fakes that you can interact with, and don't use shared state to communicate with test code). It also means abiding by a few rules about dispatchers to avoid flakiness on CI. Do this, and you'll find yourself more fluent in your test code and in prod, since you'll be using the same basic toolset in both places."

I've personally been pretty happy with the result from a TDD perspective. I'd love to see an even better alternative, of course, but until that happens this has been quite effective for us.

@lowasser
Contributor Author

I agree that there's no way to make any coroutines code "conceptually deterministic," but I certainly claim that you can have deterministic tests for many coroutine tests, especially at the size of unit tests, and that where we can build deterministic tests it's good to do so. The code you've given pretty much reinvents flatMapMerge, which is never going to be deterministic.
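To make the comparison concrete, here is a small sketch using the library's existing `merge` operator, a close relative of flatMapMerge. `mergeAll` is a hypothetical wrapper name, and the sketch assumes a kotlinx.coroutines version in which `Iterable<Flow<T>>.merge()` is available.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: collects all sources concurrently and emits whatever arrives.
fun <T> mergeAll(sourceFlows: List<Flow<T>>): Flow<T> = sourceFlows.merge()

fun main() = runBlocking {
    val merged = mergeAll(listOf(flowOf(1, 2), flowOf(3, 4))).toList()
    // Every element arrives, but the interleaving across sources is not
    // part of the operator's contract, so only the contents are checked.
    check(merged.sorted() == listOf(1, 2, 3, 4))
    println(merged.sorted())
}
```

A test over such an operator can assert on the set of elements, or on per-source ordering, but not on one specific global order.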

I'm really thinking about the specific case of testing simple streaming RPCs. That's a small piece that can be deterministic, that we need to test all over the place -- and it's difficult to test deterministically today. I would bet that "streaming RPC tests that could be deterministically tested" are at least a single-digit percentage of all Flow tests out in the world today.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Jul 19, 2022

It looks to me like the pull-based model can be easily circumvented even by accident, as the rest of the facilities are not prepared for such use. The example above of combine is already interesting, but here's another one.

Let's say we have a flow that attempts to emit numbers from 0 to 9, but if the consumer is too slow, it drops the old values.

    val someFlow: Flow<Int> = flow {
        repeat(10) {
            emit(it)
        }
    }.buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)

Let's try testing it with the proposed API:

    @Test
    fun x() = runBlocking {
        val collected = mutableListOf<Int>()
        someFlow.iterate {
            while (hasNext()) {
                collected.add(next())
            }
        }
        assertEquals((0..9).toList(), collected)
    }

Doesn't work, collected only has 0 and 9.

Let's try adding a yield to someFlow so that the collection procedure has a chance to work:

    val someFlow: Flow<Int> = flow {
        repeat(10) {
            emit(it)
            yield()
        }
    }.buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)

Suddenly, collected becomes [0, 2, 4, 6, 8, 9].

Ok, let's run the collection procedure in Dispatchers.Unconfined instead, so that emit itself tries to ensure the delivery.

        val collected = mutableListOf<Int>()
        withContext(Dispatchers.Unconfined) {
            someFlow.iterate {
                while (hasNext()) {
                    collected.add(next())
                }
            }
        }
        assertEquals((0..9).toList(), collected)

Doesn't change a thing, the results are still the same, because iterate is not waiting for values, it already buffered a value and is waiting to get a chance to process it while everything else happens.

Naturally, if someFlow suspends nondeterministically (which can emulate the dispatcher being nondeterministic in its choice of coroutines to execute), the results produced by iterate are arbitrary:

    val someFlow: Flow<Int> = flow {
        repeat(10) {
            emit(it)
            if (Random.nextInt() % 2 == 0) yield()
        }
    }.buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)

When I ran it, I got [0, 2, 5, 7, 9].

I'd argue that this case is quite simple and practical: I've seen the logic of dropping old values used for sending cumulative statistics where the new emission supersedes the old ones, or for processing rapid clicks so that an action is only fired once if a button was pressed several times in quick succession, etc.
