-
Notifications
You must be signed in to change notification settings - Fork 2
/
async_queue.test.py
52 lines (41 loc) · 1.09 KB
/
async_queue.test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from microtest import test, run, expect
from async_queue import AsyncQueue
async def collect_items(queue):
    """Collect items from *queue* into a list and return it.

    The queue is consumed with ``async for``. After each received item the
    helper asks ``queue.is_empty()`` and stops iterating once that is true.
    Because the check only runs after an item has been received, callers are
    expected to put at least one item in the queue beforehand.

    Args:
        queue: An object supporting ``async for`` and an ``is_empty()``
            method (the tests in this file pass an ``AsyncQueue``).

    Returns:
        The list of items received, in the order they were yielded. The list
        is returned both when ``is_empty()`` ends the loop and when the
        iterator finishes on its own.
    """
    items = []
    async for item in queue:
        items.append(item)
        # Break in test so as to not consume from the queue forever.
        if queue.is_empty():
            break
    # Single exit point: previously an iterator that ended by itself made the
    # function fall through and return None instead of the collected items.
    return items
@test
async def it_should_put_and_consume_items_in_the_queue():
    """Values of mixed types put into the queue are collected in put order."""
    payloads = ('hello 1', 1, {'a': 1}, ('tuple', 'test'))
    queue = AsyncQueue(10)
    for payload in payloads:
        queue.put(payload)
    received = await collect_items(queue)
    # Compare against a fresh list holding the same values in the same order.
    expect(received).to_be(list(payloads))
@test
async def it_should_discard_overflowed_items():
    """Putting six items into a queue built with 4 keeps only the last four."""
    queue = AsyncQueue(4)
    for number in range(1, 7):
        queue.put(number)
    received = await collect_items(queue)
    # The oldest items, 1 and 2, were discarded.
    expect(received).to_be([3, 4, 5, 6])
    # Both dropped items are counted by the queue.
    expect(queue.discard_count).to_be(2)
@test
async def it_should_enforce_a_min_capacity_of_3():
    """A queue requested with 1 still retains the three most recent items."""
    queue = AsyncQueue(1)
    for number in (1, 2, 3, 4):
        queue.put(number)
    received = await collect_items(queue)
    # Only the first item is dropped; the remaining three are kept.
    expect(received).to_be([2, 3, 4])
# Call the run() function imported from microtest. Presumably this executes
# the functions decorated with @test above -- confirm against microtest.
run()