Asynchronous programming became much popular in the last few years in the Python community. Libraries like aiohttp
show incredible usage growing. They handle a large amount of concurrent connection while still maintain good code readability and simplicity. Not a long time ago, Django committed on adding async support in a next major version. So future of asynchronous python is pretty bright as you may realise. However, for a large number of developers, who came from a standard blocking model, the working mechanism of these tools may seem confusing. So in this short guide, I tried to go behind the scene and clarify the process, by re-building a little aiohttp
clone from scratch. We will start just with basic sample from official documentation and progressively add all necessary functionality that we all like. So let's start.
I assume that you already have a basic understanding of asyncio to follow this guide, but if you need a refresher here are few articles that may help
For impatients, final source code available at hzlmn/sketch
Asyncio has come a long journey to become what it looks like now. Back in those days, it was created as a lower-level tool called a "tulip", and writing higher-level applications was not as enjoyable as it is today.
Right now for most usecases asyncio
is pretty high-level API, but it also provides set of low-level helpers for library authors to manage event loops, and implement networking/ipc protocols.
Out of the box it only supports TCP
, UDP
, SSL
and subprocesses. Libraries implement their own higher level (HTTP, FTP, etc.) based on base transports and available APIs.
All communications done over chaining Transport
and Protocols
. In simple words Transport
describes how we can exchange data and Protocol
is responsible for choosing which data specifically.
Asyncio
has a pretty great official docs so you can read more about it here
To get a first grasp let's write simple TCP
server that will echo messages.
server.py
import asyncio
class Server(asyncio.Protocol):
def connection_made(self, transport):
self._transport = transport
def data_received(self, data):
message = data.decode()
self._transport.write(data)
self._transport.close()
loop = asyncio.get_event_loop()
coro = loop.create_server(Server, '127.0.0.1', 8080)
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
$ curl http://127.0.0.1:8080
GET / HTTP/1.1
Host: 127.0.0.1:8080
User-Agent: curl/7.54.0
Accept: */*
As you can see from the example above, the code is pretty simple, but as you may realize it is not scalable for writing a high-level application yet.
As HTTP
works over TCP
transport we already can send HTTP
requests to our server, however, we receive them in raw format and working with it will be annoying as you may guess. So next step we need to add better HTTP
handling mechanism.
Let's add request parsing so we can extract some useful info like headers, body, path and work with them instead of raw text. Parsing is a complex topic and it is certainly out of the scope of this guide, thats why we will use httptools by MagicStack for this as it rapidly fast, standard compatible and pretty flexible.
aiohttp
, on the other hand, has own hand-written Python based parser as well as binding to Node's http-parser
.
Lets write our parsing class, that will be used as mixin for our main Server
class.
http_parser.py
class HttpParserMixin:
def on_body(self, data):
self._body = data
def on_url(self, url):
self._url = url
def on_message_complete(self):
print(f"Received request to {self._url.decode(self._encoding)}")
def on_header(self, header, value):
header = header.decode(self._encoding)
self._headers[header] = value.decode(self._encoding)
Now when we have working HttpParserMixin
, lets modify a bit our Server
and apply mixin.
server.py
import asyncio
from httptools import HttpRequestParser
from .http_parser import HttpParserMixin
class Server(asyncio.Protocol, HttpParserMixin):
def __init__(self, loop):
self._loop = loop
self._encoding = "utf-8"
self._url = None
self._headers = {}
self._body = None
self._transport = None
self._request_parser = HttpRequestParser(self)
def connection_made(self, transport):
self._transport = transport
def connection_lost(self, *args):
self._transport = None
def data_received(self, data):
# Pass data to our parser
self._request_parser.feed_data(data)
So far, we have our server that can understand incoming HTTP
requests and obtain some important information from it. Now let's try to add simple runner to it.
server.py
if __name__ == "__main__":
loop = asyncio.get_event_loop()
serv = Server(loop)
server = loop.run_until_complete(loop.create_server(lambda: serv, port=8080))
try:
print("Started server on ::8080")
loop.run_until_complete(server.serve_forever())
except KeyboardInterrupt:
server.close()
loop.run_until_complete(server.wait_closed())
loop.stop()
> python server.py
Started server on ::8080
> curl http://127.0.0.1:8080/hello
At this moment we have working server that can parse HTTP
calls, but for our apps we need better abstractions to work with.
Let's create base Request
class that will group together all incoming HTTP
request information. We will use yarl
library for dealing with urls, make sure you installed it with pip.
request.py
import json
from yarl import URL
class Request:
_encoding = "utf_8"
def __init__(self, method, url, headers, version=None, body=None, app=None):
self._version = version
self._method = method.decode(self._encoding)
self._url = URL(url.decode(self._encoding))
self._headers = headers
self._body = body
@property
def method(self):
return self._method
@property
def url(self):
return self._url
@property
def headers(self):
return self._headers
def text(self):
if self._body is not None:
return self._body.decode(self._encoding)
def json(self):
text = self.text()
if text is not None:
return json.loads(text)
def __repr__(self):
return f"<Request at 0x{id(self)}>"
As a next step, we also need a structure, that will helps us to describe outgoing HTTP
response in programmer-friendly manner and convert it to raw HTTP
, which can be processed by asyncio.Transport
.
response.py
import http.server
web_responses = http.server.BaseHTTPRequestHandler.responses
class Response:
_encoding = "utf-8"
def __init__(
self,
body=None,
status=200,
content_type="text/plain",
headers=None,
version="1.1",
):
self._version = version
self._status = status
self._body = body
self._content_type = content_type
if headers is None:
headers = {}
self._headers = headers
@property
def body(self):
return self._body
@property
def status(self):
return self._status
@property
def content_type(self):
return self._content_type
@property
def headers(self):
return self._headers
def add_body(self, data):
self._body = data
def add_header(self, key, value):
self._headers[key] = value
def __str__(self):
"""We will use this in our handlers, it is actually generation of raw HTTP response,
that will be passed to our TCP transport
"""
status_msg, _ = web_responses.get(self._status)
messages = [
f"HTTP/{self._version} {self._status} {status_msg}",
f"Content-Type: {self._content_type}",
f"Content-Length: {len(self._body)}",
]
if self.headers:
for header, value in self.headers.items():
messages.append(f"{header}: {value}")
if self._body is not None:
messages.append("\r\n" + self._body)
return "\r\n".join(messages)
def __repr__(self):
return f"<Response at 0x{id(self)}>"
As you can see code is pretty straight forward we incapsulate all our data and provide proper getters. Also we have few helpers for reading body text
and json
, that will be used later. We also need to update our Server
to actually construct Request
object from message.
It should be created, when a whole request processed, so we need to add it to on_message_complete
event handler in our parser mixin.
http_parser.py
class HttpParserMixin:
...
def on_message_complete(self):
self._request = self._request_class(
version=self._request_parser.get_http_version(),
method=self._request_parser.get_method(),
url=self._url,
headers=self._headers,
body=self._body,
)
...
Server also need a little modification to create Response
object and
pass encoded value to asyncio.Transport
.
server.py
from .response import Response
...
class Server(asyncio.Protocol, HttpParserMixin):
...
def __init__(self, loop):
...
self._request = None
self._request_class = Request
...
def data_received(self, data):
self._request_parser.feed_data(data)
resp = Response(body=f"Received request on {self._request.url}")
self._transport.write(str(resp).encode(self._encoding))
self._transport.close()
Now running our server.py
we will be able to see Received request on /path
in response to curl call http:localhost:8080/path
.
At this stage we already have simple working server that can process HTTP requests and Request/Response objects for dealing with request cycles. However, our hand-crafted toolkit still miss few important concepts. First of all right now we have only one main request handler, in large applications we have lots of them for different routes so we certainly need mechanism for registring multiple route handlers.
So, let's try to build simplest possible UrlDispatcher
, just object with internal dict, that store as a key method and path tuple and actual handler as a value. We also need a handler for situation where user try to reach unrecognized route.
router.py
from .response import Response
class UrlDispatcher:
def __init__(self):
self._routes = {}
async def _not_found(self, request):
return Response(f"Not found {request.url} on this server", status=404)
def add_route(self, method, path, handler):
self._routes[(method, path)] = handler
def resolve(self, request):
key = (request.method, request.url.path)
if key not in self._routes:
return self._not_found
return self._routes[key]
Sure thing, we miss lots of stuff like parameterized routes but we will add them later on. For now let keep it simple as it is.
Next things we need an Application
container, that will actually combine together all app related information, because dealing with underlaying Server
will be annoying for us.
import asyncio
from .router import UrlDispatcher
from .server import Server
from .response import Response
class Application:
def __init__(self, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._router = UrlDispatcher()
@property
def loop(self):
return self._loop
@property
def router(self):
return self._router
def _make_server(self):
return Server(loop=self._loop, handler=self._handler, app=self)
async def _handler(self, request, response_writer):
"""Process incoming request"""
handler = self._router.resolve(request)
resp = await handler(request)
if not isinstance(resp, Response):
raise RuntimeError(f"expect Response instance but got {type(resp)}")
response_writer(resp)
We need to modify our Server
a bit and add response_writer
method, that will be responsible for passing data to transport. Also initializer should be changed to add handler
and app
properties that will be used to call corresponding handlers.
server.py
class Server(asyncio.Protocol, HttpParserMixin):
...
def __init__(self, loop, handler, app):
self._loop = loop
self._url = None
self._headers = {}
self._body = None
self._transport = None
self._request_parser = HttpRequestParser(self)
self._request = None
self._request_class = Request
self._request_handler = handler
self._request_handler_task = None
def response_writer(self, response):
self._transport.write(str(response).encode(self._encoding))
self._transport.close()
...
http_parser.py
class HttpParserMixin:
def on_body(self, data):
self._body = data
def on_url(self, url):
self._url = url
def on_message_complete(self):
self._request = self._request_class(
version=self._request_parser.get_http_version(),
method=self._request_parser.get_method(),
url=self._url,
headers=self._headers,
body=self._body,
)
self._request_handler_task = self._loop.create_task(
self._request_handler(self._request, self.response_writer)
)
def on_header(self, header, value):
header = header.decode(self._encoding)
self._headers[header] = value.decode(self._encoding)
Finally, when we have basic functionality ready, can register new routes and handlers, let's add simple helper for actually running our app instance (similar to web.run_app
in aiohttp
).
application.py
def run_app(app, host="127.0.0.1", port=8080, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
serv = app._make_server()
server = loop.run_until_complete(
loop.create_server(lambda: serv, host=host, port=port)
)
try:
print(f"Started server on {host}:{port}")
loop.run_until_complete(server.serve_forever())
except KeyboardInterrupt:
server.close()
loop.run_until_complete(server.wait_closed())
loop.stop()
And now, time to make simple app with our fresh toolkit.
app.py
import asyncio
from .response import Response
from .application import Application, run_app
app = Application()
async def handler(request):
return Response(f"Hello at {request.url}")
app.router.add_route("GET", "/", handler)
if __name__ == "__main__":
run_app(app)
If you will run it and then make GET
request to /
, you will be able to see Hello at /
and 404
response for all other routes. Hooray, we done it however there are still big room for improvements.
$ curl 127.0.0.1:8080/
Hello at /
$ curl 127.0.0.1:8080/invalid
Not found /invalid on this server
So far, we have all basic functionality up & running, but we still need to change certain things in our "framework". First of all, as we discussed earlier our router are missing parametrized-routes, it is "must have" feature of all modern libraries. Next we need to add support for middlewares, it is also very common and powerfull concept. Great thing about aiohttp
that i am pretty much in love with is application lifecycle hooks (eg. on_startup
, on_shutdown
, on_cleanup
) so we certainly should try to implement it as well.
Currently, our UrlDispatcher
is pretty lean and it works with registered url pathes as a string. First thing, that we need is actually add support for patterns like /user/{username}
to our resolve
method. Also we need _format_pattern
helper that will be responsible for generating actual regular expression from parametrized string. Also as you may noted we have another helper _method_not_allowed
and methods for simpler definition of GET
, POST
, etc. routes.
router.py
import re
from functools import partialmethod
from .response import Response
class UrlDispatcher:
_param_regex = r"{(?P<param>\w+)}"
def __init__(self):
self._routes = {}
async def _not_found(self, request):
return Response(f"Could not find {request.url.raw_path}")
async def _method_not_allowed(self, request):
return Response(f"{request.method} not allowed for {request.url.raw_path}")
def resolve(self, request):
for (method, pattern), handler in self._routes.items():
match = re.match(pattern, request.url.raw_path)
if match is None:
return None, self._not_found
if method != request.method:
return None, self._method_not_allowed
return match.groupdict(), handler
def _format_pattern(self, path):
if not re.search(self._param_regex, path):
return path
regex = r""
last_pos = 0
for match in re.finditer(self._param_regex, path):
regex += path[last_pos: match.start()]
param = match.group("param")
regex += r"(?P<%s>\w+)" % param
last_pos = match.end()
return regex
def add_route(self, method, path, handler):
pattern = self._format_pattern(path)
self._routes[(method, pattern)] = handler
add_get = partialmethod(add_route, "GET")
add_post = partialmethod(add_route, "POST")
add_put = partialmethod(add_route, "PUT")
add_head = partialmethod(add_route, "HEAD")
add_options = partialmethod(add_route, "OPTIONS")
We also need to modify our application container, right now resolve
method of UrlDispatcher
returns match_info
and handler
. So inside Application._handler
change the following lines.
application.py
class Application:
...
async def _handler(self, request, response_writer):
"""Process incoming request"""
match_info, handler = self._router.resolve(request)
request.match_info = match_info
...
For those, who aren't familiar with a this concept, in simple words middleware
is just a coroutine, that can modify incoming request object or change response of a handler. It will be fired before each request to the server. Implementation is pretty trivial for our needs. First of all we need to add list of registered middlewares inside our Application
object and change a little bit Application._handler
to run through them. Each middleware should work with result of previous one in chain.
application.py
from functools import partial
...
class Application:
def __init__(self, loop=None, middlewares=None):
...
if middlewares is None:
self._middlewares = []
...
async def _handler(self, request, response_writer):
"""Process incoming request"""
match_info, handler = self._router.resolve(request)
request.match_info = match_info
if self._middlewares:
for md in self._middlewares:
handler = partial(md, handler=handler)
resp = await handler(request)
...
Now lets try to add request logging middleware to our simple application.
app.py
import asyncio
from .response import Response
from .application import Application, run_app
async def log_middleware(request, handler):
print(f"Received request to {request.url.raw_path}")
return await handler(request)
app = Application(middlewares=[log_middleware])
async def handler(request):
return Response(f"Hello at {request.url}")
app.router.add_route("GET", "/", handler)
if __name__ == "__main__":
run_app(app)
If we try to run it, we should see Received request to /
message in response to incoming request.
Next step let's add support for running certain coroutines in response to events like starting server and stopping it. It is pretty neat feature of aiohttp
. There are many signals like on_startup
, on_shutdown
, on_response_prepared
to name a few, but for our need let's keep it simple as possible and just implement startup
& shutdown
helpers.
Inside Application
we need to add list of actual handlers for each event with proper encapsulation and provide getters. Then actual startup
and shutdown
coroutines and add corresponding calls to run_app
helper.
application.py
class Application:
def __init__(self, loop=None, middlewares=None):
...
self._on_startup = []
self._on_shutdown = []
...
@property
def on_startup(self):
return self._on_startup
@property
def on_shutdown(self):
return self._on_shutdown
async def startup(self):
coros = [func(self) for func in self._on_startup]
await asyncio.gather(*coros, loop=self._loop)
async def shutdown(self):
coros = [func(self) for func in self._on_shutdown]
await asyncio.gather(*coros, loop=self._loop)
...
def run_app(app, host="127.0.0.1", port=8080, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
serv = app._make_server()
loop.run_until_complete(app.startup())
server = loop.run_until_complete(
loop.create_server(lambda: serv, host=host, port=port)
)
try:
print(f"Started server on {host}:{port}")
loop.run_until_complete(server.serve_forever())
except KeyboardInterrupt:
loop.run_until_complete(app.shutdown())
server.close()
loop.run_until_complete(server.wait_closed())
loop.stop()
At this steps we have most of the core features added, however, we still have lack of exceptions handling. Great feature about aiohttp
is that it allows you to work with web exceptions as a native python exceptions.
It's done with implementing both Exception
and Response
classes and it's really flexible mechanism we would like to have as well.
So, first thing lets create our base HTTPException
class and few helpers based on it that we might need like HTTPNotFound
for unrecognized pathes HTTPBadRequest
for user side issues and HTTPFound
for redirecting.
from .response import Response
class HTTPException(Response, Exception):
status_code = None
def __init__(self, reason=None, content_type=None):
self._reason = reason
self._content_type = content_type
Response.__init__(
self,
body=self._reason,
status=self.status_code,
content_type=self._content_type or "text/plain",
)
Exception.__init__(self, self._reason)
class HTTPNotFound(HTTPException):
status_code = 404
class HTTPBadRequest(HTTPException):
status_code = 400
class HTTPFound(HTTPException):
status_code = 302
def __init__(self, location, reason=None, content_type=None):
super().__init__(reason=reason, content_type=content_type)
self.add_header("Location", location)
Then we need modify a bit our Application._handler
to actually catch web exceptions.
application.py
class Application:
...
async def _handler(self, request, response_writer):
"""Process incoming request"""
try:
match_info, handler = self._router.resolve(request)
request.match_info = match_info
if self._middlewares:
for md in self._middlewares:
handler = partial(md, handler=handler)
resp = await handler(request)
except HTTPException as exc:
resp = exc
...
Also now we can drop _not_found
& _method_not_allowed
helpers from our UrlDispatcher
and instead just raise proper exceptions.
router.py
class UrlDispatcher:
...
def resolve(self, request):
for (method, pattern), handler in self._routes.items():
match = re.match(pattern, request.url.raw_path)
if match is None:
raise HTTPNotFound(reason=f"Could not find {request.url.raw_path}")
if method != request.method:
raise HTTPBadRequest(reason=f"{request.method} not allowed for {request.url.raw_path}")
return match.groupdict(), handler
...
Another thing that might be a good addition is a standard formatted response for internal server errors, because we don't want to break actual app in some inconsistent situations. Let's add just a simple html template as well as tiny helper for formatting exceptions.
helpers.py
import traceback
from .response import Response
server_exception_templ = """
<div>
<h1>500 Internal server error</h1>
<span>Server got itself in trouble : <b>{exc}</b><span>
<p>{traceback}</p>
</div>
"""
def format_exception(exc):
resp = Response(status=500, content_type="text/html")
trace = traceback.format_exc().replace("\n", "</br>")
msg = server_exception_templ.format(exc=str(exc), traceback=trace)
resp.add_body(msg)
return resp
As simple as it is, and now just catch all Exception
inside our Application._handler
and generate actual html response with our helper.
application.py
class Application:
...
async def _handler(self, request, response_writer):
"""Process incoming request"""
try:
match_info, handler = self._router.resolve(request)
request.match_info = match_info
if self._middlewares:
for md in self._middlewares:
handler = partial(md, handler=handler)
resp = await handler(request)
except HTTPException as exc:
resp = exc
except Exception as exc:
resp = format_exception(exc)
...
As a final touch, we need to add signal processing for the proper process of shutting down our application. So, let's change run_app
to the following lines.
application.py
...
def run_app(app, host="127.0.0.1", port=8080, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
serv = app._make_server()
loop.run_until_complete(app.startup())
server = loop.run_until_complete(
loop.create_server(lambda: serv, host=host, port=port)
)
loop.add_signal_handler(
signal.SIGTERM, lambda: asyncio.ensure_future(app.shutdown())
)
...
Now when we have our toolkit ready, let's try to complete our previous sample application lifecycle hooks and exceptions that we just added.
app.py
from .application import Application, run_app
async def on_startup(app):
# you may query here actual db, but for an example let's just use simple set.
app.db = {"john_doe",}
async def log_middleware(request, handler):
print(f"Received request to {request.url.raw_path}")
return await handler(request)
async def handler(request):
username = request.match_info["username"]
if username not in request.app.db:
raise HTTPNotFound(reason=f"No such user with as {username} :(")
return Response(f"Welcome, {username}!")
app = Application(middlewares=[log_middleware])
app.on_startup.append(on_startup)
app.router.add_get("/{username}", handler)
if __name__ == "__main__":
run_app(app)
If we done all properly you will see log messages on each request, welcome message in response to registered user and HTTPNotFound
for unregistered users and unrecognized path.
Summing it up, in ~500 lines we hand-crafted pretty simple yet powerfull micro framework inspired by aiohttp
& sanic
. Of course, it is not production ready software as it still miss lot's of usefull & important features like more robust server, better HTTP support to fully correlate with specification, web sockets to name a few. However, i belive that through this process we developed better understanding how such tools built. As a famous physicist Richard Feynman said “What I cannot create, I do not understand”. So I hope you enjoyed this guide, see ya! 👋