-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwee.py
255 lines (198 loc) · 7.52 KB
/
wee.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
from webob import Request, Response
from webob import exc
import functools
import inspect
import re
import sys
import venusian
class Verb(object):
    """
    Base decorator that binds a handler function to a URL regular expression.

    The upper-cased name of the concrete subclass (e.g. ``GET``) selects the
    handler table inside the scanner's registry.
    """

    def __init__(self, url):
        # Source text of the regular expression this verb should match.
        self.url = url

    def __call__(self, func):
        """Tag ``func`` with this verb and defer registration to scan time."""
        func._wee_verb = self
        venusian.attach(func, self.add_handler)
        return func

    def add_handler(self, scanner, name, func):
        """
        Venusian callback: store ``func`` in the scanner's registry.

        The handler is filed under the key ``(func.__module__, compiled_regex)``
        in the table named after this verb's class.

        Raises:
            ValueError: if the scanner carries no registry, or if the same
                (module, regex) key is already present in the table.
        """
        if scanner.registry is None:
            raise ValueError("No Scanner")
        handlers = scanner.registry[self.__class__.__name__.upper()]
        # Use this verb's own url: ``func._wee_verb`` only remembers the last
        # verb applied, so stacked decorators would otherwise share one url.
        key = (func.__module__, re.compile(self.url))
        if key in handlers:
            raise ValueError("Whoah there! You can't have the same regex for more than one "
                             "function in a single module.")
        handlers[key] = func
class rest(Verb):
    """
    A special verb that takes a class factory rather than a function.

    Each method of the decorated class whose name appears in ``verbs`` is
    registered as a handler; ``getitem`` is registered in the GET table.
    """
    # Method names that address a single item and therefore get an extra
    # named path segment appended to the url expression.
    _get_items = set(('put', 'getitem', 'delete'))
    # Method names looked up on the decorated class, in registration order.
    verbs = ('put', 'post', 'get', 'delete', 'getitem')

    def make_exp(self, name, class_):
        """
        Build the regular expression source for the method called ``name``.

        A url already ending in ``$`` is returned untouched.  Otherwise the
        expression is anchored with an optional trailing slash, and for item
        methods a named group ``(?P<subtype>[^/]+)`` is appended, where the
        group name is ``class_.subtype``.
        """
        url = self.url
        if url.endswith('$'):
            return url
        if name not in self._get_items:
            if url.endswith('/'):
                return r'%s$' % url
            return r'%s/?$' % url
        if not url.endswith('/'):
            url = "%s/" % url
        return r'%s(?P<%s>[^/]+)/?$' % (url, class_.subtype)

    def __call__(self, class_):
        """Attach the scan-time callback to ``class_`` and return it unchanged."""
        venusian.attach(class_, self.add_handler)
        return class_

    def add_handler(self, scanner, name, class_):
        """
        Venusian callback: register the verb methods of ``class_``.

        For every name in ``verbs`` the MRO is searched for the first
        implementation that is not defined in the ``wee`` module itself; it is
        stored under the key ``(class_, compiled_regex)``.
        """
        for method in self.verbs:
            for superclass in class_.mro():
                call = getattr(superclass, method, None)
                # Skip missing methods and the placeholders defined in wee.
                if not call or call.__module__ == 'wee':
                    continue
                verb_name = call.__name__
                # Compare by value: identity on a str literal only works when
                # the interpreter happens to intern both strings.
                if verb_name == 'getitem':
                    handlers = scanner.registry.get('GET')
                else:
                    handlers = scanner.registry.get(verb_name.upper())
                exp = self.make_exp(verb_name, class_)
                key = (class_, re.compile(exp))
                handlers[key] = call
                break
