diff --git a/lightbeam/api.py b/lightbeam/api.py index c0ca279..f3ba6f2 100644 --- a/lightbeam/api.py +++ b/lightbeam/api.py @@ -304,7 +304,7 @@ async def load_descriptors_values(self): for key in v.keys(): if key.endswith("Id"): descriptor = key[0:-2] self.descriptor_values.append([descriptor, v["namespace"], v["codeValue"], v["shortDescription"], v.get("description", "")]) - + # save if self.lightbeam.track_state: self.logger.debug(f"saving descriptor values to {cache_file}...") @@ -331,33 +331,47 @@ async def load_descriptors_values(self): # } # (The first element is a required attribute of the assessmentItem; the other two are required elements # of the required nested assessmentReference.) - def get_params_for_endpoint(self, endpoint): + def get_params_for_endpoint(self, endpoint, type='required'): if "Descriptor" in endpoint: swagger = self.descriptors_swagger else: swagger = self.resources_swagger - definition = util.camel_case(self.lightbeam.config["namespace"]) + "_" + util.singularize_endpoint(endpoint) - return self.get_required_params_from_swagger(swagger, definition) + definition = util.get_swagger_ref_for_endpoint(self.lightbeam.config["namespace"], swagger, endpoint) + if type=='required': + return self.get_required_params_from_swagger(swagger, definition) + else: + # descriptor endpoints all have the same structure and identity fields: + if "Descriptor" in endpoint: + return { 'namespace':'namespace', 'codeValue':'codeValue', 'shortDescription':'shortDescription'} + else: + return self.get_identity_params_from_swagger(swagger, definition) def get_required_params_from_swagger(self, swagger, definition, prefix=""): params = {} - use_definitions = False - if "definitions" in swagger.keys(): - schema = swagger["definitions"][definition] - use_definitions = True - elif "components" in swagger.keys() and "schemas" in swagger["components"].keys(): - schema = swagger["components"]["schemas"][definition] - else: + schema = util.resolve_swagger_ref(swagger, definition) + if not schema: self.logger.critical(f"Swagger contains neither `definitions` nor `components.schemas` - check that the Swagger is valid.") - for requiredProperty in schema["required"]: - if "$ref" in schema["properties"][requiredProperty].keys(): - sub_definition = schema["properties"][requiredProperty]["$ref"] - if use_definitions: - sub_definition = sub_definition.replace("#/definitions/", "") - else: - sub_definition = sub_definition.replace("#/components/schemas/", "") - sub_params = self.get_required_params_from_swagger(swagger, sub_definition, prefix=requiredProperty+".") + for prop in schema["required"]: + if "$ref" in schema["properties"][prop].keys(): + sub_definition = schema["properties"][prop]["$ref"] + sub_params = self.get_required_params_from_swagger(swagger, sub_definition, prefix=prop+".") + for k,v in sub_params.items(): + params[k] = v + elif schema["properties"][prop]["type"]!="array": + params[prop] = prefix + prop + return params + + def get_identity_params_from_swagger(self, swagger, definition, prefix=""): + params = {} + schema = util.resolve_swagger_ref(swagger, definition) + if not schema: + self.logger.critical(f"Swagger contains neither `definitions` nor `components.schemas` - check that the Swagger is valid.") + + for prop in schema["properties"]: + if prop.endswith("Reference") and "required" in schema.keys() and prop in schema['required'] and "$ref" in schema["properties"][prop].keys(): + sub_definition = schema["properties"][prop]["$ref"] + sub_params = self.get_identity_params_from_swagger(swagger, sub_definition, prefix=prop+".") for k,v in sub_params.items(): params[k] = v - elif schema["properties"][requiredProperty]["type"]!="array": - params[requiredProperty] = prefix + requiredProperty + elif "type" in schema["properties"][prop].keys() and schema["properties"][prop]["type"]!="array" and "x-Ed-Fi-isIdentity" in schema["properties"][prop].keys(): + params[prop] = prefix + prop return params diff --git a/lightbeam/lightbeam.py b/lightbeam/lightbeam.py index 9355e54..f3aed07 100644 --- a/lightbeam/lightbeam.py +++ b/lightbeam/lightbeam.py @@ -166,7 +166,8 @@ def write_structured_output(self, command): # failures.line_numbers are split each on their own line; here we remove those line breaks content = re.sub(r'"line_numbers": \[(\d|,|\s|\n)*\]', self.replace_linebreaks, content) fp.write(content) - self.logger.info(f"results written to {self.results_file}") + + self.logger.info(f"results written to {self.results_file}") def load_config_file(self) -> dict: diff --git a/lightbeam/util.py b/lightbeam/util.py index 2bc6cbb..a8815c8 100644 --- a/lightbeam/util.py +++ b/lightbeam/util.py @@ -79,4 +79,18 @@ def keys_match(key, wildcard_key): if key==wildcard_key: return True if wildcard_key.startswith("*") and key.endswith(wildcard_key.lstrip("*")): return True if wildcard_key.endswith("*") and key.startswith(wildcard_key.rstrip("*")): return True - return False \ No newline at end of file + return False + +def get_swagger_ref_for_endpoint(namespace, swagger, endpoint): + if "definitions" in swagger.keys(): + return "#/definitions/" + camel_case(namespace) + "_" + singularize_endpoint(endpoint) + elif "components" in swagger.keys() and "schemas" in swagger["components"].keys(): + return "#/components/schemas/" + camel_case(namespace) + "_" + singularize_endpoint(endpoint) + +def resolve_swagger_ref(swagger, ref): + if "definitions" in swagger.keys(): + definition = ref.replace("#/definitions/", "") + return swagger["definitions"][definition] + elif "components" in swagger.keys() and "schemas" in swagger["components"].keys(): + definition = ref.replace("#/components/schemas/", "") + return swagger["components"]["schemas"][definition] diff --git a/lightbeam/validate.py b/lightbeam/validate.py index db56b4b..a3f909f 100644 --- a/lightbeam/validate.py +++ b/lightbeam/validate.py @@ -251,7 +251,7 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter): resolver = RefResolver("test", swagger, swagger) validator = Draft4Validator(resource_schema, resolver=resolver) - params_structure = self.lightbeam.api.get_params_for_endpoint(endpoint) + identity_params_structure = self.lightbeam.api.get_params_for_endpoint(endpoint, type='identity') distinct_params = [] # check payload is valid JSON @@ -281,7 +281,7 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter): # check natural keys are unique if "uniqueness" in self.validation_methods: - params = json.dumps(util.interpolate_params(params_structure, payload)) + params = json.dumps(util.interpolate_params(identity_params_structure, payload)) params_hash = hashlog.get_hash(params) if params_hash in distinct_params: self.log_validation_error(endpoint, file_name, line_counter, "uniqueness", "duplicate value(s) for natural key(s): {params}")