diff --git a/.changeset/tame-ligers-learn.md b/.changeset/tame-ligers-learn.md new file mode 100644 index 00000000..a47fc528 --- /dev/null +++ b/.changeset/tame-ligers-learn.md @@ -0,0 +1,5 @@ +--- +"@effect/io": patch +--- + +don't drop queue elements when takeBetween is interrupted diff --git a/src/internal/queue.ts b/src/internal/queue.ts index 3da3992b..cb9980ad 100644 --- a/src/internal/queue.ts +++ b/src/internal/queue.ts @@ -233,9 +233,18 @@ class QueueImpl implements Queue.Queue { core.interrupt : core.deferredAwait(deferred) }), - core.onInterrupt(() => { - return core.sync(() => unsafeRemove(this.takers, deferred)) - }) + core.onInterrupt(() => + core.flatMap( + core.suspend(() => { + unsafeRemove(this.takers, deferred) + return core.deferredPoll(deferred) + }), + (effect) => + effect._tag === "Some" ? + core.flatMap(effect.value, (a) => this.offer(a)) + : core.unit + ) + ) ) } }) @@ -279,7 +288,7 @@ class QueueImpl implements Queue.Queue { /** @internal */ const takeRemainderLoop = ( - self: Queue.Dequeue, + self: Queue.Queue, min: number, max: number, acc: Chunk.Chunk @@ -291,10 +300,12 @@ const takeRemainderLoop = ( takeUpTo(self, max), core.flatMap((bs) => { const remaining = min - bs.length + acc = Chunk.appendAll(acc, bs) if (remaining === 1) { return pipe( take(self), - core.map((b) => pipe(acc, Chunk.appendAll(bs), Chunk.append(b))) + core.map((b) => Chunk.append(acc, b)), + core.onInterrupt(() => offerAll(self, acc)) ) } if (remaining > 1) { @@ -305,12 +316,13 @@ const takeRemainderLoop = ( self, remaining - 1, max - bs.length - 1, - pipe(acc, Chunk.appendAll(bs), Chunk.append(b)) + Chunk.append(acc, b) ) - ) + ), + core.onInterrupt(() => offerAll(self, acc)) ) } - return core.succeed(pipe(acc, Chunk.appendAll(bs))) + return core.succeed(acc) }) ) } diff --git a/test/Queue.ts b/test/Queue.ts index 737854a0..15328196 100644 --- a/test/Queue.ts +++ b/test/Queue.ts @@ -218,6 +218,27 @@ describe.concurrent("Queue", () => { const result = yield* $(Queue.size(queue)) assert.strictEqual(result, 0) })) + it.effect("take interruption doesn't drop elements", () => + Effect.gen(function*($) { + const queue = yield* $(Queue.bounded(100)) + const taken: Array = [] + const fiber = yield* $(Effect.fork(Effect.tap(Queue.take(queue), (n) => + Effect.sync(() => { + taken.push(n) + })))) + yield* $(Effect.yieldNow()) + yield* $(Effect.fork(Fiber.interrupt(fiber))) + yield* $(Queue.offer(queue, 1)) + yield* $(Queue.offer(queue, 2)) + yield* $(Queue.offer(queue, 3)) + let elements = yield* $(Queue.takeAll(queue)) + assert.strictEqual(taken.length, 0) + assert.deepEqual(Chunk.toReadonlyArray(elements), [2, 3]) + + yield* $(Effect.yieldNow()) + elements = yield* $(Queue.takeAll(queue)) + assert.deepEqual(Chunk.toReadonlyArray(elements), [1]) + })) it.effect("offer interruption", () => Effect.gen(function*($) { const queue = yield* $(Queue.bounded(2)) @@ -618,6 +639,26 @@ describe.concurrent("Queue", () => { yield* $(Fiber.interrupt(fiber)) assert.deepStrictEqual(Array.from(result), values) })) + it.effect("takeBetween doesn't drop elements if interrupted", () => + Effect.gen(function*($) { + const queue = yield* $(Queue.unbounded()) + yield* $(Queue.offer(queue, 1)) + yield* $(Queue.offer(queue, 2)) + yield* $(Queue.offer(queue, 3)) + + let fiber = yield* $(Queue.takeBetween(queue, 4, 4), Effect.fork) + yield* $(Effect.yieldNow()) + yield* $(Fiber.interrupt(fiber)) + let size = yield* $(Queue.size(queue)) + assert.deepStrictEqual(size, 3) + + // test for when remaining > 1 + fiber = yield* $(Queue.takeBetween(queue, 5, 5), Effect.fork) + yield* $(Effect.yieldNow()) + yield* $(Fiber.interrupt(fiber)) + size = yield* $(Queue.size(queue)) + assert.deepStrictEqual(size, 3) + })) it.effect("takeN returns immediately if there is enough elements", () => Effect.gen(function*($) { const queue = yield* $(Queue.bounded(100))