class Resource(object):
    """
    Base class for rest handling.

    Instances are created per request; the verb methods are placeholders
    that subclasses override.
    """
    # Name of the regex group that captures the item identifier in item urls.
    subtype = "item_id"

    def __init__(self, request, **group):
        """
        Keep the request and expose every keyword argument as an attribute.

        ``request`` is stored as ``self.request`` so that the verb methods,
        which receive only the instance, can reach it.
        """
        self.request = request
        for key, val in group.items():
            setattr(self, key, val)

    def get(self):
        """Placeholder; raises NotImplementedError."""
        raise NotImplementedError

    def post(self):
        """Placeholder; raises NotImplementedError."""
        raise NotImplementedError

    def put(self):
        """Placeholder; raises NotImplementedError."""
        raise NotImplementedError

    def delete(self):
        """Placeholder; raises NotImplementedError."""
        raise NotImplementedError
class get(Verb):
    """Decorator verb whose handlers are filed in the registry's GET table."""
class post(Verb):
    """Decorator verb whose handlers are filed in the registry's POST table."""
class delete(Verb):
    """Decorator verb whose handlers are filed in the registry's DELETE table."""
class put(Verb):
    """Decorator verb whose handlers are filed in the registry's PUT table."""
class DispatchRegistry(dict):
    """
    Dictionary of handler tables, one per HTTP method name.

    Each table maps a ``(source, compiled_regex)`` key to a handler.  The
    registry is callable: calling it with a request dispatches that request.
    """
    # HTTP method name -> verb decorator class; the keys seed the tables.
    verb = dict(POST=post,
                GET=get,
                PUT=put,
                DELETE=delete)

    def __init__(self):
        super(DispatchRegistry, self).__init__()
        for verb in self.verb:
            self[verb] = dict()

    def search(self, regex, request):
        """
        Template method: match ``regex`` against the request's PATH_INFO.

        Returns the match object, or None when the path does not match.
        """
        return regex.search(request.environ['PATH_INFO'])

    def dispatch(self, request):
        """
        Call the first handler whose regex matches the request.

        When the key's ``source`` is callable it is called with the request
        to build an instance, and the handler receives that instance plus the
        truthy regex groups named like its arguments.  Otherwise the handler
        is called with the request and all named groups as keywords.

        Returns the handler's result, or None when no handler matches or the
        request method has no table in this registry.
        """
        handlers = self.get(request.method)
        if handlers is None:
            # Methods without a table (HEAD, OPTIONS, ...) are treated like
            # an unmatched path instead of failing on iteration over None.
            return None
        # getargspec is gone from recent Python 3 releases; prefer the
        # replacement where it exists and fall back on Python 2.
        argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        for key, handler in handlers.items():
            source, regex = key
            match = self.search(regex, request)
            if match is None:
                continue
            group = match.groupdict()
            if callable(source):
                instance = source(request)
                args = argspec(handler).args
                new_args = [group.get(arg) for arg in args if group.get(arg)]
                return handler(instance, *new_args)
            return handler(request, **group)
        return None

    def __call__(self, request):
        """Dispatch ``request``; see :meth:`dispatch`."""
        return self.dispatch(request)
class PrefixRegistry(DispatchRegistry):
    """
    Registry that consumes a fixed path prefix before dispatching.

    ``prefix`` may be a slash-separated string, which is split into its
    non-empty elements, or any other object, which is stored as given and
    iterated element by element on each call.
    """

    def __init__(self, prefix, strict=True):
        super(PrefixRegistry, self).__init__()
        self.strict = strict
        if isinstance(prefix, basestring):
            prefix = [part for part in prefix.split('/') if part]
        self.prefix = prefix

    def search(self, regex, request):
        """Match ``regex`` against PATH_INFO, adding a leading slash if absent."""
        path = request.environ['PATH_INFO']
        if path.startswith('/'):
            return regex.search(path)
        return regex.search('/' + path)

    def __call__(self, request):
        """
        Pop one path element per prefix element, then dispatch the request.

        Raises ValueError when ``strict`` is True and a popped element is
        different from the expected prefix element.
        """
        for expected in self.prefix:
            actual = request.path_info_pop()
            if self.strict is True and actual != expected:
                message = "path element: %s does not match prefix element: %s"
                raise ValueError(message % (actual, expected))
        return self.dispatch(request)
