diff --git a/conf/default.cfg b/conf/default.cfg index ced8d99d..654b64cf 100644 --- a/conf/default.cfg +++ b/conf/default.cfg @@ -43,6 +43,11 @@ sds_default_account = myaccount # backend. oio_storage_policies=SINGLE,EC123,EC64,THREECOPIES,FOURCOPIES +# Enable or disable the use of bucket storage policy. +# When enabled, and if a storage policy is defined for a bucket (root container), +# all objects in this bucket will be uploaded with this storage policy. +# use_bucket_policy = false + # The configuration to chose the policy when unspecified. All the storage # policies mentioned here must have been listed in the `oio_storage_policies` # configuration directive. The string must respect the subsequent format: diff --git a/oioswift/proxy/controllers/obj.py b/oioswift/proxy/controllers/obj.py index 299afcfa..b69c0154 100644 --- a/oioswift/proxy/controllers/obj.py +++ b/oioswift/proxy/controllers/obj.py @@ -211,22 +211,70 @@ def enforce_versioning(self, req): root_container.endswith(MULTIUPLOAD_SUFFIX)): return None - # We can't use _get_info_from_caches as it would use local worker cache - # first and an update of versioning mode may not be detected. - memcache = getattr(self.app, 'memcache', None) or \ - req.environ.get('swift.cache') - if memcache is None: + oio_cache = req.environ.get('oio.cache') + memcache = None + if oio_cache is None: + # We can't use _get_info_from_caches as it would use local worker + # cache first and an update of versioning mode may not be detected. + memcache = getattr(self.app, 'memcache', None) or \ + req.environ.get('swift.cache') + + if memcache is not None: + key = "/".join(("versioning", self.account_name, root_container)) + value = memcache.get(key) + if value is not None: + if value: + req.headers[FORCEVERSIONING_HEADER] = value + return + + oio_headers = {REQID_HEADER: self.trans_id} + perfdata = req.environ.get('oio.perfdata') + try: + meta = self.app.storage.container_get_properties( + self.account_name, root_container, headers=oio_headers, + cache=oio_cache, perfdata=perfdata) + except exceptions.NoSuchContainer: + raise HTTPNotFound(request=req) + + value = meta['system'].get('sys.m2.policy.version') + if memcache is not None: + memcache.set(key, value or '') + + if value: + req.headers[FORCEVERSIONING_HEADER] = value + + def use_bucket_policy(self, req): + """ + Enforce the storage policy mode of a container just before executing + an object operation. This is useful when the current object is not + stored in the "main" container but in a shard, + where the storage policy mode may not have been set yet. + """ + if not self.app.use_bucket_policy: return None - key = "/".join(("versioning", self.account_name, root_container)) - val = memcache.get(key) - if val is not None: - if val != '': - req.headers[FORCEVERSIONING_HEADER] = val - return + root_container = req.headers.get(BUCKET_NAME_HEADER) + if root_container is None: + return None + if root_container.endswith(MULTIUPLOAD_SUFFIX): + root_container = root_container[:-len(MULTIUPLOAD_SUFFIX)] - oio_headers = {REQID_HEADER: self.trans_id} oio_cache = req.environ.get('oio.cache') + memcache = None + if oio_cache is None: + # We can't use _get_info_from_caches as it would use local worker + # cache first and an update of storage policy mode may not be + # detected. + memcache = getattr(self.app, 'memcache', None) or \ + req.environ.get('swift.cache') + + if memcache is not None: + key = "/".join(("policy", self.account_name, root_container)) + value = memcache.get(key) + if value is not None: + return value or None + + oio_headers = {REQID_HEADER: self.trans_id} perfdata = req.environ.get('oio.perfdata') try: meta = self.app.storage.container_get_properties( @@ -234,11 +282,12 @@ def enforce_versioning(self, req): cache=oio_cache, perfdata=perfdata) except exceptions.NoSuchContainer: raise HTTPNotFound(request=req) + value = meta['system'].get('sys.m2.policy.storage') + + if memcache is not None: + memcache.set(key, value or '') - val = meta['system'].get('sys.m2.policy.version', '') - memcache.set(key, val) - if val: - req.headers[FORCEVERSIONING_HEADER] = val + return value def get_object_head_resp(self, req): storage = self.app.storage @@ -647,8 +696,6 @@ def _object_create(self, account, container, **kwargs): def _store_object(self, req, data_source, headers): content_type = req.headers.get('content-type', 'octet/stream') policy = None - container_info = self.container_info(self.account_name, - self.container_name, req) if 'X-Oio-Storage-Policy' in req.headers: policy = req.headers.get('X-Oio-Storage-Policy') if not self.app.POLICIES.get_by_name(policy): @@ -656,6 +703,8 @@ def _store_object(self, req, data_source, headers): "invalid policy '%s', must be in %s" % (policy, self.app.POLICIES.by_name.keys())) else: + container_info = self.container_info(self.account_name, + self.container_name, req) try: policy_index = int( req.headers.get('X-Backend-Storage-Policy-Index', @@ -665,6 +714,8 @@ def _store_object(self, req, data_source, headers): if policy_index != 0: policy = self.app.POLICIES.get_by_index(policy_index).name else: + policy = self.use_bucket_policy(req) + if policy is None: content_length = int(req.headers.get('content-length', 0)) policy = self._get_auto_policy_from_size(content_length) diff --git a/oioswift/server.py b/oioswift/server.py index c1783ad3..63b60afe 100644 --- a/oioswift/server.py +++ b/oioswift/server.py @@ -59,6 +59,9 @@ def __init__(self, conf, memcache=None, logger=None, account_ring=None, for k, v in conf.iteritems() if k.startswith("sds_")} + self.use_bucket_policy = config_true_value( + conf.get('use_bucket_policy', 'false')) + self.oio_stgpol = [] if 'auto_storage_policies' in conf: for elem in conf['auto_storage_policies'].split(','):