From ae83447f031bee99df00ed47674d98319210b745 Mon Sep 17 00:00:00 2001 From: Jakub Frejlach Date: Thu, 16 Nov 2023 17:23:40 +0100 Subject: [PATCH] Implement click custom options/arguments/commands --- griffon/commands/custom_commands.py | 89 ++++++++++++++++++ griffon/commands/plugins/affects.py | 123 ++++++++++++++++++++++++ griffon/commands/queries.py | 139 +++++++++++++++++++--------- 3 files changed, 306 insertions(+), 45 deletions(-) create mode 100644 griffon/commands/custom_commands.py create mode 100644 griffon/commands/plugins/affects.py diff --git a/griffon/commands/custom_commands.py b/griffon/commands/custom_commands.py new file mode 100644 index 0000000..5270ee7 --- /dev/null +++ b/griffon/commands/custom_commands.py @@ -0,0 +1,89 @@ +""" +Custom defined Click commands/options/etc. +""" + +import click + +from griffon.exceptions import GriffonUsageError +from griffon.helpers import Style + + +class ListParamType(click.ParamType): + """Custom comma-separated list type""" + + name = "list" + + def convert(self, value, param, ctx): + if value is None: + return [] + return value.split(",") + + +class BaseGroupParameter: + """ + Custom base parameter which handles: + * mutually exclusive options + * required group options (one of the group is required) + """ + + def handle_parse_result(self, ctx, opts, args): + if self.mutually_exclusive_group: + if opts.get(self.name) is None: + pass # skip check for not supplied click.Arguments + elif self.name in opts and any( + opt in opts for opt in self.mutually_exclusive_group if opts[opt] is not None + ): + raise GriffonUsageError( + ( + f"{Style.BOLD}{self.name} cannot be used with " + f"{', '.join(self.mutually_exclusive_group)}.{Style.RESET}" + ), + ctx=ctx, + ) + + if self.required_group: + group_set = set( + opt for opt in opts if opt in self.required_group and opts[opt] is not None + ) + if not any(group_set): + raise GriffonUsageError( + f"{Style.BOLD}At least one of {', '.join(self.required_group)} " + f"is required.{Style.RESET}", + ctx=ctx, + ) + + return super().handle_parse_result(ctx, opts, args) + + +class GroupOption(BaseGroupParameter, click.Option): + """Custom Option with BaseGroupParameter functionality""" + + def __init__(self, *args, **kwargs): + self.mutually_exclusive_group = set(kwargs.pop("mutually_exclusive_group", [])) + self.required_group = set(kwargs.pop("required_group", [])) + + if self.mutually_exclusive_group: + mutually_exclusive_str = ", ".join(self.mutually_exclusive_group) + kwargs["help"] = kwargs.get("help", "") + ( + f", this argument is mutually exclusive " + f"with arguments: {Style.BOLD}[{mutually_exclusive_str}]{Style.RESET}" + ) + + if self.required_group: + required_str = ", ".join(self.required_group) + kwargs["help"] = kwargs.get("help", "") + ( + f", at least one of these arguments: " + f"{Style.BOLD}[{required_str}]{Style.RESET} is required." + ) + + super().__init__(*args, **kwargs) + + +class GroupArgument(BaseGroupParameter, click.Argument): + """Custom Argument with BaseGroupParameter functionality""" + + def __init__(self, *args, **kwargs): + self.mutually_exclusive_group = set(kwargs.pop("mutually_exclusive_group", [])) + self.required_group = set(kwargs.pop("required_group", [])) + + super().__init__(*args, **kwargs) diff --git a/griffon/commands/plugins/affects.py b/griffon/commands/plugins/affects.py new file mode 100644 index 0000000..2dc4276 --- /dev/null +++ b/griffon/commands/plugins/affects.py @@ -0,0 +1,123 @@ +""" +Plugin for handling affects +""" + +import logging + +import click +import sfm2client +from rich.console import Console + +from griffon import OSIDBService, console_status, get_config_option, progress_bar +from griffon.commands.custom_commands import GroupOption, ListParamType +from griffon.commands.queries import get_product_contain_component +from griffon.exceptions import GriffonException +from griffon.helpers import Color, Style + +console = Console() + +logger = logging.getLogger("griffon") + + +@click.group(help="Affects plugin") +@click.pass_context +def plugins(ctx): + """Affects plugin for automating specific affects filing workflows""" + pass + + +@plugins.command() +@click.option( + "-b", + "--bz-id", + "bz_id", + help="Bugzilla ID", + cls=GroupOption, + mutually_exclusive_group=["flaw_id"], + required_group=["bz_id", "flaw_id"], +) +@click.option( + "-f", + "--flaw-id", + "flaw_id", + help="UUID / CVE ID", + cls=GroupOption, + mutually_exclusive_group=["bz_id"], + required_group=["bz_id", "flaw_id"], +) +@click.option( + "--base-compiler-components", + "base_compiler_components", + default="", + help="Comma-separated list of components", + type=ListParamType(), +) +@click.pass_context +def go_stdlib(ctx, bz_id, flaw_id, base_compiler_components): + """Go stdlib affects handler""" + + with console_status(ctx) as status: + status.update("Querying OSIDB for Flaw data") + session = OSIDBService.create_session() + try: + # TODO: acquire component/subcomponents directely instead of parsing from + # Bugzilla summary once OSIDB-1420 is resolved + if flaw_id: + flaw = session.flaws.retrieve( + flaw_id, include_meta_attr="bz_summary,bz_id", include_fields="meta_attr" + ) + elif bz_id: + flaw = session.flaws.retrieve_list( + bz_id=bz_id, include_meta_attr="bz_summary,bz_id", include_fields="meta_attr" + ).results[0] + except Exception as e: + if ctx.obj["VERBOSE"]: + logger.error(e) + + flaw_id_msg = f'with ID "{flaw_id}"' if flaw_id else f'with Bugzilla ID "{bz_id}"' + raise GriffonException( + f"Could not retrieve flaw {flaw_id_msg}", + ) + + # # base compiler affects + for component_name in base_compiler_components: + ctx.invoke( + get_product_contain_component, + component_name=component_name, + search_latest=True, + sfm2_flaw_id=flaw_id, + flaw_mode="replace", + strict_name_search=True, + filter_rh_naming=True, + ) + + # rpm affects + subcomponent = flaw.meta_attr["bz_summary"].split(":")[1].strip() + ctx.obj["PROFILE"] = "latest" + ctx.invoke( + get_product_contain_component, + component_name=subcomponent, + # search_latest=True, + sfm2_flaw_id=flaw_id, + flaw_mode="replace", + strict_name_search=True, + filter_rh_naming=True, + output_type_filter="GOLANG", + component_type="RPM", + ) + + # container-first, only one per product + ctx.invoke( + get_product_contain_component, + component_name=subcomponent, + # search_latest=True, + sfm2_flaw_id=flaw_id, + flaw_mode="replace", + strict_name_search=True, + filter_rh_naming=True, + output_type_filter="GOLANG", + component_type="RPM", + ) + + # special affects for all containers built with golang + # TODO diff --git a/griffon/commands/queries.py b/griffon/commands/queries.py index 9197b08..a880f5d 100644 --- a/griffon/commands/queries.py +++ b/griffon/commands/queries.py @@ -28,6 +28,7 @@ get_product_stream_ofuris, get_product_version_names, ) +from griffon.commands.custom_commands import GroupArgument, GroupOption from griffon.commands.entities.corgi import ( get_component_manifest, get_component_summary, @@ -157,10 +158,17 @@ def retrieve_component_summary(ctx, component_name, strict_name_search): ) @click.argument( "component_name", + cls=GroupArgument, required=False, + required_group=["component_name", "purl"], + mutually_exclusive_group=["purl"], ) @click.option( - "--purl", help="Component purl, needs to be in quotes (ex. 'pkg:rpm/python-pyjwt@1.7.1')" + "--purl", + cls=GroupOption, + help="Component purl, needs to be in quotes (ex. 'pkg:rpm/python-pyjwt@1.7.1')", + required_group=["component_name", "purl"], + mutually_exclusive_group=["component_name"], ) @click.option( "--arch", @@ -361,11 +369,6 @@ def get_product_contain_component( """List products of a latest component.""" if verbose: ctx.obj["VERBOSE"] = verbose - if not purl and not component_name: - click.echo(ctx.get_help()) - click.echo("") - click.echo(f"{Style.BOLD}Must supply Component name or --purl.{Style.RESET}") - exit(0) if ( not search_latest @@ -592,8 +595,19 @@ def get_product_contain_component( name="components-contain-component", help="List Components containing Component.", ) -@click.argument("component_name", required=False) -@click.option("--purl") +@click.argument( + "component_name", + cls=GroupArgument, + required=False, + required_group=["component_name", "purl"], + mutually_exclusive_group=["purl"], +) +@click.option( + "--purl", + cls=GroupOption, + required_group=["component_name", "purl"], + mutually_exclusive_group=["component_name"], +) @click.option("--type", "component_type", type=click.Choice(CorgiService.get_component_types())) @click.option("--version", "component_version") @click.option( @@ -633,9 +647,6 @@ def get_component_contain_component( """List components that contain component.""" if verbose: ctx.obj["VERBOSE"] = verbose - if not component_name and not purl: - click.echo(ctx.get_help()) - exit(0) if component_name: q = query_service.invoke(core_queries.components_containing_component_query, ctx.params) cprint(q, ctx=ctx) @@ -650,8 +661,23 @@ def get_component_contain_component( name="product-manifest", help="Get Product manifest (includes Root Components and all dependencies).", ) -@click.argument("product_stream_name", required=False, shell_complete=get_product_stream_names) -@click.option("--ofuri", "ofuri", type=click.STRING, shell_complete=get_product_stream_ofuris) +@click.argument( + "product_stream_name", + cls=GroupArgument, + required=False, + shell_complete=get_product_stream_names, + required_group=["ofuri", "product_stream_name"], + mutually_exclusive_group=["ofuri"], +) +@click.option( + "--ofuri", + "ofuri", + cls=GroupOption, + type=click.STRING, + shell_complete=get_product_stream_ofuris, + required_group=["ofuri", "product_stream_name"], + mutually_exclusive_group=["product_stream_name"], +) @click.option( "--spdx-json", "spdx_json_format", @@ -662,10 +688,6 @@ def get_component_contain_component( @click.pass_context def get_product_manifest_query(ctx, product_stream_name, ofuri, spdx_json_format): """List components of a specific product version.""" - if not ofuri and not product_stream_name: - click.echo(ctx.get_help()) - exit(0) - if spdx_json_format: ctx.ensure_object(dict) ctx.obj["FORMAT"] = "json" # TODO - investigate if we need yaml format. @@ -682,8 +704,23 @@ def get_product_manifest_query(ctx, product_stream_name, ofuri, spdx_json_format name="product-components", help="List LATEST Root Components of Product.", ) -@click.argument("product_stream_name", required=False, shell_complete=get_product_stream_names) -@click.option("--ofuri", "ofuri", type=click.STRING, shell_complete=get_product_stream_ofuris) +@click.argument( + "product_stream_name", + cls=GroupArgument, + required=False, + shell_complete=get_product_stream_names, + required_group=["ofuri", "product_stream_name"], + mutually_exclusive_group=["ofuri"], +) +@click.option( + "--ofuri", + "ofuri", + cls=GroupOption, + type=click.STRING, + shell_complete=get_product_stream_ofuris, + required_group=["ofuri", "product_stream_name"], + mutually_exclusive_group=["product_stream_name"], +) @query_params_options( entity="Component", endpoint_module=v1_components_list, @@ -704,9 +741,6 @@ def get_product_latest_components_query(ctx, product_stream_name, ofuri, verbose if verbose: ctx.obj["VERBOSE"] = verbose ctx.params.pop("verbose") - if not ofuri and not product_stream_name: - click.echo(ctx.get_help()) - exit(0) if ofuri: params["ofuri"] = ofuri if product_stream_name: @@ -724,8 +758,20 @@ def get_product_latest_components_query(ctx, product_stream_name, ofuri, verbose name="component-manifest", help="Get Component manifest.", ) -@click.option("--uuid", "component_uuid") -@click.option("--purl", help="Component Purl (must be quoted).") +@click.option( + "--uuid", + "component_uuid", + cls=GroupOption, + required_group=["component_uuid", "purl"], + mutually_exclusive_group=["purl"], +) +@click.option( + "--purl", + cls=GroupOption, + required_group=["component_uuid", "purl"], + mutually_exclusive_group=["component_uuid"], + help="Component Purl (must be quoted).", +) @click.option( "--spdx-json", "spdx_json_format", @@ -736,9 +782,6 @@ def get_product_latest_components_query(ctx, product_stream_name, ofuri, verbose @click.pass_context def retrieve_component_manifest(ctx, component_uuid, purl, spdx_json_format): """Retrieve Component manifest.""" - if not component_uuid and not purl: - click.echo(ctx.get_help()) - exit(0) if spdx_json_format: ctx.ensure_object(dict) ctx.obj["FORMAT"] = "json" @@ -754,7 +797,7 @@ def retrieve_component_manifest(ctx, component_uuid, purl, spdx_json_format): name="components-affected-by-flaw", help="List Components affected by Flaw.", ) -@click.argument("cve_id", required=False, type=click.STRING, shell_complete=get_cve_ids) +@click.argument("cve_id", required=True, type=click.STRING, shell_complete=get_cve_ids) @click.option( "--affectedness", help="Filter by Affect affectedness.", @@ -789,9 +832,6 @@ def components_affected_by_specific_cve_query( ctx, cve_id, affectedness, affect_resolution, affect_impact, component_type, namespace ): """List components affected by specific CVE.""" - if not cve_id: - click.echo(ctx.get_help()) - exit(0) q = query_service.invoke(core_queries.components_affected_by_specific_cve_query, ctx.params) cprint(q, ctx=ctx) @@ -800,14 +840,11 @@ def components_affected_by_specific_cve_query( name="products-affected-by-flaw", help="List Products affected by Flaw.", ) -@click.argument("cve_id", required=False, type=click.STRING, shell_complete=get_cve_ids) +@click.argument("cve_id", required=True, type=click.STRING, shell_complete=get_cve_ids) @click.pass_context @progress_bar def product_versions_affected_by_cve_query(ctx, cve_id): """List Products affected by a CVE.""" - if not cve_id: - click.echo(ctx.get_help()) - exit(0) q = query_service.invoke( core_queries.products_versions_affected_by_specific_cve_query, ctx.params ) @@ -815,8 +852,19 @@ def product_versions_affected_by_cve_query(ctx, cve_id): @queries_grp.command(name="component-flaws", help="List Flaws affecting a Component.") -@click.argument("component_name", required=False) -@click.option("--purl") +@click.argument( + "component_name", + cls=GroupArgument, + required=False, + required_group=["component_name", "purl"], + mutually_exclusive_group=["purl"], +) +@click.option( + "--purl", + cls=GroupOption, + required_group=["component_name", "purl"], + mutually_exclusive_group=["component_name"], +) @click.option( "--flaw-impact", "flaw_impact", @@ -867,10 +915,6 @@ def cves_for_specific_component_query( strict_name_search, ): """List flaws of a specific component.""" - if not purl and not component_name: - click.echo(ctx.get_help()) - exit(0) - q = query_service.invoke(core_queries.cves_for_specific_component_query, ctx.params) cprint(q, ctx=ctx) @@ -883,9 +927,17 @@ def cves_for_specific_component_query( "product_version_name", required=False, type=click.STRING, + cls=GroupArgument, shell_complete=get_product_version_names, + required_group=["ofuri", "product_version_name"], + mutually_exclusive_group=["ofuri"], +) +@click.option( + "--ofuri", + cls=GroupOption, + required_group=["ofuri", "product_version_name"], + mutually_exclusive_group=["product_version_name"], ) -@click.option("--ofuri") @click.option( "--flaw-impact", "flaw_impact", @@ -936,8 +988,5 @@ def cves_for_specific_product_query( strict_name_search, ): """List flaws of a specific product.""" - if not product_version_name and not ofuri: - click.echo(ctx.get_help()) - exit(0) q = query_service.invoke(core_queries.cves_for_specific_product_query, ctx.params) cprint(q, ctx=ctx)