-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_all.py
146 lines (130 loc) · 5.08 KB
/
test_all.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from unittest.mock import patch, MagicMock
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
def test_sync():
import anynotify
import logging
with anynotify.init(
worker_cls=anynotify.SyncWorker,
client=anynotify.DiscordClient('https://localhost/webhook'),
integrations=[anynotify.LoggingIntegration()]) as hub:
with patch('requests.post') as mock_post:
mock_response = MagicMock()
mock_response.status_code = 204
mock_post.return_value = mock_response
logging.info('test')
logging.warning('test')
assert mock_post.call_count == 1
def test_wsig():
import anynotify
from werkzeug.test import Client
from werkzeug.wrappers import Response
wsgi_integration = anynotify.WsgiIntegration()
def application(environ, start_response):
1/0
wrapped = wsgi_integration.wrap(application)
with anynotify.init(worker_cls=anynotify.SyncWorker, client=anynotify.DiscordClient('http://localhost/webhook'), integrations=[wsgi_integration]):
with patch('requests.post') as mock_post:
mock_response = MagicMock()
mock_response.status_code = 204
mock_post.return_value = mock_response
client = Client(wrapped, Response)
with pytest.raises(ZeroDivisionError):
response = client.get('/test')
assert mock_post.call_count == 1
d = mock_post.call_args_list[0].kwargs['json']['embeds'][0]['description']
assert 'http://localhost/test' in d
assert 'GET' in d
assert 'ZeroDivisionError' in d
@pytest.mark.parametrize('kind', ['gevent', 'thread'])
def test_async(kind):
import anynotify
import logging
import time
if kind == 'gevent':
import gevent
sleep = gevent.sleep
spawn = gevent.spawn
worker_cls = anynotify.GeventWorker
elif kind == 'thread':
import threading
sleep = time.sleep
def spawn(target, *args):
t = threading.Thread(target=target, args=args)
t.start()
return t
worker_cls = anynotify.ThreadWorker
else:
raise ValueError()
with patch('requests.post') as mock_post:
mock_response = MagicMock()
mock_response.status_code = 204
mock_post.return_value = mock_response
started = time.monotonic()
with anynotify.init(
worker_cls=worker_cls,
client=anynotify.DiscordClient('http://localhost/webhook', anynotify.RateLimiter(60, 10, 0.1)),
integrations=[anynotify.LoggingIntegration()]) as hub:
logging.info('test')
logging.warning('test 1')
logging.error('test 2')
logging.critical('test 3')
try:
1/0
except:
logging.exception('test 4')
elapsed = time.monotonic() - started
assert mock_post.call_count == 4
assert elapsed > 0.1 * 3
with patch('requests.post') as mock_post:
mock_response = MagicMock()
mock_response.status_code = 204
mock_post.return_value = mock_response
started = time.monotonic()
with anynotify.init(
worker_cls=worker_cls,
client=anynotify.DiscordClient('http://localhost/webhook', anynotify.RateLimiter(60, 10, 0)),
integrations=[anynotify.LoggingIntegration()]) as hub:
logging.info('test')
def f(n):
hub.push_context(x=n)
sleep((3-n)/10)
logging.warning('test %d', n)
gs = []
for i in range(3):
gs.append(spawn(f, i))
for g in gs:
g.join()
elapsed = time.monotonic() - started
e = mock_post.call_args_list[0].kwargs['json']['embeds'][0]
assert 'test 2' in e['title'] and "'x': 2" in e['description']
e = mock_post.call_args_list[1].kwargs['json']['embeds'][0]
assert 'test 1' in e['title'] and "'x': 1" in e['description']
e = mock_post.call_args_list[2].kwargs['json']['embeds'][0]
assert 'test 0' in e['title'] and "'x': 0" in e['description']
assert mock_post.call_count == 3
def test_ratelimit():
import anynotify
rl = anynotify.RateLimiter(last_n_seconds=10, max_requests=3)
assert rl.get_wait_duration(1) is None
rl.inc(1)
assert rl.get_wait_duration(2) is None
rl.inc(2)
assert rl.get_wait_duration(4) is None
rl.inc(4)
d = rl.get_wait_duration(4)
assert d is not None
assert d == 7
assert rl.get_wait_duration(11-0.001) > 0
assert rl.get_wait_duration(11+0.001) is None
rl = anynotify.RateLimiter(last_n_seconds=10, max_requests=3, min_interval=1)
assert rl.get_wait_duration(1) is None
rl.inc(1)
assert rl.get_wait_duration(2-0.001) > 0
assert rl.get_wait_duration(2) is None
rl.inc(2)
rl = anynotify.RateLimiter(last_n_seconds=10, max_requests=3, min_interval=3)
rl.inc(1)
assert rl.get_wait_duration(1) == 3