-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
validate uniqueness by identity #54
base: main
Are you sure you want to change the base?
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -304,7 +304,7 @@ async def load_descriptors_values(self): | |
for key in v.keys(): | ||
if key.endswith("Id"): descriptor = key[0:-2] | ||
self.descriptor_values.append([descriptor, v["namespace"], v["codeValue"], v["shortDescription"], v.get("description", "")]) | ||
|
||
# save | ||
if self.lightbeam.track_state: | ||
self.logger.debug(f"saving descriptor values to {cache_file}...") | ||
|
@@ -331,33 +331,52 @@ async def load_descriptors_values(self): | |
# } | ||
# (The first element is a required attribute of the assessmentItem; the other two are required elements | ||
# of the required nested assessmentReference.) | ||
def get_params_for_endpoint(self, endpoint): | ||
def get_params_for_endpoint(self, endpoint, type='required'): | ||
if "Descriptor" in endpoint: swagger = self.descriptors_swagger | ||
else: swagger = self.resources_swagger | ||
definition = util.camel_case(self.lightbeam.config["namespace"]) + "_" + util.singularize_endpoint(endpoint) | ||
return self.get_required_params_from_swagger(swagger, definition) | ||
definition = util.get_swagger_ref_for_endpoint(self.lightbeam.config["namespace"], swagger, endpoint) | ||
if type=='required': | ||
return self.get_required_params_from_swagger(swagger, definition) | ||
else: | ||
# descriptor endpoints all have the same structure and identity fields: | ||
if "Descriptor" in endpoint: | ||
return { 'namespace':'namespace', 'codeValue':'codeValue', 'shortDescription':'shortDescription'} | ||
else: | ||
return self.get_identity_params_from_swagger(swagger, definition) | ||
|
||
def get_required_params_from_swagger(self, swagger, definition, prefix=""): | ||
params = {} | ||
use_definitions = False | ||
if "definitions" in swagger.keys(): | ||
schema = swagger["definitions"][definition] | ||
use_definitions = True | ||
elif "components" in swagger.keys() and "schemas" in swagger["components"].keys(): | ||
schema = swagger["components"]["schemas"][definition] | ||
else: | ||
schema = util.resolve_swagger_ref(swagger, definition) | ||
if not schema: | ||
self.logger.critical(f"Swagger contains neither `definitions` nor `components.schemas` - check that the Swagger is valid.") | ||
|
||
for requiredProperty in schema["required"]: | ||
if "$ref" in schema["properties"][requiredProperty].keys(): | ||
sub_definition = schema["properties"][requiredProperty]["$ref"] | ||
if use_definitions: | ||
sub_definition = sub_definition.replace("#/definitions/", "") | ||
else: | ||
sub_definition = sub_definition.replace("#/components/schemas/", "") | ||
sub_params = self.get_required_params_from_swagger(swagger, sub_definition, prefix=requiredProperty+".") | ||
for prop in schema["required"]: | ||
if "$ref" in schema["properties"][prop].keys(): | ||
sub_definition = schema["properties"][prop]["$ref"] | ||
sub_params = self.get_required_params_from_swagger(swagger, sub_definition, prefix=prop+".") | ||
for k,v in sub_params.items(): | ||
params[k] = v | ||
elif schema["properties"][prop]["type"]!="array": | ||
params[prop] = prefix + prop | ||
return params | ||
|
||
def get_identity_params_from_swagger(self, swagger, definition, prefix=""): | ||
params = {} | ||
schema = util.resolve_swagger_ref(swagger, definition) | ||
if not schema: | ||
self.logger.critical(f"Swagger contains neither `definitions` nor `components.schemas` - check that the Swagger is valid.") | ||
|
||
for prop in schema["properties"]: | ||
if prop.endswith("Reference") and "required" in schema.keys() and prop in schema['required']: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. required properties of a |
||
sub_definition = schema["properties"][prop]["$ref"] | ||
sub_schema = util.resolve_swagger_ref(swagger, sub_definition) | ||
for sub_prop in sub_schema["required"]: | ||
params[f"{prop}.{sub_prop}"] = sub_prop | ||
elif "$ref" in schema["properties"][prop].keys(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recurse into |
||
sub_definition = schema["properties"][prop]["$ref"] | ||
sub_params = self.get_identity_params_from_swagger(swagger, sub_definition, prefix=prop+".") | ||
for k,v in sub_params.items(): | ||
params[k] = v | ||
elif schema["properties"][requiredProperty]["type"]!="array": | ||
params[requiredProperty] = prefix + requiredProperty | ||
elif schema["properties"][prop]["type"]!="array" and "x-Ed-Fi-isIdentity" in schema["properties"][prop].keys(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. properties with |
||
params[prop] = prefix + prop | ||
return params |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -166,7 +166,8 @@ def write_structured_output(self, command): | |
# failures.line_numbers are split each on their own line; here we remove those line breaks | ||
content = re.sub(r'"line_numbers": \[(\d|,|\s|\n)*\]', self.replace_linebreaks, content) | ||
fp.write(content) | ||
self.logger.info(f"results written to {self.results_file}") | ||
|
||
self.logger.info(f"results written to {self.results_file}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Added an indent here so it's only printed if a results-file was actually created.) |
||
|
||
|
||
def load_config_file(self) -> dict: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,4 +79,18 @@ def keys_match(key, wildcard_key): | |
if key==wildcard_key: return True | ||
if wildcard_key.startswith("*") and key.endswith(wildcard_key.lstrip("*")): return True | ||
if wildcard_key.endswith("*") and key.startswith(wildcard_key.rstrip("*")): return True | ||
return False | ||
return False | ||
|
||
def get_swagger_ref_for_endpoint(namespace, swagger, endpoint): | ||
if "definitions" in swagger.keys(): | ||
return "#/definitions/" + camel_case(namespace) + "_" + singularize_endpoint(endpoint) | ||
elif "components" in swagger.keys() and "schemas" in swagger["components"].keys(): | ||
return "#/components/schemas/" + camel_case(namespace) + "_" + singularize_endpoint(endpoint) | ||
|
||
def resolve_swagger_ref(swagger, ref): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This abstracts Swagger |
||
if "definitions" in swagger.keys(): | ||
definition = ref.replace("#/definitions/", "") | ||
return swagger["definitions"][definition] | ||
elif "components" in swagger.keys() and "schemas" in swagger["components"].keys(): | ||
definition = ref.replace("#/components/schemas/", "") | ||
return swagger["components"]["schemas"][definition] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the default
type
here ensures non-breaking functionality for any other lightbeam functions that use this function