def handle_request(environ, start_response, dispatch=None, module=None,
                   request_class=Request, response_class=Response):
    """
    The main handler. Dispatches to the user's code.

    Builds a request from ``environ``, passes it to ``dispatch`` and turns
    the result into a WSGI response:

    * ``None`` becomes an HTTPNotFound response;
    * a string is wrapped in ``response_class``;
    * anything else is called as a WSGI application.

    A ``WSGIHTTPException`` raised by the dispatcher is used as the response.
    Every other exception propagates to the caller.  ``module`` is accepted
    but not used by this function.
    """
    request = request_class(environ)
    try:
        response = dispatch(request)
        if response is None:
            raise exc.HTTPNotFound()
    except exc.WSGIHTTPException as error:
        # webob HTTP exceptions are WSGI applications themselves.
        return error(environ, start_response)
    if isinstance(response, basestring):
        response = response_class(response)
    return response(environ, start_response)
def make_app(module=None, registry=None, walk=False,
             request_class=Request, response_class=Response):
    """
    Build a WSGI application from the handlers found in ``module``.

    Module name may be specified, otherwise we stack jump and use the
    one where this function is called.

    ``registry`` is the registry to fill (a new one is created by
    ``scan_module`` when it is None) and ``walk`` is forwarded to the scan.
    ``request_class`` and ``response_class`` are handed to ``handle_request``
    and default to the webob classes.

    Returns a callable taking ``(environ, start_response)``.
    """
    if module is None:
        # Frame 1 is the caller of make_app; keep this lookup in this
        # function so the frame depth stays correct.
        module = sys._getframe(1).f_globals['__name__']
    registry = scan_module(module, registry, walk=walk)
    return functools.partial(handle_request,
                             module=module,
                             request_class=request_class,
                             response_class=response_class,
                             dispatch=registry)
def scan_module(module_name, registry=None, walk=False):
    """
    Scan a module for decorated handlers and return the filled registry.

    ``module_name`` is either a module object or a dotted module name, which
    is imported first.  ``registry`` defaults to a new ``DispatchRegistry``;
    ``walk`` is passed to the scanner.
    """
    # Imported here so the function carries its own dependency.
    import importlib

    if registry is None:
        registry = DispatchRegistry()
    if inspect.ismodule(module_name):
        module_obj = module_name
    else:
        # import_module returns the leaf module of a dotted name directly and
        # always resolves the name absolutely.
        module_obj = importlib.import_module(module_name)
    WeeScanner(registry=registry, walk=walk).scan(module_obj)
    return registry
class WeeScanner(venusian.Scanner):
    """Scanner that runs every venusian callback attached to an object."""

    def invoke(self, name, ob):
        """Call each callback attached to ``ob`` with ``(self, name, ob)``."""
        callback_map = getattr(ob, venusian.ATTACH_ATTR, None)
        if callback_map is None:
            return
        for _category, callbacks in callback_map.items():
            for callback in callbacks:
                callback(self, name, ob)

    def scan(self, package):
        """ Process the members of a Python package or module.

        Every member of ``package`` is passed to :meth:`invoke`.  When
        ``package`` has a ``__path__`` and the scanner's ``walk`` attribute
        is True (assumed True when the attribute is absent), the modules
        below it are imported and their members are processed as well.

        The ``package`` argument should be a reference to a Python
        package or module object.
        """
        for name, ob in inspect.getmembers(package):
            self.invoke(name, ob)

        # package, not module
        if not (hasattr(package, '__path__') and getattr(self, 'walk', True) is True):
            return

        prefix = package.__name__ + '.'
        for importer, modname, ispkg in venusian.walk_packages(package.__path__, prefix):
            __import__(modname)
            submodule = sys.modules[modname]
            for name, ob in inspect.getmembers(submodule):
                self.invoke(name, ob)