Process cache groups #5238

Open
adamrtalbot opened this issue Aug 19, 2024 · 2 comments

Comments

@adamrtalbot
Collaborator

New feature

Sometimes an error in a process is only exposed later in the pipeline, when a downstream process fails. In these cases, it would be useful to force -resume to restart from an earlier step in the pipeline rather than from the failed process.

Usage scenario

Let's imagine a scenario where we have three processes. The first is non-deterministic because it uses a fancy new AI algorithm. The second and third use the output of the first process in sequence; however, sometimes the third process fails because the algorithm doesn't reach equilibrium or something similar. We might try to solve this by using Nextflow's resume feature and catching the error, but -resume will skip processes 1 and 2 and jump straight to 3. This might just repeat the error, so we would prefer to start again from process 1. Here's a minimal example:

params.exitcode = 1

process RANDOM {
    output:
    path("output.txt")

    script:
    """
    echo \$RANDOM > output.txt
    """
}

process DO_THING_WITH_RANDOM {
    input:
    path "input.txt"

    output:
    path("output.txt")

    script:
    """
    cat input.txt > output.txt
    """
}

process FAIL_WITH_RANDOM {
    input:
    path "input.txt"
    val exitcode

    output:
    path "output.txt"

    script:
    """
    cat input.txt > output.txt
    exit $exitcode
    """
}

workflow {
    RANDOM()
    DO_THING_WITH_RANDOM(RANDOM.out)
    FAIL_WITH_RANDOM(DO_THING_WITH_RANDOM.out, params.exitcode)
}

In this case, there is nothing we can do to make RANDOM restart when using -resume, even though its output will change every time it runs.

Suggested implementation

It would help if we could 'group' caches, so that if any cache within a set is invalidated, every process in that set restarts. For example, we could add a key value which can be used to associate processes by sample ID:

process MYPROCESS { 
    cache true, key: id
    
    input:
    tuple val(id), path(bam), path(bai)
    ...
}
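
Applied to the example above, this might look something like the following (a sketch only; the key option is the proposed syntax, not something Nextflow supports today):

process RANDOM {
    cache true, key: 'random-group'   // proposed syntax: shared key = shared cache group
    ...
}

process DO_THING_WITH_RANDOM {
    cache true, key: 'random-group'   // invalidating either cache entry would re-run both
    ...
}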

Alternatively, we could provide the tools for developers to add this to the errorStrategy, so that it could be baked into the pipeline itself. This might follow a similar pattern:

process MYPROCESS { 
    errorStrategy "retry"
    errorGroup id
    
    input:
    tuple val(id), path(bam), path(bai)
    ...
}
@adamrtalbot
Collaborator Author

For a real-world example, this is relevant to machine-learning-based algorithms such as AlphaFold, where you may not know whether step 1 is valid until performing a later step.

@bentsherman
Member

A hacky version of this would be to run Nextflow-in-Nextflow; the cache of the inner nextflow run is then essentially a "cache group" as you described.
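
Roughly, that workaround could look like this, assuming RANDOM and DO_THING_WITH_RANDOM are moved into a separate inner.nf that publishes output.txt into a results/ directory (the script name and output path here are just placeholders):

process RUN_CACHE_GROUP {
    output:
    path "results/output.txt"

    script:
    """
    # The inner run's cache lives inside this task's work directory, so
    # whenever this outer task is re-executed the whole inner pipeline
    # (RANDOM and DO_THING_WITH_RANDOM) re-runs as a single unit.
    nextflow run ${projectDir}/inner.nf
    """
}

workflow {
    RUN_CACHE_GROUP()
    FAIL_WITH_RANDOM(RUN_CACHE_GROUP.out, params.exitcode)
}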

This is related to subworkflow grouping, which makes me wonder if it could be supported using the proposed syntax in nf-core/fetchngs#309.

How about this:

workflow {
  inputs = Channel.of( 1, 2, 3 )

  inputs
    |> map(cacheGroup: true) { input ->
      input |> RANDOM |> DO_THING_WITH_RANDOM
    }
    |> map { out ->
      FAIL_WITH_RANDOM(out, params.exitcode)
    }
}

Since the first two processes are grouped in the same closure, it is trivial to give them shared behaviors like a cache group, whereas a cache id at the process level would allow groupings that don't make sense (e.g. grouping two processes in completely different subworkflows).
