Skip to content

Commit

Permalink
Allow to use bucket storage policy
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed Jun 17, 2020
1 parent 5820e7b commit e4ee404
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 22 deletions.
5 changes: 5 additions & 0 deletions conf/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ sds_default_account = myaccount
# backend.
oio_storage_policies=SINGLE,EC123,EC64,THREECOPIES,FOURCOPIES

# Enable or disable the use of bucket storage policy.
# When enabled, and if a storage policy is defined for a bucket (root container),
# all objects in this bucket will be uploaded with this storage policy.
# use_bucket_storage_policy = false

# The configuration to chose the policy when unspecified. All the storage
# policies mentioned here must have been listed in the `oio_storage_policies`
# configuration directive. The string must respect the subsequent format:
Expand Down
15 changes: 9 additions & 6 deletions oioswift/proxy/controllers/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,15 @@ def POST(self, req):
clear_info_cache(self.app, req.environ,
self.account_name, self.container_name)

memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')
if memcache is not None:
key = "/".join(("versioning", self.account_name,
self.container_name))
memcache.delete(key)
if req.environ.get('oio.cache') is None:
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')
if memcache is not None:
for memcache_prefix in ("versioning", "storage_policy"):
memcache_key = "/".join(
(memcache_prefix, self.account_name,
self.container_name))
memcache.delete(memcache_key)

resp = self.get_container_post_resp(req, headers)
return resp
Expand Down
82 changes: 66 additions & 16 deletions oioswift/proxy/controllers/obj.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,20 +213,68 @@ def enforce_versioning(self, req):

# We can't use _get_info_from_caches as it would use local worker cache
# first and an update of versioning mode may not be detected.
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')
if memcache is None:
oio_cache = req.environ.get('oio.cache')
memcache = None
if oio_cache is None:
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')

if memcache is not None:
memcache_key = "/".join(
("versioning", self.account_name, root_container))
version_policy = memcache.get(memcache_key)
if version_policy is not None:
if version_policy:
req.headers[FORCEVERSIONING_HEADER] = version_policy
return

oio_headers = {REQID_HEADER: self.trans_id}
perfdata = req.environ.get('oio.perfdata')
try:
meta = self.app.storage.container_get_properties(
self.account_name, root_container, headers=oio_headers,
cache=oio_cache, perfdata=perfdata)
except exceptions.NoSuchContainer:
raise HTTPNotFound(request=req)

version_policy = meta['system'].get('sys.m2.policy.version')
if memcache is not None:
memcache.set(memcache_key, version_policy or '')
if version_policy:
req.headers[FORCEVERSIONING_HEADER] = version_policy

def use_bucket_storage_policy(self, req):
"""
Enforce the storage policy mode of a container just before executing
an object operation. This is useful when the current object is not
stored in the "main" container but in a shard,
where the storage policy mode may not have been set yet.
"""
if not self.app.use_bucket_storage_policy:
return None

root_container = req.headers.get(BUCKET_NAME_HEADER)
if root_container is None:
return None
if root_container.endswith(MULTIUPLOAD_SUFFIX):
root_container = root_container[:-len(MULTIUPLOAD_SUFFIX)]

key = "/".join(("versioning", self.account_name, root_container))
val = memcache.get(key)
if val is not None:
if val != '':
req.headers[FORCEVERSIONING_HEADER] = val
return
# We can't use _get_info_from_caches as it would use local worker cache
# first and an update of storage policy mode may not be detected.
oio_cache = req.environ.get('oio.cache')
memcache = None
if oio_cache is None:
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')

if memcache is not None:
memcache_key = "/".join(
("storage_policy", self.account_name, root_container))
storage_policy = memcache.get(memcache_key)
if storage_policy is not None:
return storage_policy or None

oio_headers = {REQID_HEADER: self.trans_id}
oio_cache = req.environ.get('oio.cache')
perfdata = req.environ.get('oio.perfdata')
try:
meta = self.app.storage.container_get_properties(
Expand All @@ -235,10 +283,10 @@ def enforce_versioning(self, req):
except exceptions.NoSuchContainer:
raise HTTPNotFound(request=req)

val = meta['system'].get('sys.m2.policy.version', '')
memcache.set(key, val)
if val:
req.headers[FORCEVERSIONING_HEADER] = val
storage_policy = meta['system'].get('sys.m2.policy.storage')
if memcache is not None:
memcache.set(memcache_key, storage_policy or '')
return storage_policy

def get_object_head_resp(self, req):
storage = self.app.storage
Expand Down Expand Up @@ -647,15 +695,15 @@ def _object_create(self, account, container, **kwargs):
def _store_object(self, req, data_source, headers):
content_type = req.headers.get('content-type', 'octet/stream')
policy = None
container_info = self.container_info(self.account_name,
self.container_name, req)
if 'X-Oio-Storage-Policy' in req.headers:
policy = req.headers.get('X-Oio-Storage-Policy')
if not self.app.POLICIES.get_by_name(policy):
raise HTTPBadRequest(
"invalid policy '%s', must be in %s" %
(policy, self.app.POLICIES.by_name.keys()))
else:
container_info = self.container_info(self.account_name,
self.container_name, req)
try:
policy_index = int(
req.headers.get('X-Backend-Storage-Policy-Index',
Expand All @@ -665,6 +713,8 @@ def _store_object(self, req, data_source, headers):
if policy_index != 0:
policy = self.app.POLICIES.get_by_index(policy_index).name
else:
policy = self.use_bucket_storage_policy(req)
if policy is None:
content_length = int(req.headers.get('content-length', 0))
policy = self._get_auto_policy_from_size(content_length)

Expand Down
3 changes: 3 additions & 0 deletions oioswift/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def __init__(self, conf, memcache=None, logger=None, account_ring=None,
for k, v in conf.iteritems()
if k.startswith("sds_")}

self.use_bucket_storage_policy = config_true_value(
conf.get('use_bucket_storage_policy', 'false'))

self.oio_stgpol = []
if 'auto_storage_policies' in conf:
for elem in conf['auto_storage_policies'].split(','):
Expand Down

0 comments on commit e4ee404

Please sign in to comment.