Skip to content

Commit

Permalink
cachito provides new endpoint with sbom in CycloneDX format
Browse files Browse the repository at this point in the history
* STONEBLD-520

Signed-off-by: Robert Cerven <[email protected]>
  • Loading branch information
rcerven authored and chmeliik committed Feb 22, 2023
1 parent 5b7b4c1 commit 888df03
Show file tree
Hide file tree
Showing 17 changed files with 3,174 additions and 45 deletions.
127 changes: 97 additions & 30 deletions cachito/web/api_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections import OrderedDict
from copy import deepcopy
from datetime import date, datetime
from typing import Any, Dict, Union
from typing import Any, Dict, List, Set, Union

import flask
import kombu.exceptions
Expand All @@ -24,7 +24,7 @@
from cachito.common.utils import b64encode
from cachito.errors import MessageBrokerError, NoWorkers, RequestErrorOrigin, ValidationError
from cachito.web import db
from cachito.web.content_manifest import BASE_ICM
from cachito.web.content_manifest import BASE_ICM, BASE_SBOM
from cachito.web.metrics import cachito_metrics
from cachito.web.models import (
ConfigFileBase64,
Expand Down Expand Up @@ -218,7 +218,7 @@ def get_request_content_manifest(request_id):
)
content_manifest = request.content_manifest
content_manifest_json = content_manifest.to_json()
return send_content_manifest_back(content_manifest_json)
return send_json_file_back(content_manifest_json)


@api_v1.route("/requests/<int:request_id>/environment-variables", methods=["GET"])
Expand Down Expand Up @@ -744,68 +744,135 @@ def get_request_logs(request_id):
)


def send_content_manifest_back(content_manifest: Dict[str, Any]) -> flask.Response:
"""Send content manifest back to the client."""
def send_json_file_back(json_content: Dict[str, Any]) -> flask.Response:
"""Send json file back to the client."""
debug = flask.current_app.logger.debug
fd, filename = tempfile.mkstemp(prefix="request-content-manifest-json-", text=True)
debug("Write content manifest into file: %s", filename)
fd, filename = tempfile.mkstemp(prefix="request-json-", text=True)
debug("Write json into file: %s", filename)
try:
with open(fd, "w") as f:
json.dump(content_manifest, f, sort_keys=True)
json.dump(json_content, f, sort_keys=True)
return flask.send_file(filename, mimetype="application/json")
finally:
debug("The content manifest is sent back to the client. Remove %s", filename)
debug("Json file is sent back to the client. Remove %s", filename)
os.unlink(filename)


@api_v1.route("/content-manifest", methods=["GET"])
def get_content_manifest_by_requests():
"""
Retrieve the content manifest associated with the given requests.
:return: a Flask JSON response
:rtype: flask.Response
:raise BadRequest: if any of the given request is not in the "complete" or
"stale" state, If any of the given request cannot be found.
"""
arg = flask.request.args.get("requests")
if not arg:
return flask.jsonify(BASE_ICM)
def _get_valid_request_ids(all_request_ids: str) -> List[int]:
request_ids = set()
item: str
for item in arg.split(","):
for item in all_request_ids.split(","):
if not item.strip():
continue
if not item.strip().isdigit():
raise BadRequest(f"{item} is not an integer.")
request_ids.add(int(item))

return list(request_ids)


def _get_all_requests(request_ids: List[int]) -> list[Request]:
requests = (
Request.query.filter(Request.id.in_(request_ids))
.options(load_only("id"), joinedload(Request.state))
.all()
)
states = (RequestStateMapping.complete.name, RequestStateMapping.stale.name)

request: Request
for request in requests:
if request.state.state_name not in states:
raise BadRequest(
f"Request {request.id} is in state {request.state.state_name}, "
f"not complete or stale."
)
request_ids.remove(request.id)

if request_ids:
nonexistent_ids = ",".join(map(str, request_ids))
raise BadRequest(f"Cannot find request(s) {nonexistent_ids}.")

return requests


def _check_requests_state(requests: List[Request], valid_states: Set[str]) -> None:
error_msg = ""

request: Request
for request in requests:
if request.state.state_name not in valid_states:
error_msg += (
f"Request {request.id} is in state {request.state.state_name}, "
f"not in {valid_states}.\n"
)

if error_msg:
raise BadRequest(error_msg)


@api_v1.route("/content-manifest", methods=["GET"])
def get_content_manifest_by_requests():
"""
Retrieve the content manifest associated with the given requests.
:return: a Flask JSON response
:rtype: flask.Response
:raise BadRequest: if any of the given request is not in the "complete" or
"stale" state, If any of the given request cannot be found.
"""
arg = flask.request.args.get("requests")
if not arg:
return flask.jsonify(BASE_ICM)

request_ids = _get_valid_request_ids(arg)
requests = _get_all_requests(request_ids)

valid_states = set([RequestStateMapping.complete.name])
_check_requests_state(requests, valid_states)

request: Request
assembled_icm = deepcopy(BASE_ICM)
for request in requests:
manifest = request.content_manifest.to_json()
assembled_icm["image_contents"].extend(manifest["image_contents"])
if len(requests) > 1:
deep_sort_icm(assembled_icm)
return send_content_manifest_back(assembled_icm)
return send_json_file_back(assembled_icm)


@api_v1.route("/sbom", methods=["GET"])
def get_sbom_by_requests() -> flask.Response:
"""
Retrieve the content manifest sbom associated with the given requests.
:return: a Flask JSON response
:rtype: flask.Response
:raise BadRequest: if any of the given request is not in the "complete" or
"stale" state, If any of the given request cannot be found.
"""
arg = flask.request.args.get("requests")
if not arg:
return flask.jsonify(BASE_SBOM)

request_ids = _get_valid_request_ids(arg)
requests = _get_all_requests(request_ids)

valid_states = set([RequestStateMapping.complete.name])
_check_requests_state(requests, valid_states)

request: Request
assembled_sbom = deepcopy(BASE_SBOM)

all_components = []
for request in requests:
sbom_components = request.content_manifest.sbom_components_list()
all_components.extend(sbom_components)

unique_components: List[Dict[str, Any]] = []

all_components.sort(key=lambda c: (c["purl"], c["name"], c.get("version")))

for component in all_components:
if not unique_components or component != unique_components[-1]:
unique_components.append(component)

assembled_sbom["components"] = unique_components

return send_json_file_back(assembled_sbom)


class RequestMetricsArgs(pydantic.BaseModel):
Expand Down
Loading

0 comments on commit 888df03

Please sign in to comment.