Skip to content

Commit

Permalink
feat: Add vulnerability exception methods (#145)
Browse files Browse the repository at this point in the history
New methods for the SdScanningClient:
- add_vulnerability_exception_bundle
- delete_vulnerability_exception_bundle
- list_vulnerability_exception_bundles
- get_vulnerability_exception_bundle
- add_vulnerability_exception
- delete_vulnerability_exception
- update_vulnerability_exception
  • Loading branch information
tembleking authored Oct 2, 2020
1 parent 7cf09bb commit 191918f
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 17 deletions.
148 changes: 134 additions & 14 deletions sdcclient/_scanning.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import base64
import hashlib
import json
import re
import time

import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
import time

try:
from urllib.parse import quote_plus, unquote_plus
Expand Down Expand Up @@ -164,7 +164,8 @@ def query_image_vuln(self, image, vuln_type="", vendor_only=True):
'''
return self._query_image(image, query_group='vuln', query_type=vuln_type, vendor_only=vendor_only)

def query_images_by_vulnerability(self, vulnerability_id, namespace=None, package=None, severity=None, vendor_only=True):
def query_images_by_vulnerability(self, vulnerability_id, namespace=None, package=None, severity=None,
vendor_only=True):
'''**Description**
Search system for images with the given vulnerability ID present
Expand Down Expand Up @@ -408,17 +409,18 @@ def get_image_scan_result_by_id(self, image_id, full_tag_name, detail):
A JSON object containing pass/fail status of image scan policy.
'''
url = "{base_url}/api/scanning/v1/anchore/images/by_id/{image_id}/check?tag={full_tag_name}&detail={detail}".format(
base_url=self.url,
image_id=image_id,
full_tag_name=full_tag_name,
detail=detail)
base_url=self.url,
image_id=image_id,
full_tag_name=full_tag_name,
detail=detail)
res = requests.get(url, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, res.json()]

def add_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", validate=True):
def add_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2",
validate=True):
'''**Description**
Add image registry
Expand All @@ -437,7 +439,8 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r
if registry_type and registry_type not in registry_types:
return [False, "input registry type not supported (supported registry_types: " + str(registry_types)]
if self._registry_string_is_valid(registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

if not registry_type:
registry_type = self._get_registry_type(registry)
Expand All @@ -458,7 +461,8 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r

return [True, res.json()]

def update_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", validate=True):
def update_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2",
validate=True):
'''**Description**
Update an existing image registry.
Expand All @@ -474,7 +478,8 @@ def update_registry(self, registry, registry_user, registry_pass, insecure=False
A JSON object representing the registry.
'''
if self._registry_string_is_valid(registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

payload = {
'registry': registry,
Expand Down Expand Up @@ -502,7 +507,8 @@ def delete_registry(self, registry):
'''
# do some input string checking
if re.match(".*\\/.*", registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

url = self.url + "/api/scanning/v1/anchore/registries/" + registry
res = requests.delete(url, headers=self.hdrs, verify=self.ssl_verify)
Expand Down Expand Up @@ -539,7 +545,8 @@ def get_registry(self, registry):
A JSON object representing the registry.
'''
if self._registry_string_is_valid(registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

url = self.url + "/api/scanning/v1/anchore/registries/" + registry
res = requests.get(url, headers=self.hdrs, verify=self.ssl_verify)
Expand Down Expand Up @@ -1059,4 +1066,117 @@ def get_vulnerability_details(self, id):
if "vulnerabilities" not in json_res or not json_res["vulnerabilities"]:
return [False, f"Vulnerability {id} was not found"]

return [True, json_res["vulnerabilities"][0]]
return [True, json_res["vulnerabilities"][0]]

def add_vulnerability_exception_bundle(self, name, comment=""):
if not name:
return [False, "A name is required for the exception bundle"]

url = self.url + f"/api/scanning/v1/vulnexceptions"
params = {
"version": "1_0",
"name": name,
"comment": comment,
}

data = json.dumps(params)
res = requests.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, res.json()]

def delete_vulnerability_exception_bundle(self, id):

url = self.url + f"/api/scanning/v1/vulnexceptions/{id}"

res = requests.delete(url, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, None]

def list_vulnerability_exception_bundles(self):
url = self.url + f"/api/scanning/v1/vulnexceptions"

params = {
"bundleId": "default",
}

res = requests.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, res.json()]

def get_vulnerability_exception_bundle(self, bundle):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}"

params = {
"bundleId": "default",
}

res = requests.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

res_json = res.json()
for item in res_json["items"]:
item["trigger_id"] = str(item["trigger_id"]).rstrip("+*")
return [True, res_json]

def add_vulnerability_exception(self, bundle, cve, note=None, expiration_date=None):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities"

params = {
"gate": "vulnerabilities",
"is_busy": False,
"trigger_id": f"{cve}+*",
"expiration_date": int(expiration_date) if expiration_date else None,
"notes": note,
}

data = json.dumps(params)
res = requests.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

res_json = res.json()
res_json["trigger_id"] = str(res_json["trigger_id"]).rstrip("+*")
return [True, res_json]

def delete_vulnerability_exception(self, bundle, id):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}"

params = {
"bundleId": "default",
}

res = requests.delete(url, params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, None]

def update_vulnerability_exception(self, bundle, id, cve, enabled, note, expiration_date):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}"

