Merge pull request Kinto#752 from Kinto/quotas-plugin
Quotas plugin — Fixed Kinto#173
Natim authored Aug 17, 2016
2 parents 7d65522 + fccd929 commit 094d1ac
Showing 8 changed files with 1,383 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
@@ -36,7 +36,8 @@ This document describes changes between each past release.
- Added ability to plug custom StatsD backend implementations via a new ``kinto.statsd_backend``
setting. Useful for Datadog™ integration for example (fixes #626).
- Added a ``delete-collection`` action to the ``kinto`` command. (#727)
- Added verbosity options to the ``kinto`` command (#745)
- Added verbosity options to the ``kinto`` command. (#745)
- Added a built-in plugin that allows defining quotas per bucket or collection. (#752)

**Bug fixes**

1 change: 1 addition & 0 deletions docs/api/1.x/index.rst
@@ -22,6 +22,7 @@ Full detailed API documentation:
pagination
selecting_fields
history
quotas
utilities
batch
timestamps
97 changes: 97 additions & 0 deletions docs/api/1.x/quotas.rst
@@ -0,0 +1,97 @@
.. _api-quotas:

Quotas management
#################

When the built-in plugin ``kinto.plugins.quotas`` is enabled in
configuration, it becomes possible to configure quotas for a bucket or
a collection.
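
For instance, assuming the standard INI layout, the plugin can be
enabled through the ``kinto.includes`` setting (a minimal sketch to
adapt to your own configuration file):

.. sourcecode:: ini

    kinto.includes = kinto.plugins.quotas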

Clients can check for the ``quotas`` capability in the
:ref:`root URL endpoint <api-utilities>`.

.. note::

    In terms of performance, enabling this plugin generates two or
    three additional storage backend queries per request.

* A bucket's quota is a limit on the size of bucket attributes, group
  attributes, collection attributes, and record attributes.
* Deleted objects are counted with a size of zero, so if you create an
  object and then delete it, it no longer counts toward the quota, even
  though its tombstone is still stored.
* The quota plugin only works with transactional storage backends
  (e.g. PostgreSQL).
* The quota plugin should be activated before any data is added to a
  bucket or collection. If it is activated afterwards, the size of the
  data already present is effectively added to the quota limit, even if
  that data is deleted later.


Configuration
=============

You can configure three types of quotas:

* **QUOTA_BYTES**: The maximum total amount (in bytes) of data that
  can be stored in a bucket or collection, as measured by the JSON
  stringification of every value plus every key's length.
* **QUOTA_BYTES_PER_ITEM**: The maximum size (in bytes) of each
  individual item in the bucket or collection, as measured by the JSON
  stringification of its value plus its key length.
* **MAX_ITEMS**: The maximum number of objects that can be stored in
  a collection or bucket.

You can configure these quotas in the INI settings file.

For buckets:

* Globally for every bucket, using ``kinto.quotas.bucket_max_bytes``,
  ``kinto.quotas.bucket_max_bytes_per_item`` and
  ``kinto.quotas.bucket_max_items``
* For a specific bucket, using
  ``kinto.quotas.bucket_{bucket_id}_max_bytes``,
  ``kinto.quotas.bucket_{bucket_id}_max_bytes_per_item`` and
  ``kinto.quotas.bucket_{bucket_id}_max_items``, e.g.
  ``kinto.quotas.bucket_blocklists_max_items`` (see the example below)
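
As an illustration, a settings sketch for buckets (the ``blocklists``
bucket id comes from the example above; the values are made up):

.. sourcecode:: ini

    # Applies to every bucket
    kinto.quotas.bucket_max_bytes = 1000000
    kinto.quotas.bucket_max_items = 1000

    # Applies only to the blocklists bucket
    kinto.quotas.bucket_blocklists_max_bytes_per_item = 10000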

For collections:

* Globally for every collection in every bucket, using
  ``kinto.quotas.collection_max_bytes``,
  ``kinto.quotas.collection_max_bytes_per_item`` and
  ``kinto.quotas.collection_max_items``
* For every collection in a given bucket, using
  ``kinto.quotas.collection_{bucket_id}_max_bytes``,
  ``kinto.quotas.collection_{bucket_id}_max_bytes_per_item`` and
  ``kinto.quotas.collection_{bucket_id}_max_items``, e.g.
  ``kinto.quotas.collection_blocklists_max_items``
* For a specific collection in a given bucket, using
  ``kinto.quotas.collection_{bucket_id}_{collection_id}_max_bytes``,
  ``kinto.quotas.collection_{bucket_id}_{collection_id}_max_bytes_per_item`` and
  ``kinto.quotas.collection_{bucket_id}_{collection_id}_max_items``, e.g.
  ``kinto.quotas.collection_blocklists_certificates_max_items``
  (see the example below)
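
For example, a settings sketch for collections (the ids reuse the
blocklists and certificates names from above; the values are made up):

.. sourcecode:: ini

    # Applies to every collection in every bucket
    kinto.quotas.collection_max_items = 500

    # Applies to every collection in the blocklists bucket
    kinto.quotas.collection_blocklists_max_bytes = 500000

    # Applies only to the certificates collection of the blocklists bucket
    kinto.quotas.collection_blocklists_certificates_max_items = 100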


How does it work?
=================

If the quota is exceeded, the server returns a ``507 Insufficient
Storage`` HTTP error.

**Example Response**

.. sourcecode:: http

    HTTP/1.1 507 Insufficient Storage
    Access-Control-Expose-Headers: Backoff,Retry-After,Alert,Content-Length
    Content-Length: 132
    Content-Type: application/json; charset=UTF-8
    Date: Fri, 12 Aug 2016 10:14:29 GMT
    Server: waitress

    {
        "code": 507,
        "errno": 121,
        "error": "Insufficient Storage",
        "message": "Collection maximum number of objects exceeded (2 > 1 objects)"
    }
15 changes: 15 additions & 0 deletions kinto/plugins/quotas/__init__.py
@@ -0,0 +1,15 @@
from kinto.core.events import ResourceChanged

from .listener import on_resource_changed


def includeme(config):
    config.add_api_capability('quotas',
                              description='Quotas Management on buckets '
                                          'and collections.',
                              url='https://kinto.readthedocs.io')

    # Listen to every resource (except history).
    config.add_subscriber(on_resource_changed, ResourceChanged,
                          for_resources=('bucket', 'group',
                                         'collection', 'record'))
237 changes: 237 additions & 0 deletions kinto/plugins/quotas/listener.py
@@ -0,0 +1,237 @@
import copy

from pyramid.httpexceptions import HTTPInsufficientStorage
from kinto.core.errors import http_error, ERRORS
from kinto.core.storage.exceptions import RecordNotFoundError
from kinto.core.utils import instance_uri

from .utils import record_size

QUOTA_RESOURCE_NAME = 'quota'
QUOTA_BUCKET_ID = 'bucket_info'
QUOTA_COLLECTION_ID = 'collection_info'


def get_bucket_settings(settings, bucket_id, name):
    return settings.get(
        # Bucket specific
        'quotas.bucket_{}_{}'.format(bucket_id, name),
        # Global to all buckets
        settings.get('quotas.bucket_{}'.format(name), None))


def get_collection_settings(settings, bucket_id, collection_id, name):
    return settings.get(
        # Specific to a given collection of a given bucket
        'quotas.collection_{}_{}_{}'.format(bucket_id, collection_id, name),
        # Specific to every collection of a given bucket
        settings.get('quotas.collection_{}_{}'.format(bucket_id, name),
                     # Global to every collection of every bucket
                     settings.get('quotas.collection_{}'.format(name), None)))


def on_resource_changed(event):
    """
    Every time an object is created/changed/deleted, we update the
    bucket counters.
    If a new object exceeds the quotas, we reject the request.
    """
    payload = event.payload
    action = payload['action']
    resource_name = payload['resource_name']
    event_uri = payload['uri']

    settings = event.request.registry.settings

    bucket_id = payload['bucket_id']
    bucket_uri = instance_uri(event.request, 'bucket', id=bucket_id)
    collection_id = None
    collection_uri = None
    if 'collection_id' in payload:
        collection_id = payload['collection_id']
        collection_uri = instance_uri(event.request,
                                      'collection',
                                      bucket_id=bucket_id,
                                      id=collection_id)

    # Resolve the quota limits from the settings; the collection-level
    # per-item limit takes precedence over the bucket-level one.
    bucket_max_bytes = get_bucket_settings(settings, bucket_id, 'max_bytes')
    bucket_max_items = get_bucket_settings(settings, bucket_id, 'max_items')
    bucket_max_bytes_per_item = get_bucket_settings(settings, bucket_id,
                                                    'max_bytes_per_item')
    collection_max_bytes = get_collection_settings(settings, bucket_id,
                                                   collection_id, 'max_bytes')
    collection_max_items = get_collection_settings(settings, bucket_id,
                                                   collection_id, 'max_items')
    collection_max_bytes_per_item = get_collection_settings(
        settings, bucket_id, collection_id, 'max_bytes_per_item')

    max_bytes_per_item = (collection_max_bytes_per_item or
                          bucket_max_bytes_per_item)

    storage = event.request.registry.storage

    if action == 'delete' and resource_name == 'bucket':
        # The bucket is gone: drop its quota info and the quota info
        # of all its collections.
        try:
            storage.delete(parent_id=bucket_uri,
                           collection_id=QUOTA_RESOURCE_NAME,
                           object_id=QUOTA_BUCKET_ID)
        except RecordNotFoundError:
            pass

        collection_pattern = instance_uri(event.request, 'collection',
                                          bucket_id=bucket_id, id='*')
        storage.delete_all(parent_id=collection_pattern,
                           collection_id=QUOTA_RESOURCE_NAME)
        return

    targets = []
    for impacted in event.impacted_records:
        target = impacted['new' if action != 'delete' else 'old']
        # On POST .../records, the URI does not contain the newly created
        # record id.
        obj_id = target['id']
        parts = event_uri.split('/')
        if resource_name in parts[-1]:
            parts.append(obj_id)
        else:
            # Make sure the id is correct on grouped events.
            parts[-1] = obj_id
        uri = '/'.join(parts)

        old = impacted.get('old', {})
        new = impacted.get('new', {})

        targets.append((uri, obj_id, old, new))

    try:
        bucket_info = copy.deepcopy(
            storage.get(parent_id=bucket_uri,
                        collection_id=QUOTA_RESOURCE_NAME,
                        object_id=QUOTA_BUCKET_ID))
    except RecordNotFoundError:
        bucket_info = {
            "collection_count": 0,
            "record_count": 0,
            "storage_size": 0,
        }

    collection_info = {
        "record_count": 0,
        "storage_size": 0,
    }
    if collection_id:
        try:
            collection_info = copy.deepcopy(
                storage.get(parent_id=collection_uri,
                            collection_id=QUOTA_RESOURCE_NAME,
                            object_id=QUOTA_COLLECTION_ID))
        except RecordNotFoundError:
            pass

    # Update the bucket quotas values for each impacted record.
    for (uri, obj_id, old, new) in targets:
        old_size = record_size(old)
        new_size = record_size(new)

        if max_bytes_per_item is not None and action != "delete":
            if new_size > max_bytes_per_item:
                message = ("Maximum bytes per object exceeded "
                           "(%d > %d Bytes)." % (new_size, max_bytes_per_item))
                raise http_error(HTTPInsufficientStorage(),
                                 errno=ERRORS.FORBIDDEN.value,
                                 message=message)

        if action == 'create':
            bucket_info['storage_size'] += new_size
            if resource_name == 'collection':
                bucket_info['collection_count'] += 1
                collection_info['storage_size'] += new_size
            if resource_name == 'record':
                bucket_info['record_count'] += 1
                collection_info['record_count'] += 1
                collection_info['storage_size'] += new_size
        elif action == 'update':
            bucket_info['storage_size'] -= old_size
            bucket_info['storage_size'] += new_size
            if resource_name in ('collection', 'record'):
                collection_info['storage_size'] -= old_size
                collection_info['storage_size'] += new_size
        elif action == 'delete':
            bucket_info['storage_size'] -= old_size
            if resource_name == 'collection':
                collection_uri = uri
                bucket_info['collection_count'] -= 1
                # When we delete the collection all the records in it
                # are deleted without notification.
                collection_records, _ = storage.get_all(
                    collection_id='record',
                    parent_id=collection_uri)
                for r in collection_records:
                    old_record_size = record_size(r)
                    bucket_info['record_count'] -= 1
                    bucket_info['storage_size'] -= old_record_size
                    collection_info['record_count'] -= 1
                    collection_info['storage_size'] -= old_record_size
                collection_info['storage_size'] -= old_size

            if resource_name == 'record':
                bucket_info['record_count'] -= 1
                collection_info['record_count'] -= 1
                collection_info['storage_size'] -= old_size

    if bucket_max_bytes is not None:
        if bucket_info['storage_size'] > bucket_max_bytes:
            message = ("Bucket maximum total size exceeded "
                       "(%d > %d Bytes)." % (bucket_info['storage_size'],
                                             bucket_max_bytes))
            raise http_error(HTTPInsufficientStorage(),
                             errno=ERRORS.FORBIDDEN.value,
                             message=message)

    if bucket_max_items is not None:
        if bucket_info['record_count'] > bucket_max_items:
            message = ("Bucket maximum number of objects exceeded "
                       "(%d > %d objects)." % (bucket_info['record_count'],
                                               bucket_max_items))
            raise http_error(HTTPInsufficientStorage(),
                             errno=ERRORS.FORBIDDEN.value,
                             message=message)

    if collection_max_bytes is not None:
        if collection_info['storage_size'] > collection_max_bytes:
            message = ("Collection maximum size exceeded "
                       "(%d > %d Bytes)." % (collection_info['storage_size'],
                                             collection_max_bytes))
            raise http_error(HTTPInsufficientStorage(),
                             errno=ERRORS.FORBIDDEN.value,
                             message=message)

    if collection_max_items is not None:
        if collection_info['record_count'] > collection_max_items:
            message = ("Collection maximum number of objects exceeded "
                       "(%d > %d objects)." % (collection_info['record_count'],
                                               collection_max_items))
            raise http_error(HTTPInsufficientStorage(),
                             errno=ERRORS.FORBIDDEN.value,
                             message=message)

    storage.update(parent_id=bucket_uri,
                   collection_id=QUOTA_RESOURCE_NAME,
                   object_id=QUOTA_BUCKET_ID,
                   record=bucket_info)

    if collection_id:
        if action == 'delete' and resource_name == 'collection':
            try:
                storage.delete(parent_id=collection_uri,
                               collection_id=QUOTA_RESOURCE_NAME,
                               object_id=QUOTA_COLLECTION_ID)
            except RecordNotFoundError:
                pass
            return
        else:
            storage.update(parent_id=collection_uri,
                           collection_id=QUOTA_RESOURCE_NAME,
                           object_id=QUOTA_COLLECTION_ID,
                           record=collection_info)