From 964cd5f27e852cd7700ce1445ae6eaeff627de82 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 17 Aug 2023 10:17:27 +1200 Subject: [PATCH] don't drop queue elements when takeBetween is interrupted --- .changeset/tame-ligers-learn.md | 5 +++++ src/internal/queue.ts | 13 ++++++++----- test/Queue.ts | 20 ++++++++++++++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 .changeset/tame-ligers-learn.md diff --git a/.changeset/tame-ligers-learn.md b/.changeset/tame-ligers-learn.md new file mode 100644 index 00000000..a47fc528 --- /dev/null +++ b/.changeset/tame-ligers-learn.md @@ -0,0 +1,5 @@ +--- +"@effect/io": patch +--- + +don't drop queue elements when takeBetween is interrupted diff --git a/src/internal/queue.ts b/src/internal/queue.ts index 3da3992b..50e364d8 100644 --- a/src/internal/queue.ts +++ b/src/internal/queue.ts @@ -279,7 +279,7 @@ class QueueImpl implements Queue.Queue { /** @internal */ const takeRemainderLoop = ( - self: Queue.Dequeue, + self: Queue.Queue, min: number, max: number, acc: Chunk.Chunk @@ -291,10 +291,12 @@ const takeRemainderLoop = ( takeUpTo(self, max), core.flatMap((bs) => { const remaining = min - bs.length + acc = Chunk.appendAll(acc, bs) if (remaining === 1) { return pipe( take(self), - core.map((b) => pipe(acc, Chunk.appendAll(bs), Chunk.append(b))) + core.map((b) => Chunk.append(acc, b)), + core.onInterrupt(() => offerAll(self, acc)) ) } if (remaining > 1) { @@ -305,12 +307,13 @@ const takeRemainderLoop = ( self, remaining - 1, max - bs.length - 1, - pipe(acc, Chunk.appendAll(bs), Chunk.append(b)) + Chunk.append(acc, b) ) - ) + ), + core.onInterrupt(() => offerAll(self, acc)) ) } - return core.succeed(pipe(acc, Chunk.appendAll(bs))) + return core.succeed(acc) }) ) } diff --git a/test/Queue.ts b/test/Queue.ts index 737854a0..74d999ce 100644 --- a/test/Queue.ts +++ b/test/Queue.ts @@ -618,6 +618,26 @@ describe.concurrent("Queue", () => { yield* $(Fiber.interrupt(fiber)) assert.deepStrictEqual(Array.from(result), values) })) + it.effect("takeBetween doesn't drop elements if interrupted", () => + Effect.gen(function*($) { + const queue = yield* $(Queue.unbounded()) + yield* $(Queue.offer(queue, 1)) + yield* $(Queue.offer(queue, 2)) + yield* $(Queue.offer(queue, 3)) + + let fiber = yield* $(Queue.takeBetween(queue, 4, 4), Effect.fork) + yield* $(Effect.yieldNow()) + yield* $(Fiber.interrupt(fiber)) + let size = yield* $(Queue.size(queue)) + assert.deepStrictEqual(size, 3) + + // test for when remaining > 1 + fiber = yield* $(Queue.takeBetween(queue, 5, 5), Effect.fork) + yield* $(Effect.yieldNow()) + yield* $(Fiber.interrupt(fiber)) + size = yield* $(Queue.size(queue)) + assert.deepStrictEqual(size, 3) + })) it.effect("takeN returns immediately if there is enough elements", () => Effect.gen(function*($) { const queue = yield* $(Queue.bounded(100))