From e4ee4044520ed190a06329637c23d274234610e4 Mon Sep 17 00:00:00 2001 From: Aymeric Ducroquetz Date: Mon, 15 Jun 2020 17:09:14 +0200 Subject: [PATCH] Allow to use bucket storage policy --- conf/default.cfg | 5 ++ oioswift/proxy/controllers/container.py | 15 +++-- oioswift/proxy/controllers/obj.py | 82 ++++++++++++++++++++----- oioswift/server.py | 3 + 4 files changed, 83 insertions(+), 22 deletions(-) diff --git a/conf/default.cfg b/conf/default.cfg index ced8d99d..8c51e02c 100644 --- a/conf/default.cfg +++ b/conf/default.cfg @@ -43,6 +43,11 @@ sds_default_account = myaccount # backend. oio_storage_policies=SINGLE,EC123,EC64,THREECOPIES,FOURCOPIES +# Enable or disable the use of bucket storage policy. +# When enabled, and if a storage policy is defined for a bucket (root container), +# all objects in this bucket will be uploaded with this storage policy. +# use_bucket_storage_policy = false + # The configuration to chose the policy when unspecified. All the storage # policies mentioned here must have been listed in the `oio_storage_policies` # configuration directive. The string must respect the subsequent format: diff --git a/oioswift/proxy/controllers/container.py b/oioswift/proxy/controllers/container.py index b0b3ca25..33814ba7 100644 --- a/oioswift/proxy/controllers/container.py +++ b/oioswift/proxy/controllers/container.py @@ -376,12 +376,15 @@ def POST(self, req): clear_info_cache(self.app, req.environ, self.account_name, self.container_name) - memcache = getattr(self.app, 'memcache', None) or \ - req.environ.get('swift.cache') - if memcache is not None: - key = "/".join(("versioning", self.account_name, - self.container_name)) - memcache.delete(key) + if req.environ.get('oio.cache') is None: + memcache = getattr(self.app, 'memcache', None) or \ + req.environ.get('swift.cache') + if memcache is not None: + for memcache_prefix in ("versioning", "storage_policy"): + memcache_key = "/".join( + (memcache_prefix, self.account_name, + self.container_name)) + memcache.delete(memcache_key) resp = self.get_container_post_resp(req, headers) return resp diff --git a/oioswift/proxy/controllers/obj.py b/oioswift/proxy/controllers/obj.py index 299afcfa..66655ccc 100644 --- a/oioswift/proxy/controllers/obj.py +++ b/oioswift/proxy/controllers/obj.py @@ -213,20 +213,68 @@ def enforce_versioning(self, req): # We can't use _get_info_from_caches as it would use local worker cache # first and an update of versioning mode may not be detected. - memcache = getattr(self.app, 'memcache', None) or \ - req.environ.get('swift.cache') - if memcache is None: + oio_cache = req.environ.get('oio.cache') + memcache = None + if oio_cache is None: + memcache = getattr(self.app, 'memcache', None) or \ + req.environ.get('swift.cache') + + if memcache is not None: + memcache_key = "/".join( + ("versioning", self.account_name, root_container)) + version_policy = memcache.get(memcache_key) + if version_policy is not None: + if version_policy: + req.headers[FORCEVERSIONING_HEADER] = version_policy + return + + oio_headers = {REQID_HEADER: self.trans_id} + perfdata = req.environ.get('oio.perfdata') + try: + meta = self.app.storage.container_get_properties( + self.account_name, root_container, headers=oio_headers, + cache=oio_cache, perfdata=perfdata) + except exceptions.NoSuchContainer: + raise HTTPNotFound(request=req) + + version_policy = meta['system'].get('sys.m2.policy.version') + if memcache is not None: + memcache.set(memcache_key, version_policy or '') + if version_policy: + req.headers[FORCEVERSIONING_HEADER] = version_policy + + def use_bucket_storage_policy(self, req): + """ + Enforce the storage policy mode of a container just before executing + an object operation. This is useful when the current object is not + stored in the "main" container but in a shard, + where the storage policy mode may not have been set yet. + """ + if not self.app.use_bucket_storage_policy: + return None + + root_container = req.headers.get(BUCKET_NAME_HEADER) + if root_container is None: return None + if root_container.endswith(MULTIUPLOAD_SUFFIX): + root_container = root_container[:-len(MULTIUPLOAD_SUFFIX)] - key = "/".join(("versioning", self.account_name, root_container)) - val = memcache.get(key) - if val is not None: - if val != '': - req.headers[FORCEVERSIONING_HEADER] = val - return + # We can't use _get_info_from_caches as it would use local worker cache + # first and an update of storage policy mode may not be detected. + oio_cache = req.environ.get('oio.cache') + memcache = None + if oio_cache is None: + memcache = getattr(self.app, 'memcache', None) or \ + req.environ.get('swift.cache') + + if memcache is not None: + memcache_key = "/".join( + ("storage_policy", self.account_name, root_container)) + storage_policy = memcache.get(memcache_key) + if storage_policy is not None: + return storage_policy or None oio_headers = {REQID_HEADER: self.trans_id} - oio_cache = req.environ.get('oio.cache') perfdata = req.environ.get('oio.perfdata') try: meta = self.app.storage.container_get_properties( @@ -235,10 +283,10 @@ def enforce_versioning(self, req): except exceptions.NoSuchContainer: raise HTTPNotFound(request=req) - val = meta['system'].get('sys.m2.policy.version', '') - memcache.set(key, val) - if val: - req.headers[FORCEVERSIONING_HEADER] = val + storage_policy = meta['system'].get('sys.m2.policy.storage') + if memcache is not None: + memcache.set(memcache_key, storage_policy or '') + return storage_policy def get_object_head_resp(self, req): storage = self.app.storage @@ -647,8 +695,6 @@ def _object_create(self, account, container, **kwargs): def _store_object(self, req, data_source, headers): content_type = req.headers.get('content-type', 'octet/stream') policy = None - container_info = self.container_info(self.account_name, - self.container_name, req) if 'X-Oio-Storage-Policy' in req.headers: policy = req.headers.get('X-Oio-Storage-Policy') if not self.app.POLICIES.get_by_name(policy): @@ -656,6 +702,8 @@ def _store_object(self, req, data_source, headers): "invalid policy '%s', must be in %s" % (policy, self.app.POLICIES.by_name.keys())) else: + container_info = self.container_info(self.account_name, + self.container_name, req) try: policy_index = int( req.headers.get('X-Backend-Storage-Policy-Index', @@ -665,6 +713,8 @@ def _store_object(self, req, data_source, headers): if policy_index != 0: policy = self.app.POLICIES.get_by_index(policy_index).name else: + policy = self.use_bucket_storage_policy(req) + if policy is None: content_length = int(req.headers.get('content-length', 0)) policy = self._get_auto_policy_from_size(content_length) diff --git a/oioswift/server.py b/oioswift/server.py index c1783ad3..f6d3a2e6 100644 --- a/oioswift/server.py +++ b/oioswift/server.py @@ -59,6 +59,9 @@ def __init__(self, conf, memcache=None, logger=None, account_ring=None, for k, v in conf.iteritems() if k.startswith("sds_")} + self.use_bucket_storage_policy = config_true_value( + conf.get('use_bucket_storage_policy', 'false')) + self.oio_stgpol = [] if 'auto_storage_policies' in conf: for elem in conf['auto_storage_policies'].split(','):