Skip to content

Commit

Permalink
Improve click snakemake options (#26)
Browse files Browse the repository at this point in the history
* Improve click snakemake options

* Add snakemake test file

* Add snakemake argument list
  • Loading branch information
percyfal authored Jan 10, 2025
1 parent fd06552 commit 5e97347
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 23 deletions.
106 changes: 83 additions & 23 deletions src/nbis/snakemake.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
logger = logging.getLogger(__name__)


def snakemake_argument_list() -> Callable[[FC], FC]:
"""Add snakemake argument list."""

return click.argument("snakemake_args", nargs=-1, type=click.UNPROCESSED)


def get_profile(uri, config):
"""Retrieve snakemake profile from config"""
try:
Expand All @@ -26,17 +32,22 @@ def get_profile(uri, config):


def profile_option(
default: str = "local", expose_value: bool = True
default: str = "local", expose_value: bool = True
) -> Callable[[FC], FC]:
"""Add profile option with callback."""

def profile_callback(
ctx: click.core.Context, param: click.core.Option, value: str
) -> str: # pylint: disable=unused-argument
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: str,
) -> str:
"""Profile callback."""
env = ctx.ensure_object(Environment)
if ctx.params.get("no_profile"):
return []
is_report = ctx.params.get("report", [])
if len(is_report) > 0:
return []
if value is None:
value = default
return ["--profile", get_profile(value, env.config)]
Expand Down Expand Up @@ -68,15 +79,16 @@ def verbose_option(expose_value: bool = False) -> Callable[[FC], FC]:
"""Add verbose option with callback."""

def verbose_callback(
ctx: click.core.Context, param: click.core.Option, value: int
) -> int: # pylint: disable=unused-argument
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: int,
) -> int:
"""Verbose callback."""
log_level = max(3 - value, 0) * 10
logging.basicConfig(
level=log_level,
format=(
"%(asctime)s; %(levelname)s "
"[%(name)s:%(funcName)s]: %(message)s"
"%(asctime)s; %(levelname)s " "[%(name)s:%(funcName)s]: %(message)s"
),
)
return log_level
Expand All @@ -98,8 +110,10 @@ def cores_option(default=None) -> Callable[[FC], FC]:
"""Add cores option."""

def cores_callback(
ctx: click.core.Context, param: click.core.Option, value: int
) -> int: # pylint: disable=unused-argument
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: int,
) -> int:
"""Cores callback."""
if value is None:
return []
Expand All @@ -112,7 +126,7 @@ def cores_callback(
"-c",
"--cores",
help="number of cores",
default=None,
default=default,
callback=cores_callback,
type=int,
)
Expand All @@ -122,8 +136,10 @@ def jobs_option(default=None) -> Callable[[FC], FC]:
"""Add jobs option."""

def jobs_callback(
ctx: click.core.Context, param: click.core.Option, value: int
) -> int: # pylint: disable=unused-argument
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: int,
) -> int:
"""Jobs callback."""
if value is None:
return []
Expand All @@ -142,34 +158,63 @@ def jobs_callback(
)


def threads_option() -> Callable[[FC], FC]:
def threads_option(default=None) -> Callable[[FC], FC]:
"""Add threads option."""

def threads_callback(
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: int,
) -> int:
"""Threads callback."""
if value is None:
return []
if value < 1:
logging.error("Threads must be greater than 0")
raise ValueError("Threads must be greater than 0")
return ["--threads", str(value)]

return click.option(
"-t",
"--threads",
help="number of threads",
default=1,
default=default,
callback=threads_callback,
type=click.IntRange(
1,
),
)


def test_option() -> Callable[[FC], FC]:
"""Add test option"""
def test_option(
config: list[str] = None,
options: list[str] = None,
test_profile: str = None,
) -> Callable[[FC], FC]:
"""Add test option with callback."""

def test_callback(
ctx: click.core.Context, param: click.core.Option, value: int
) -> int: # pylint: disable=unused-argument
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: int,
) -> int:
"""Callback for test option"""
ret = []
if value:
return ["--config", "__test__=True"]
return []
if options:
ret = ret + options
if test_profile:
ret = ret + ["--profile", test_profile]
ret = ret + ["--config", "__test__=True"]
if config:
ret = ret + config
return ret

