Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
don't drop queue elements when takeBetween is interrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Aug 16, 2023
1 parent fcd5f62 commit 964cd5f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/tame-ligers-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/io": patch
---

don't drop queue elements when takeBetween is interrupted
13 changes: 8 additions & 5 deletions src/internal/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class QueueImpl<A> implements Queue.Queue<A> {

/** @internal */
const takeRemainderLoop = <A>(
self: Queue.Dequeue<A>,
self: Queue.Queue<A>,
min: number,
max: number,
acc: Chunk.Chunk<A>
Expand All @@ -291,10 +291,12 @@ const takeRemainderLoop = <A>(
takeUpTo(self, max),
core.flatMap((bs) => {
const remaining = min - bs.length
acc = Chunk.appendAll(acc, bs)
if (remaining === 1) {
return pipe(
take(self),
core.map((b) => pipe(acc, Chunk.appendAll(bs), Chunk.append(b)))
core.map((b) => Chunk.append(acc, b)),
core.onInterrupt(() => offerAll(self, acc))
)
}
if (remaining > 1) {
Expand All @@ -305,12 +307,13 @@ const takeRemainderLoop = <A>(
self,
remaining - 1,
max - bs.length - 1,
pipe(acc, Chunk.appendAll(bs), Chunk.append(b))
Chunk.append(acc, b)
)
)
),
core.onInterrupt(() => offerAll(self, acc))
)
}
return core.succeed(pipe(acc, Chunk.appendAll(bs)))
return core.succeed(acc)
})
)
}
Expand Down
20 changes: 20 additions & 0 deletions test/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,26 @@ describe.concurrent("Queue", () => {
yield* $(Fiber.interrupt(fiber))
assert.deepStrictEqual(Array.from(result), values)
}))
it.effect("takeBetween doesn't drop elements if interrupted", () =>
Effect.gen(function*($) {
const queue = yield* $(Queue.unbounded<number>())
yield* $(Queue.offer(queue, 1))
yield* $(Queue.offer(queue, 2))
yield* $(Queue.offer(queue, 3))

let fiber = yield* $(Queue.takeBetween(queue, 4, 4), Effect.fork)
yield* $(Effect.yieldNow())
yield* $(Fiber.interrupt(fiber))
let size = yield* $(Queue.size(queue))
assert.deepStrictEqual(size, 3)

// test for when remaining > 1
fiber = yield* $(Queue.takeBetween(queue, 5, 5), Effect.fork)
yield* $(Effect.yieldNow())
yield* $(Fiber.interrupt(fiber))
size = yield* $(Queue.size(queue))
assert.deepStrictEqual(size, 3)
}))
it.effect("takeN returns immediately if there is enough elements", () =>
Effect.gen(function*($) {
const queue = yield* $(Queue.bounded<number>(100))
Expand Down

0 comments on commit 964cd5f

Please sign in to comment.