From ed2337fcc7edb6f45fd694a2194d5b2e0ce2f424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Prpi=C4=8D?= Date: Wed, 16 Dec 2020 20:32:17 -0500 Subject: [PATCH] Initial commit --- .gitignore | 108 ++++++++++++++++++ cvelib/__init__.py | 1 + cvelib/cli.py | 274 +++++++++++++++++++++++++++++++++++++++++++++ cvelib/idr.py | 83 ++++++++++++++ pyproject.toml | 2 + setup.py | 47 ++++++++ 6 files changed, 515 insertions(+) create mode 100644 .gitignore create mode 100644 cvelib/__init__.py create mode 100644 cvelib/cli.py create mode 100644 cvelib/idr.py create mode 100644 pyproject.toml create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ac115f6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,108 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST +pip-wheel-metadata/ + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# IPython +profile_default/ +ipython_config.py + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Editors +.idea/ diff --git a/cvelib/__init__.py b/cvelib/__init__.py new file mode 100644 index 0000000..f102a9c --- /dev/null +++ b/cvelib/__init__.py @@ -0,0 +1 @@ +__version__ = "0.0.1" diff --git a/cvelib/cli.py b/cvelib/cli.py new file mode 100644 index 0000000..3f224da --- /dev/null +++ b/cvelib/cli.py @@ -0,0 +1,274 @@ +import json +import re +import sys +from datetime import date, datetime + +import click + +from .idr import Idr, IdrException + +CVE_RE = re.compile(r"^CVE-[12]\d{3}-\d{4,}$") +CONTEXT_SETTINGS = { + "help_option_names": ["-h", "--help"], + "show_default": True, + "max_content_width": 100, +} + + +def validate_cve(ctx, param, value): + if value is None: + return + if not CVE_RE.match(value): + raise click.BadParameter("invalid CVE ID") + return value + + +def validate_year(ctx, param, value): + if value is None: + return + # Hopefully this code won't be around in year 10,000. + if not re.match(r"^[1-9]\d{3}$", value): + raise click.BadParameter("invalid year") + return value + + +def print_ts(ts): + return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%c") + + +def natural_cve_sort(cve): + if not cve: + return [] + return [int(x) for x in cve.split("-")[1:]] + + +class Config: + def __init__(self, username, org, api_key, env, idr_url, interactive): + self.username = username + self.org = org + self.api_key = api_key + self.env = env + self.idr_url = idr_url + self.interactive = interactive + + def init_idr(self, **kwargs): + return Idr( + username=self.username, + org=self.org, + api_key=self.api_key, + env=self.env, + url=self.idr_url, + **kwargs, + ) + + +pass_config = click.make_pass_decorator(Config) + + +@click.group(context_settings=CONTEXT_SETTINGS) +@click.option("-u", "--username", envvar="CVE_USER", required=True, help="User name") +@click.option("-o", "--org", envvar="CVE_ORG", required=True, help="CNA organization short name") +@click.option("-a", "--api-key", envvar="CVE_API_KEY", required=True, help="API key") +@click.option( + "-e", + "--env", + envvar="CVE_ENVIRONMENT", + default="prod", + type=click.Choice(["prod", "dev"]), + help="Select deployment environment to query.", +) +@click.option( + "--idr-url", + envvar="CVE_IDR_URL", + help="Provide arbitrary URL for the IDR service.", +) +@click.option( + "-i", + "--interactive", + envvar="CVE_INTERACTIVE", + default=False, + is_flag=True, + help="Confirm create/update actions before execution.", +) +@click.pass_context +def cli(ctx, username, org, api_key, env, idr_url, interactive): + """A CLI interface for the CVE Project services.""" + ctx.obj = Config(username, org, api_key, env, idr_url, interactive) + + +@cli.command(context_settings=CONTEXT_SETTINGS) +@click.option( + "-r", + "--random", + default=False, + is_flag=True, + help="Reserve multiple CVE IDs non-sequentially.", +) +@click.option( + "-y", + "--year", + default=lambda: str(date.today().year), + callback=validate_year, + help="Reserve CVE ID(s) for a given year.", + show_default="current year", +) +@click.option("--raw", "print_raw", default=False, is_flag=True, help="Print response JSON.") +@click.argument("count", default=1, type=click.IntRange(min=1)) +@pass_config +def reserve(ctx, random, year, count, print_raw): + """Reserve one or more CVE IDs. + + CVE IDs can be reserved one by one (the lowest IDs are reserved first) or in batches of + multiple IDs per single request. When reserving multiple IDs, you can request those IDs to be + generated sequentially or non-sequentially. + + For more information, see: "Developer Guide to CVE Services API" (https://git.io/JLcmZ) + """ + if random and count > 10: + raise click.BadParameter("requesting non-sequential CVE IDs is limited to 10 per request") + + if ctx.interactive: + click.echo("You are about to reserve ", nl=False) + if count > 1: + click.secho( + f"{count} {'non-sequential' if random else 'sequential'} ", bold=True, nl=False + ) + click.echo("CVE IDs for year ", nl=False) + else: + click.secho("1 ", bold=True, nl=False) + click.echo("CVE ID for year ", nl=False) + click.secho(year, bold=True, nl=False) + click.echo(".") + if not click.confirm("This operation cannot be reversed; do you want to continue?"): + print("Exiting...") + sys.exit(0) + + idr = ctx.init_idr() + response = idr.reserve(count, random, year) + cve_data = response.json() + + if print_raw: + click.echo(json.dumps(cve_data, indent=4, sort_keys=True)) + else: + click.echo("Reserved the following CVE ID(s):\n") + for cve in cve_data["cve_ids"]: + click.echo(cve) + + click.echo(f"\nRemaining quota: {response.headers['CVE-API-REMAINING-QUOTA']}") + + +@cli.command(name="show", context_settings=CONTEXT_SETTINGS) +@click.option("--raw", "print_raw", default=False, is_flag=True, help="Print response JSON.") +@click.argument("cve_id", callback=validate_cve) +@pass_config +def show_cve(ctx, print_raw, cve_id): + """Display a specific CVE ID owned by your CNA.""" + idr = ctx.init_idr() + response = idr.show_cve(cve_id=cve_id) + cve = response.json() + + if print_raw: + click.echo(json.dumps(cve, indent=4, sort_keys=True)) + else: + click.secho(cve["cve_id"], bold=True) + click.echo(f"├─ State:\t{cve['state']}") + click.echo(f"├─ Owning CNA:\t{cve['owning_cna']}") + click.echo(f"├─ Reserved by:\t{cve['requested_by']['user']} ({cve['requested_by']['cna']})") + click.echo(f"└─ Reserved on:\t{cve['reserved']}") + + +@cli.command(name="list", context_settings=CONTEXT_SETTINGS) +@click.option("--raw", "print_raw", default=False, is_flag=True, help="Print response JSON.") +@click.option( + "--sort-by", + type=click.Choice(["cve_id", "state", "user", "reserved"], case_sensitive=False), + default="cve_id", + help="Sort output.", +) +@click.option("--year", callback=validate_year, help="Filter by year.") +@click.option( + "--state", + # type=click.Choice(["RESERVED", "PUBLIC", "REJECT"], case_sensitive=False), + help="Filter by reservation state.", +) +@click.option( + "--reserved-lt", type=click.DateTime(), help="Filter by reservation time before timestamp." +) +@click.option( + "--reserved-gt", type=click.DateTime(), help="Filter by reservation time after timestamp." +) +@pass_config +def list_cves(ctx, print_raw, sort_by, **query): + """Filter and list CVE IDs owned by your CNA.""" + idr = ctx.init_idr() + response = idr.list_cves(**query) + cve_data = response.json() + + if print_raw: + click.echo(json.dumps(cve_data, indent=4, sort_keys=True)) + return + + cves = cve_data["cve_ids"] + if sort_by: + key = sort_by.lower() + if key == "user": + cves.sort(key=lambda x: x["requested_by"]["user"]) + elif key == "cve_id": + cves.sort(key=lambda x: natural_cve_sort(x["cve_id"])) + elif key == "reserved_asc": + cves.sort(key=lambda x: x["reserved"]) + elif key == "state": + cves.sort(key=lambda x: x["state"]) + + lines = [("CVE ID", "STATE", "OWNING CNA", "REQUESTED BY", "RESERVED")] + for cve in cves: + lines.append( + ( + cve["cve_id"], + cve["state"], + cve["owning_cna"], + f"{cve['requested_by']['user']} ({cve['requested_by']['cna']})", + print_ts(cve["reserved"]), + ) + ) + col_widths = [] + for item_index in range(len(lines[0])): + max_len_value = max(lines, key=lambda x: len(x[item_index])) + col_widths.append(len(max_len_value[item_index])) + + for idx, line in enumerate(lines): + text = "".join(f"{value:<{width + 3}}" for value, width in zip(line, col_widths)).strip() + if idx == 0: + click.secho(text, bold=True) + else: + click.echo(text) + + +@cli.command(context_settings=CONTEXT_SETTINGS) +@pass_config +def quota(ctx): + """Display the available quota for your CNA.""" + idr = ctx.init_idr() + response = idr.quota() + idr_quota = response.json() + + click.echo("CNA quota for ", nl=False) + click.secho(f"{ctx.org}", bold=True, nl=False) + click.echo(f":\t{idr_quota['id_quota']}") + click.echo(f"├─ Reserved:\t\t{idr_quota['total_reserved']}") + click.echo(f"└─ Available:\t\t{idr_quota['available']}") + + +@cli.command(context_settings=CONTEXT_SETTINGS) +@pass_config +def ping(ctx): + """Ping the IDR service to see if it is up.""" + idr = ctx.init_idr(raise_exc=False) + idr_status = idr.ping() + + click.echo("IDR API Status: ", nl=False) + if idr_status.ok: + click.secho("OK", fg="green") + else: + click.secho(f"NOT OK ({idr_status.status_code})", fg="red") + click.echo(f"└─ {idr.url}") diff --git a/cvelib/idr.py b/cvelib/idr.py new file mode 100644 index 0000000..7dd85d4 --- /dev/null +++ b/cvelib/idr.py @@ -0,0 +1,83 @@ +from urllib.parse import urljoin + +import requests + + +class IdrException(Exception): + """Raise when encountering errors returned by the IDR API.""" + + pass + + +class Idr: + ENVS = { + "prod": "https://cveawg.mitre.org/api/", + "dev": "https://cveawg-dev.mitre.org/api/", + } + + def __init__(self, username, org, api_key, env="prod", url=None, raise_exc=True): + self.username = username + self.org = org + self.api_key = api_key + self.url = url or self.ENVS.get(env) + if not self.url: + raise ValueError("Missing URL for IDR") + self.raise_exc = raise_exc + + def http_request(self, method, path, **kwargs): + url = urljoin(self.url, path) + headers = { + "CVE-API-KEY": self.api_key, + "CVE-API-ORG": self.org, + "CVE-API-USER": self.username, + } + response = requests.request(method=method, url=url, timeout=60, headers=headers, **kwargs) + if self.raise_exc: + try: + response.raise_for_status() + except requests.exceptions.RequestException as exc: + if exc.response is not None: + try: + error = exc.response.json() + except ValueError: + error = exc.response.content + raise IdrException(f"{exc}; returned error: {error}") from None + else: + raise IdrException(str(exc)) from None + return response + + def get(self, path, **kwargs): + return self.http_request("get", path, **kwargs) + + def post(self, path, **kwargs): + return self.http_request("post", path, **kwargs) + + def reserve(self, count, random, year): + params = { + "cve_year": year, + "amount": count, + } + if count > 1: + params["batch_type"] = "nonsequential" if random else "sequential" + return self.post("cve-id", params=params) + + def show_cve(self, cve_id): + return self.get(f"cve-id/{cve_id}") + + def list_cves(self, year=None, state=None, reserved_lt=None, reserved_gt=None): + params = {} + if year: + params["cve_id_year"] = year + if state: + params["state"] = state.upper() + if reserved_lt: + params["time_reserved.lt"] = reserved_lt.isoformat() + if reserved_gt: + params["time_reserved.gt"] = reserved_gt.isoformat() + return self.get(f"cve-id", params=params) + + def quota(self): + return self.get(f"org/{self.org}/id_quota") + + def ping(self): + return self.get("health-check") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..aa4949a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,2 @@ +[tool.black] +line-length = 100 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..80bae4e --- /dev/null +++ b/setup.py @@ -0,0 +1,47 @@ +from setuptools import setup + +requires = [ + "click>=7.1.2", + "requests>=2.24.0", +] + +with open("README.md") as f: + readme = f.read() + +with open("cvelib/__init__.py") as f: + for line in f: + if line.startswith("__version__"): + delim = '"' if '"' in line else "'" + version = line.split(delim)[1] + break + else: + raise RuntimeError("Unable to find version string.") + +setup( + name="cvelib", + version=version, + description="A library and command line interface for the CVE Project services.", + long_description=readme, + long_description_content_type="text/markdown", + url="TODO", + author="Red Hat Product Security", + author_email="secalert@redhat.com", + license="MIT", + classifiers=[ + "Topic :: Security", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + ], + include_package_data=True, + packages=["cvelib"], + install_requires=requires, + entry_points={ + "console_scripts": [ + "cve = cvelib.cli:cli", + ], + }, +)