diff --git a/.changeset/tame-ligers-learn.md b/.changeset/tame-ligers-learn.md
new file mode 100644
index 000000000..a47fc528c
--- /dev/null
+++ b/.changeset/tame-ligers-learn.md
@@ -0,0 +1,5 @@
+---
+"@effect/io": patch
+---
+
+don't drop queue elements when takeBetween is interrupted
diff --git a/src/internal/queue.ts b/src/internal/queue.ts
index 3da3992b2..ecc024923 100644
--- a/src/internal/queue.ts
+++ b/src/internal/queue.ts
@@ -279,7 +279,7 @@ class QueueImpl implements Queue.Queue {
/** @internal */
const takeRemainderLoop = (
- self: Queue.Dequeue,
+ self: Queue.Queue,
min: number,
max: number,
acc: Chunk.Chunk
@@ -294,7 +294,8 @@ const takeRemainderLoop = (
if (remaining === 1) {
return pipe(
take(self),
- core.map((b) => pipe(acc, Chunk.appendAll(bs), Chunk.append(b)))
+ core.map((b) => Chunk.append(Chunk.appendAll(acc, bs), b)),
+ core.onInterrupt(() => offerAll(self, Chunk.appendAll(bs, acc)))
)
}
if (remaining > 1) {
@@ -305,12 +306,13 @@ const takeRemainderLoop = (
self,
remaining - 1,
max - bs.length - 1,
- pipe(acc, Chunk.appendAll(bs), Chunk.append(b))
+ Chunk.append(Chunk.appendAll(acc, bs), b)
)
- )
+ ),
+ core.onInterrupt(() => offerAll(self, Chunk.appendAll(bs, acc)))
)
}
- return core.succeed(pipe(acc, Chunk.appendAll(bs)))
+ return core.succeed(Chunk.appendAll(acc, bs))
})
)
}
diff --git a/test/Queue.ts b/test/Queue.ts
index 737854a0d..74d999ce9 100644
--- a/test/Queue.ts
+++ b/test/Queue.ts
@@ -618,6 +618,26 @@ describe.concurrent("Queue", () => {
yield* $(Fiber.interrupt(fiber))
assert.deepStrictEqual(Array.from(result), values)
}))
+ it.effect("takeBetween doesn't drop elements if interrupted", () =>
+ Effect.gen(function*($) {
+ const queue = yield* $(Queue.unbounded())
+ yield* $(Queue.offer(queue, 1))
+ yield* $(Queue.offer(queue, 2))
+ yield* $(Queue.offer(queue, 3))
+
+ let fiber = yield* $(Queue.takeBetween(queue, 4, 4), Effect.fork)
+ yield* $(Effect.yieldNow())
+ yield* $(Fiber.interrupt(fiber))
+ let size = yield* $(Queue.size(queue))
+ assert.deepStrictEqual(size, 3)
+
+ // test for when remaining > 1
+ fiber = yield* $(Queue.takeBetween(queue, 5, 5), Effect.fork)
+ yield* $(Effect.yieldNow())
+ yield* $(Fiber.interrupt(fiber))
+ size = yield* $(Queue.size(queue))
+ assert.deepStrictEqual(size, 3)
+ }))
it.effect("takeN returns immediately if there is enough elements", () =>
Effect.gen(function*($) {
const queue = yield* $(Queue.bounded(100))