Skip to content

Commit

Permalink
Refactor override system in SDE to align with usage and clarify edge …
Browse files Browse the repository at this point in the history
…cases (#220)
  • Loading branch information
matthewfcarlson authored Oct 23, 2020
1 parent f6740d0 commit 37d3678
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 76 deletions.
9 changes: 6 additions & 3 deletions docs/features/feature_sde.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@ The following path_env fields are optional or conditional:
- id

- Part of the Override System, this field defines the name that this file will be referred to as by any override_id fields.
It is an error for there to be multiple files with the same id in the same set of scopes.
The SDE will throw an exception in this case.
You are welcome to have the same id if they're on separate scopes that won't overlap.

- override_id

- This file will override any descriptor files found in lower lexical order or scope order. Files are traversed in
directory order (depth first) and then scope order (highest to lowest). Overrides are only applied forward in the
traversal, not backwards.
- This file will override any descriptor files.
Override can apply to any type of descriptor (a path env can override an ext_dep for example).
If two descriptors override the same file, this is considered an error and the SDE will throw an exception.

## The Belly of the Beast

Expand Down
151 changes: 80 additions & 71 deletions edk2toolext/environment/self_describing_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,81 +69,90 @@ def load_workspace(self):
logging.debug("Loading workspace: %s" % self.workspace)
logging.debug(" Including scopes: %s" % ', '.join(self.scopes))

#
# First, we need to get all of the files that describe our environment.
#
env_files = self._gather_env_files(
('path_env', 'ext_dep', 'plug_in'), self.workspace)
env_files = self._gather_env_files(('path_env', 'ext_dep', 'plug_in'), self.workspace)

# Next, get a list of all our scopes
all_scopes_lower = [x.lower() for x in self.scopes]

#
# Now that the files have been found, load them, sort them, and filter them
# so they can be applied to the environment.
#
def _sort_and_filter_descriptors(class_type, file_list, scopes):
all_descriptors = tuple(class_type(
desc_file).descriptor_contents for desc_file in file_list)

known_ids = {}
active_overrides = {}
final_list = []

for scope in scopes:
for descriptor in all_descriptors:
# If this descriptor isn't in the current scope, we can ignore it for now.
if descriptor['scope'].lower() != scope.lower():
continue

cur_file = descriptor['descriptor_file']

# If this descriptor has an ID, we need to check for overrides and collisions.
if 'id' in descriptor:
cur_id = descriptor['id'].lower()

# First, check for overrides. There's no reason to process this file if it's being overridden.
if cur_id in active_overrides:
logging.debug("Descriptor '%s' is being overridden by descriptor '%s' based on ID '%s'." % (
cur_file, active_overrides[cur_id], cur_id))
continue

# Next, check for ID collisions.
if cur_id in known_ids:
raise RuntimeError(
"Descriptor '%s' shares the same ID '%s' with descriptor '%s'." % (
cur_file, cur_id, known_ids[cur_id])
)

# Finally, we can add this file to the known IDs list.
known_ids[cur_id] = cur_file

# If we're still processing, we can add this descriptor to the output.
logging.debug("Adding descriptor '%s' to the environment with scope '%s'." % (
cur_file, scope))
final_list.append(descriptor)

# Finally, check to see whether this descriptor overrides anything else.
if 'override_id' in descriptor:
cur_override_id = descriptor['override_id'].lower()
# If we're attempting to override someting that's already been processed,
# we should spit out a warning of sort.
if cur_override_id in known_ids:
logging.warning("Descriptor '%s' is trying to override iID '%s', "
"but it's already been processed." %
(cur_file, cur_override_id))
active_overrides[cur_override_id] = descriptor['descriptor_file']

return tuple(final_list)

if 'path_env' in env_files:
self.paths = _sort_and_filter_descriptors(
EDF.PathEnvDescriptor, env_files['path_env'], self.scopes)

if 'ext_dep' in env_files:
self.extdeps = _sort_and_filter_descriptors(
EDF.ExternDepDescriptor, env_files['ext_dep'], self.scopes)

if 'plug_in' in env_files:
self.plugins = _sort_and_filter_descriptors(
EDF.PluginDescriptor, env_files['plug_in'], self.scopes)

# We need to convert them from files to descriptors
all_descriptors = list()

# helper function to get all the descriptors of a type and cast them
def _get_all_descriptors_of_type(key, class_type):
if key not in env_files:
return tuple()
return tuple(class_type(desc_file) for desc_file in env_files[key])

# Collect all the descriptors of each type
all_descriptors.extend(_get_all_descriptors_of_type('path_env', EDF.PathEnvDescriptor))
all_descriptors.extend(_get_all_descriptors_of_type('ext_dep', EDF.ExternDepDescriptor))
all_descriptors.extend(_get_all_descriptors_of_type('plug_in', EDF.PluginDescriptor))

# Get the properly scoped descriptors by checking if the scope is in the list of all the scopes
scoped_desc_gen = [x for x in all_descriptors if x.descriptor_contents['scope'].lower() in all_scopes_lower]
scoped_descriptors = list(scoped_desc_gen)

# Check that each found item has a unique ID, that's an error if it isn't
allids_gen = [x.descriptor_contents['id'].lower() for x in scoped_descriptors if 'id' in x.descriptor_contents]
all_ids = list(allids_gen)
all_unique_ids = set(all_ids)
if len(all_ids) != len(all_unique_ids):
logging.error("Multiple descriptor files share the same id")
all_unique_id_dict = {}
for desc_id in all_ids:
dict_id_seen = desc_id not in all_unique_id_dict
all_unique_id_dict[desc_id] = 1 if dict_id_seen else all_unique_id_dict[desc_id] + 1
for desc_id in all_unique_id_dict:
if all_unique_id_dict[desc_id] == 1:
continue
# get the descriptors
desc_of_id = [x for x in scoped_descriptors if x.descriptor_contents['id'].lower() == desc_id]
paths_of_desc_of_id = [x.file_path for x in desc_of_id]
invalid_desc_paths = f"{os.pathsep} ".join(paths_of_desc_of_id)
logging.error(f"Descriptors that have this id {desc_id}: {invalid_desc_paths}")
raise RuntimeError("Multiple descriptor files share the same id")

# Now check for overrides, first get a list of all the descriptors that have an override id tag
override_descriptors = [x for x in scoped_descriptors if "override_id" in x.descriptor_contents]
active_overrides = {}
for desc in override_descriptors:
override_id = desc.descriptor_contents["override_id"].lower()
# we found this was already overriden, make sure to let the user know
if override_id in active_overrides:
logging.warning("A descriptor file is trying to override a file that was previously overriden")
logging.warning(f"File ID being overriden: {override_id}")
logging.warning(f"Previous override: {active_overrides[override_id].file_path}")
logging.warning(f"New override: {desc.file_path}")
raise RuntimeError(f"Multiple descriptor files share the same override_id: {override_id}")
active_overrides[override_id] = desc

# Now we filter the overriden id's out and debug to the user whether we are including them or not
overriden_ids = active_overrides.keys()
final_descriptors = []
for desc in scoped_descriptors:
desc_file = desc.file_path
if 'id' in desc.descriptor_contents:
desc_id = desc.descriptor_contents['id'].lower()
if desc_id in overriden_ids:
override = active_overrides[desc_id]
desc_name = f"{desc_file}:{desc_id}"
override_name = f"{override.file_path}"
logging.debug(f"Skipping descriptor {desc_name} as it is being overridden by {override_name}.")
continue
# add them to the final list
desc_scope = desc.descriptor_contents['scope']
logging.debug(f"Adding descriptor {desc_file} to the environment with scope {desc_scope}")
final_descriptors.append(desc)

# Finally, sort them back in the right categories
self.paths = list([x.descriptor_contents for x in final_descriptors if isinstance(x, EDF.PathEnvDescriptor)])
self.extdeps = list(
[x.descriptor_contents for x in final_descriptors if isinstance(x, EDF.ExternDepDescriptor)])
self.plugins = list([x.descriptor_contents for x in final_descriptors if isinstance(x, EDF.PluginDescriptor)])

return self

Expand Down
38 changes: 36 additions & 2 deletions edk2toolext/tests/test_self_describing_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ def test_collect_path_env(self):
build_env, shell_env = self_describing_environment.BootstrapEnvironment(self.workspace, scopes)
self.assertEqual(len(build_env.paths), 4)

def test_collect_path_env_scoped(self):
''' makes sure the SDE can collect path env with the right scopes '''
scopes = ("global", "testing")
tree = uefi_tree(self.workspace, create_platform=False)
tree.create_path_env("testing_corebuild", scope="testing")
tree.create_path_env("testing_corebuild2", scope="not_valid")
build_env, shell_env = self_describing_environment.BootstrapEnvironment(self.workspace, scopes)
self.assertEqual(len(build_env.paths), 1)

def test_override_path_env(self):
''' checks the SDE descriptor override system '''
custom_scope = "global"
Expand All @@ -52,8 +61,32 @@ def test_override_path_env(self):
tree.create_path_env("testing_corebuild2", var_name="jokes", scope=custom_scope,
extra_data={"override_id": "testing_corebuild"})
build_env, shell_env = self_describing_environment.BootstrapEnvironment(self.workspace, scopes)
print(build_env.paths)
print(tree.get_workspace())
self.assertEqual(len(build_env.paths), 1)

def test_multiple_override_path_env(self):
''' checks the SDE descriptor override system will throw an error on multiple overrides'''
custom_scope = "global"
scopes = (custom_scope,)
tree = uefi_tree(self.workspace, create_platform=False)
tree.create_path_env("testing_corebuild", var_name="hey", dir_path="test1", scope=custom_scope)
tree.create_path_env("testing_corebuild2", var_name="jokes", scope=custom_scope,
extra_data={"override_id": "testing_corebuild"})
tree.create_path_env("testing_corebuild3", var_name="laughs", scope=custom_scope,
extra_data={"override_id": "testing_corebuild"})
# we should get an exception because we have two overrides
with self.assertRaises(RuntimeError):
build_env, shell_env = self_describing_environment.BootstrapEnvironment(self.workspace, scopes)
self.fail()

def test_override_path_env_swapped_order(self):
''' checks the SDE descriptor override system with reversed paths so they are discovered in opposite order'''
custom_scope = "global"
scopes = (custom_scope,)
tree = uefi_tree(self.workspace, create_platform=False)
tree.create_path_env("testing_corebuild", var_name="hey", scope=custom_scope)
tree.create_path_env(var_name="jokes", dir_path="test1", scope=custom_scope,
extra_data={"override_id": "testing_corebuild"})
build_env, shell_env = self_describing_environment.BootstrapEnvironment(self.workspace, scopes)
self.assertEqual(len(build_env.paths), 1)

def test_duplicate_id_path_env(self):
Expand All @@ -65,6 +98,7 @@ def test_duplicate_id_path_env(self):
tree.create_path_env("testing_corebuild")
with self.assertRaises(RuntimeError):
self_describing_environment.BootstrapEnvironment(self.workspace, scopes)
self.fail()


if __name__ == '__main__':
Expand Down

0 comments on commit 37d3678

Please sign in to comment.