return click.option(
"--test", is_flag=True, help="run workflow on small test data set",
callback=test_callback
"--test",
is_flag=True,
help="run workflow on small test data set",
callback=test_callback,
)


Expand All @@ -184,13 +229,28 @@ def directory_option():
)


def report_option() -> Callable[[FC], FC]:
def report_option(
report_file: str = "report.html",
) -> Callable[[FC], FC]:
"""Add snakemake report option"""

def report_callback(
ctx: click.core.Context, # pylint: disable=unused-argument
param: click.core.Option, # pylint: disable=unused-argument
value: bool,
) -> int:
"""Callback for report option"""
ret = []
if value:
ret = ["--report", report_file]
return ret

func = click.option(
"--report",
help=("generate snakemake report"),
is_flag=True,
default=False,
callback=report_callback,
)
return func

Expand Down
126 changes: 126 additions & 0 deletions tests/test_snakemake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Test snakemake module."""

import click

from nbis.snakemake import no_profile_option
from nbis.snakemake import profile_option
from nbis.snakemake import report_option
from nbis.snakemake import snakemake_argument_list
from nbis.snakemake import test_option


def test_snakemake_args(runner):
"""Test snakemake args."""

@click.command(context_settings={"ignore_unknown_options": True})
@profile_option()
@snakemake_argument_list()
def cmd(profile, snakemake_args):
print(profile, list(snakemake_args))

ret = runner.invoke(cmd, [])
assert ret.stdout == "['--profile', 'local'] []\n"
ret = runner.invoke(cmd, ["--dry-run", "--printshellcmds"])
assert ret.stdout == "['--profile', 'local'] ['--dry-run', '--printshellcmds']\n"


class TestProfileOption:
"""Test snakemake profile_option."""

def test_profile_option(self, runner):
"""Test profile_option."""

@click.command()
@profile_option()
def cmd(profile):
print(profile)

ret = runner.invoke(cmd, [])
assert ret.stdout == "['--profile', 'local']\n"
ret = runner.invoke(cmd, ["--profile", "test"])
assert ret.stdout == "['--profile', 'test']\n"

def test_no_profile_option(self, runner):
"""Test no_profile_option."""

@click.command()
@profile_option()
@no_profile_option()
def cmd(profile, no_profile): # pylint: disable=unused-argument
print(profile)

ret = runner.invoke(cmd, [])
assert ret.stdout == "['--profile', 'local']\n"
ret = runner.invoke(cmd, ["--no-profile"])
assert ret.stdout == "[]\n"


class TestReportOption:
"""Test snakemake report_option."""

def test_report_option(self, runner):
"""Test report_option."""

@click.command()
@report_option()
def cmd(report):
print(report)

ret = runner.invoke(cmd, [])
assert ret.stdout == "[]\n"
ret = runner.invoke(cmd, ["--report"])
assert ret.stdout == "['--report', 'report.html']\n"

def test_report_option_with_profile(self, runner):
"""Test report option with profile."""

@click.command()
@report_option(report_file="test.html")
@profile_option()
@no_profile_option()
def cmd(report, profile, no_profile): # pylint: disable=unused-argument
print(report, profile)

ret = runner.invoke(cmd, [])
assert ret.stdout == "[] ['--profile', 'local']\n"
ret = runner.invoke(cmd, ["--report"])
assert ret.stdout == "['--report', 'test.html'] []\n"


class TestTestOption:
"""Test snakemake test_option."""

def test_test_option(self, runner):
"""Test test_option."""

@click.command()
@test_option()
def cmd(test):
print(test)

ret = runner.invoke(cmd, [])
assert ret.stdout == "[]\n"
ret = runner.invoke(cmd, ["--test"])
assert ret.stdout == "['--config', '__test__=True']\n"

def test_test_option_with_config(self, runner):
"""Test test_option with config."""

@click.command()
@test_option(config=["foo=bar"])
def cmd(test):
print(test)

ret = runner.invoke(cmd, ["--test"])
assert ret.stdout == "['--config', '__test__=True', 'foo=bar']\n"

def test_test_option_with_options(self, runner):
"""Test test_option with options."""

@click.command()
@test_option(options=["--foo", "bar"])
def cmd(test):
print(test)

ret = runner.invoke(cmd, ["--test"])
assert ret.stdout == "['--foo', 'bar', '--config', '__test__=True']\n"

0 comments on commit 5e97347

Please sign in to comment.