Skip to content

Commit

Permalink
Allow to use bucket storage policy
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed Jun 17, 2020
1 parent 5820e7b commit f58dca4
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 18 deletions.
5 changes: 5 additions & 0 deletions conf/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ sds_default_account = myaccount
# backend.
oio_storage_policies=SINGLE,EC123,EC64,THREECOPIES,FOURCOPIES

# Enable or disable the use of bucket storage policy.
# When enabled, and if a storage policy is defined for a bucket (root container),
# all objects in this bucket will be uploaded with this storage policy.
# use_bucket_storage_policy = false

# The configuration to chose the policy when unspecified. All the storage
# policies mentioned here must have been listed in the `oio_storage_policies`
# configuration directive. The string must respect the subsequent format:
Expand Down
87 changes: 69 additions & 18 deletions oioswift/proxy/controllers/obj.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,34 +211,83 @@ def enforce_versioning(self, req):
root_container.endswith(MULTIUPLOAD_SUFFIX)):
return None

# We can't use _get_info_from_caches as it would use local worker cache
# first and an update of versioning mode may not be detected.
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')
if memcache is None:
oio_cache = req.environ.get('oio.cache')
memcache = None
if oio_cache is None:
# We can't use _get_info_from_caches as it would use local worker
# cache first and an update of versioning mode may not be detected.
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')

if memcache is not None:
key = "/".join(("versioning", self.account_name, root_container))
value = memcache.get(key)
if value is not None:
if value:
req.headers[FORCEVERSIONING_HEADER] = value
return

oio_headers = {REQID_HEADER: self.trans_id}
perfdata = req.environ.get('oio.perfdata')
try:
meta = self.app.storage.container_get_properties(
self.account_name, root_container, headers=oio_headers,
cache=oio_cache, perfdata=perfdata)
except exceptions.NoSuchContainer:
raise HTTPNotFound(request=req)

value = meta['system'].get('sys.m2.policy.version')
if memcache is not None:
memcache.set(key, value or '')

if value:
req.headers[FORCEVERSIONING_HEADER] = value

def use_bucket_storage_policy(self, req):
"""
Enforce the storage policy mode of a container just before executing
an object operation. This is useful when the current object is not
stored in the "main" container but in a shard,
where the storage policy mode may not have been set yet.
"""
if not self.app.use_bucket_storage_policy:
return None

key = "/".join(("versioning", self.account_name, root_container))
val = memcache.get(key)
if val is not None:
if val != '':
req.headers[FORCEVERSIONING_HEADER] = val
return
root_container = req.headers.get(BUCKET_NAME_HEADER)
if root_container is None:
return None
if root_container.endswith(MULTIUPLOAD_SUFFIX):
root_container = root_container[:-len(MULTIUPLOAD_SUFFIX)]

oio_headers = {REQID_HEADER: self.trans_id}
oio_cache = req.environ.get('oio.cache')
memcache = None
if oio_cache is None:
# We can't use _get_info_from_caches as it would use local worker
# cache first and an update of storage policy mode may not be
# detected.
memcache = getattr(self.app, 'memcache', None) or \
req.environ.get('swift.cache')

if memcache is not None:
key = "/".join(("policy", self.account_name, root_container))
value = memcache.get(key)
if value is not None:
return value or None

oio_headers = {REQID_HEADER: self.trans_id}
perfdata = req.environ.get('oio.perfdata')
try:
meta = self.app.storage.container_get_properties(
self.account_name, root_container, headers=oio_headers,
cache=oio_cache, perfdata=perfdata)
except exceptions.NoSuchContainer:
raise HTTPNotFound(request=req)
value = meta['system'].get('sys.m2.policy.storage')

if memcache is not None:
memcache.set(key, value or '')

val = meta['system'].get('sys.m2.policy.version', '')
memcache.set(key, val)
if val:
req.headers[FORCEVERSIONING_HEADER] = val
return value

def get_object_head_resp(self, req):
storage = self.app.storage
Expand Down Expand Up @@ -647,15 +696,15 @@ def _object_create(self, account, container, **kwargs):
def _store_object(self, req, data_source, headers):
content_type = req.headers.get('content-type', 'octet/stream')
policy = None
container_info = self.container_info(self.account_name,
self.container_name, req)
if 'X-Oio-Storage-Policy' in req.headers:
policy = req.headers.get('X-Oio-Storage-Policy')
if not self.app.POLICIES.get_by_name(policy):
raise HTTPBadRequest(
"invalid policy '%s', must be in %s" %
(policy, self.app.POLICIES.by_name.keys()))
else:
container_info = self.container_info(self.account_name,
self.container_name, req)
try:
policy_index = int(
req.headers.get('X-Backend-Storage-Policy-Index',
Expand All @@ -665,6 +714,8 @@ def _store_object(self, req, data_source, headers):
if policy_index != 0:
policy = self.app.POLICIES.get_by_index(policy_index).name
else:
policy = self.use_bucket_storage_policy(req)
if policy is None:
content_length = int(req.headers.get('content-length', 0))
policy = self._get_auto_policy_from_size(content_length)

Expand Down
3 changes: 3 additions & 0 deletions oioswift/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def __init__(self, conf, memcache=None, logger=None, account_ring=None,
for k, v in conf.iteritems()
if k.startswith("sds_")}

self.use_bucket_storage_policy = config_true_value(
conf.get('use_bucket_storage_policy', 'false'))

self.oio_stgpol = []
if 'auto_storage_policies' in conf:
for elem in conf['auto_storage_policies'].split(','):
Expand Down

0 comments on commit f58dca4

Please sign in to comment.