data = {
"id": id,
"gate": "vulnerabilities",
"trigger_id": f"{cve}+*",
"enabled": enabled,
"notes": note,
"expiration_date": int(expiration_date) if expiration_date else None,
}
params = {
"bundleId": "default",
}

res = requests.put(url, data=json.dumps(data), params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

res_json = res.json()
res_json["trigger_id"] = str(res_json["trigger_id"]).rstrip("+*")
return [True, res_json]
178 changes: 178 additions & 0 deletions specs/secure/scanning_vulnerability_exceptions_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import datetime
import os
import uuid

from expects import equal, expect, contain, be_empty, have_key, be_true, have_keys, not_, be_false, be_above
from mamba import before, context, description, after, it

from sdcclient import SdScanningClient
from specs import be_successful_api_call

with description("Scanning vulnerability exceptions") as self:
with before.each:
self.client = SdScanningClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
token=os.getenv("SDC_SECURE_TOKEN"))

with after.each:
self.clean_bundles()


def clean_bundles(self):
_, res = self.client.list_vulnerability_exception_bundles()
for bundle in res:
if str(bundle["name"]).startswith("test_exception_bundle_"):
call = self.client.delete_vulnerability_exception_bundle(id=bundle["id"])
expect(call).to(be_successful_api_call)


with context("when we are creating a new vulnerability exception bundle"):
with it("creates the bundle correctly"):
exception_bundle = f"test_exception_bundle_{uuid.uuid4()}"
exception_comment = "This is an example of an exception bundle"
ok, res = self.client.add_vulnerability_exception_bundle(name=exception_bundle, comment=exception_comment)

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys("id", items=be_empty, policyBundleId=equal("default"), version="1_0",
comment=equal(exception_comment), name=equal(exception_bundle))
)

with it("creates the bundle correctly with name only and removes it correctly"):
exception_bundle = f"test_exception_bundle_{uuid.uuid4()}"
ok, res = self.client.add_vulnerability_exception_bundle(name=exception_bundle)

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys("id", items=be_empty, policyBundleId=equal("default"), version="1_0",
comment=be_empty, name=equal(exception_bundle))
)

with context("when we are listing the vulnerability exception bundles"):
with before.each:
self.exception_bundle = f"test_exception_bundle_{uuid.uuid4()}"
ok, res = self.client.add_vulnerability_exception_bundle(name=self.exception_bundle)
expect((ok, res)).to(be_successful_api_call)
self.created_exception_bundle = res["id"]

with it("retrieves the list of bundles"):
ok, res = self.client.list_vulnerability_exception_bundles()

expect((ok, res)).to(be_successful_api_call)
expect(res).to(contain(
have_keys(id=self.created_exception_bundle, items=None, policyBundleId=equal("default"),
version=equal("1_0"), comment=be_empty, name=equal(self.exception_bundle))
))

with context("when we are working with vulnerability exceptions in a bundle"):
with before.each:
ok, res = self.client.add_vulnerability_exception_bundle(name=f"test_exception_bundle_{uuid.uuid4()}")
expect((ok, res)).to(be_successful_api_call)
self.created_exception_bundle = res["id"]

with it("is able to add a vulnerability exception to a bundle"):
exception_notes = "Microsoft Vulnerability"
exception_cve = "CVE-2020-1234"
ok, res = self.client.add_vulnerability_exception(bundle=self.created_exception_bundle,
cve=exception_cve,
note=exception_notes,
expiration_date=datetime.datetime(2030, 12, 31)
.timestamp())

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys("id", "description", gate=equal("vulnerabilities"), trigger_id=equal(exception_cve),
notes=equal(exception_notes), enabled=be_true)
)

with context("and there are existing vulnerability exceptions"):
with before.each:
self.created_exception_cve = "CVE-2020-1234"
ok, res = self.client.add_vulnerability_exception(bundle=self.created_exception_bundle,
cve=self.created_exception_cve)
expect((ok, res)).to(be_successful_api_call)
self.created_exception = res["id"]

with it("is able to list all the vulnerability exceptions from a bundle"):
ok, res = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys(id=equal(self.created_exception_bundle),
items=contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
enabled=be_true,
)
))
)

with it("is able to remove them"):
_, ex_before = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)
ok, res = self.client.delete_vulnerability_exception(bundle=self.created_exception_bundle,
id=self.created_exception)
_, ex_after = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

expect((ok, res)).to(be_successful_api_call)
expect(ex_before).to(
have_key("items", contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
enabled=be_true,
)
))
)
expect(ex_after).to(
have_key("items", not_(contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
enabled=be_true,
)
)))
)

with it("is able to update them"):
_, ex_before = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

ok, res = self.client.update_vulnerability_exception(bundle=self.created_exception_bundle,
id=self.created_exception,
cve="CVE-2020-1235",
enabled=False,
note="Dummy note",
expiration_date=datetime.datetime(2030, 12, 31)
.timestamp())

_, ex_after = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

expect((ok, res)).to(be_successful_api_call)

expect(ex_before).to(
have_key("items", contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
notes=equal(None),
expiration_date=equal(None),
enabled=be_true,
)
))
)

expect(ex_after).to(
have_key("items", contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal("CVE-2020-1235"),
notes=equal("Dummy note"),
expiration_date=be_above(0),
enabled=be_false,
)
))
)
Loading

0 comments on commit 191918f

Please sign in to comment.