diff --git a/.changeset/tame-ligers-learn.md b/.changeset/tame-ligers-learn.md
new file mode 100644
index 00000000..a47fc528
--- /dev/null
+++ b/.changeset/tame-ligers-learn.md
@@ -0,0 +1,5 @@
+---
+"@effect/io": patch
+---
+
+don't drop queue elements when takeBetween is interrupted
diff --git a/src/internal/queue.ts b/src/internal/queue.ts
index 3da3992b..50e364d8 100644
--- a/src/internal/queue.ts
+++ b/src/internal/queue.ts
@@ -279,7 +279,7 @@ class QueueImpl implements Queue.Queue {
/** @internal */
const takeRemainderLoop = (
- self: Queue.Dequeue,
+ self: Queue.Queue,
min: number,
max: number,
acc: Chunk.Chunk
@@ -291,10 +291,12 @@ const takeRemainderLoop = (
takeUpTo(self, max),
core.flatMap((bs) => {
const remaining = min - bs.length
+ acc = Chunk.appendAll(acc, bs)
if (remaining === 1) {
return pipe(
take(self),
- core.map((b) => pipe(acc, Chunk.appendAll(bs), Chunk.append(b)))
+ core.map((b) => Chunk.append(acc, b)),
+ core.onInterrupt(() => offerAll(self, acc))
)
}
if (remaining > 1) {
@@ -305,12 +307,13 @@ const takeRemainderLoop = (
self,
remaining - 1,
max - bs.length - 1,
- pipe(acc, Chunk.appendAll(bs), Chunk.append(b))
+ Chunk.append(acc, b)
)
- )
+ ),
+ core.onInterrupt(() => offerAll(self, acc))
)
}
- return core.succeed(pipe(acc, Chunk.appendAll(bs)))
+ return core.succeed(acc)
})
)
}
diff --git a/test/Queue.ts b/test/Queue.ts
index 737854a0..74d999ce 100644
--- a/test/Queue.ts
+++ b/test/Queue.ts
@@ -618,6 +618,26 @@ describe.concurrent("Queue", () => {
yield* $(Fiber.interrupt(fiber))
assert.deepStrictEqual(Array.from(result), values)
}))
+ it.effect("takeBetween doesn't drop elements if interrupted", () =>
+ Effect.gen(function*($) {
+ const queue = yield* $(Queue.unbounded())
+ yield* $(Queue.offer(queue, 1))
+ yield* $(Queue.offer(queue, 2))
+ yield* $(Queue.offer(queue, 3))
+
+ let fiber = yield* $(Queue.takeBetween(queue, 4, 4), Effect.fork)
+ yield* $(Effect.yieldNow())
+ yield* $(Fiber.interrupt(fiber))
+ let size = yield* $(Queue.size(queue))
+ assert.deepStrictEqual(size, 3)
+
+ // test for when remaining > 1
+ fiber = yield* $(Queue.takeBetween(queue, 5, 5), Effect.fork)
+ yield* $(Effect.yieldNow())
+ yield* $(Fiber.interrupt(fiber))
+ size = yield* $(Queue.size(queue))
+ assert.deepStrictEqual(size, 3)
+ }))
it.effect("takeN returns immediately if there is enough elements", () =>
Effect.gen(function*($) {
const queue = yield* $(Queue.bounded(100))