diff --git a/ingest/README.md b/ingest/README.md
index b2a38522..1bba8874 100644
--- a/ingest/README.md
+++ b/ingest/README.md
@@ -44,7 +44,12 @@ A pair of files for each dengue serotype (denv1 - denv4)
 Run the complete ingest pipeline and upload results to AWS S3 with
 
 ```sh
-nextstrain build ingest --configfiles config/config.yaml config/optional.yaml
+nextstrain build \
+    --env AWS_ACCESS_KEY_ID \
+    --env AWS_SECRET_ACCESS_KEY \
+    ingest \
+    upload_all \
+    --configfile build-configs/nextstrain-automation/config.yaml
 ```
 
 ### Adding new sequences not from GenBank
@@ -70,27 +75,25 @@ Do the following to include sequences from static FASTA files.
    !ingest/data/{file-name}.ndjson
    ```
 
-3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/config/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same transform pipeline.
+3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/defaults/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same curation pipeline.
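+
+   For example, if you added `ingest/data/local-sequences.ndjson`, the config
+   entry might look like the following sketch (the `sources` key name and the
+   file name are illustrative; match the structure already in the config):
+
+   ```yaml
+   sources: ['genbank', 'local-sequences']
+   ```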
 
 ## Configuration
 
-Configuration takes place in `config/config.yaml` by default.
-Optional configs for uploading files and Slack notifications are in `config/optional.yaml`.
+Configuration takes place in `defaults/config.yaml` by default.
+Optional configs for uploading files are in `build-configs/nextstrain-automation/config.yaml`.
 
 ### Environment Variables
 
-The complete ingest pipeline with AWS S3 uploads and Slack notifications uses the following environment variables:
+The complete ingest pipeline with AWS S3 uploads uses the following environment variables:
 
 #### Required
 
 - `AWS_ACCESS_KEY_ID`
 - `AWS_SECRET_ACCESS_KEY`
-- `SLACK_TOKEN`
-- `SLACK_CHANNELS`
 
 #### Optional
 
-These are optional environment variables used in our automated pipeline for providing detailed Slack notifications.
+These are optional environment variables used in our automated pipeline.
 
 - `GITHUB_RUN_ID` - provided via [`github.run_id` in a GitHub Action workflow](https://docs.github.com/en/actions/learn-github-actions/contexts#github-context)
 - `AWS_BATCH_JOB_ID` - provided via [AWS Batch Job environment variables](https://docs.aws.amazon.com/batch/latest/userguide/job_env_vars.html)
diff --git a/ingest/Snakefile b/ingest/Snakefile
index c26bfcec..ad7e0f7c 100644
--- a/ingest/Snakefile
+++ b/ingest/Snakefile
@@ -4,65 +4,23 @@ min_version(
     "7.7.0"
 )  # Snakemake 7.7.0 introduced `retries` directive used in fetch-sequences
 
-if not config:
-
-    configfile: "config/config.yaml"
-
-
-send_slack_notifications = config.get("send_slack_notifications", False)
+configfile: "defaults/config.yaml"
 
 
 serotypes = ['all', 'denv1', 'denv2', 'denv3', 'denv4']
 
 
-def _get_all_targets(wildcards):
-    # Default targets are the metadata TSV and sequences FASTA files
-    all_targets = expand(["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"], serotype=serotypes)
-
-    # Add additional targets based on upload config
-    upload_config = config.get("upload", {})
-
-    for target, params in upload_config.items():
-        files_to_upload = params.get("files_to_upload", {})
-
-        if not params.get("dst"):
-            print(
-                f"Skipping file upload for {target!r} because the destination was not defined."
-            )
-        else:
-            all_targets.extend(
-                expand(
-                    [f"data/upload/{target}/{{remote_file_name}}.done"],
-                    zip,
-                    remote_file_name=files_to_upload.keys(),
-                )
-            )
-
-    # Add additional targets for Nextstrain's internal Slack notifications
-    if send_slack_notifications:
-        all_targets.extend(
-            [
-                "data/notify/genbank-record-change.done",
-                "data/notify/metadata-diff.done",
-            ]
-        )
-
-    if config.get("trigger_rebuild", False):
-        all_targets.append("data/trigger/rebuild.done")
-
-    return all_targets
-
-
 rule all:
     input:
-        _get_all_targets,
-
+        expand(["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"], serotype=serotypes)
 
-include: "workflow/snakemake_rules/fetch_sequences.smk"
-include: "workflow/snakemake_rules/transform.smk"
-include: "workflow/snakemake_rules/split_serotypes.smk"
-include: "workflow/snakemake_rules/nextclade.smk"
+include: "rules/fetch_from_ncbi.smk"
+include: "rules/curate.smk"
+include: "rules/split_serotypes.smk"
+include: "rules/nextclade.smk"
 
-if config.get("upload", False):
+# Include custom rules defined in the config.
+if "custom_rules" in config:
+    for rule_file in config["custom_rules"]:
 
-    include: "workflow/snakemake_rules/upload.smk"
+        include: rule_file
diff --git a/ingest/build-configs/nextstrain-automation/config.yaml b/ingest/build-configs/nextstrain-automation/config.yaml
new file mode 100644
index 00000000..853cafb3
--- /dev/null
+++ b/ingest/build-configs/nextstrain-automation/config.yaml
@@ -0,0 +1,29 @@
+# This configuration file should contain all required configuration parameters
+# for the ingest workflow to run with additional Nextstrain automation rules.
+
+# Custom rules to run as part of the Nextstrain automated workflow
+# The paths should be relative to the ingest directory.
+custom_rules:
+  - build-configs/nextstrain-automation/upload.smk
+
+# Nextstrain CloudFront domain to ensure that we invalidate CloudFront after the S3 uploads
+# This is required as long as we are using the AWS CLI for uploads
+cloudfront_domain: "data.nextstrain.org"
+
+# Nextstrain AWS S3 Bucket with pathogen prefix
+s3_dst: "s3://nextstrain-data/files/workflows/dengue"
+
+# Mapping of files to upload
+files_to_upload:
+  genbank.ndjson.xz: data/genbank.ndjson
+  all_sequences.ndjson.xz: data/sequences.ndjson
+  metadata_all.tsv.zst: results/metadata_all.tsv
+  sequences_all.fasta.zst: results/sequences_all.fasta
+  metadata_denv1.tsv.zst: results/metadata_denv1.tsv
+  sequences_denv1.fasta.zst: results/sequences_denv1.fasta
+  metadata_denv2.tsv.zst: results/metadata_denv2.tsv
+  sequences_denv2.fasta.zst: results/sequences_denv2.fasta
+  metadata_denv3.tsv.zst: results/metadata_denv3.tsv
+  sequences_denv3.fasta.zst: results/sequences_denv3.fasta
+  metadata_denv4.tsv.zst: results/metadata_denv4.tsv
+  sequences_denv4.fasta.zst: results/sequences_denv4.fasta
diff --git a/ingest/build-configs/nextstrain-automation/upload.smk b/ingest/build-configs/nextstrain-automation/upload.smk
new file mode 100644
index 00000000..8403aef6
--- /dev/null
+++ b/ingest/build-configs/nextstrain-automation/upload.smk
@@ -0,0 +1,42 @@
+"""
+This part of the workflow handles uploading files to AWS S3.
+
+Files to upload must be defined in the `files_to_upload` config param, where
+the keys are the remote files and the values are the local filepaths
+relative to the ingest directory.
+
+Produces a single file for each uploaded file:
+    "results/upload/{remote_file}.upload"
+
+The rule `upload_all` can be used as a target to upload all files.
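+
+For example, a typical invocation of this target from the top level of the
+repo (mirroring the command documented in ingest/README.md) would be:
+
+    nextstrain build \
+        --env AWS_ACCESS_KEY_ID \
+        --env AWS_SECRET_ACCESS_KEY \
+        ingest \
+        upload_all \
+        --configfile build-configs/nextstrain-automation/config.yaml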
+""" +import os + + +rule upload_to_s3: + input: + file_to_upload=lambda wildcards: config["files_to_upload"][wildcards.remote_file], + output: + "results/upload/{remote_file}.upload", + params: + quiet="" if send_notifications else "--quiet", + s3_dst=config["s3_dst"], + cloudfront_domain=config["cloudfront_domain"], + shell: + """ + ./vendored/upload-to-s3 \ + {params.quiet} \ + {input.file_to_upload:q} \ + {params.s3_dst:q}/{wildcards.remote_file:q} \ + {params.cloudfront_domain} 2>&1 | tee {output} + """ + + +rule upload_all: + input: + uploads=[ + f"results/upload/{remote_file}.upload" + for remote_file in config["files_to_upload"].keys() + ], + output: + touch("results/upload_all.done") \ No newline at end of file diff --git a/ingest/config/optional.yaml b/ingest/config/optional.yaml deleted file mode 100644 index c1b40a83..00000000 --- a/ingest/config/optional.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# Optional configs used by Nextstrain team -# Params for uploads -upload: - # Upload params for AWS S3 - s3: - # AWS S3 Bucket with prefix - dst: 's3://nextstrain-data/files/workflows/dengue' - # Mapping of files to upload, with key as remote file name and the value - # the local file path relative to the ingest directory. - files_to_upload: - genbank.ndjson.xz: data/genbank.ndjson - all_sequences.ndjson.xz: data/sequences.ndjson - metadata_all.tsv.zst: results/metadata_all.tsv - sequences_all.fasta.zst: results/sequences_all.fasta - metadata_denv1.tsv.zst: results/metadata_denv1.tsv - sequences_denv1.fasta.zst: results/sequences_denv1.fasta - metadata_denv2.tsv.zst: results/metadata_denv2.tsv - sequences_denv2.fasta.zst: results/sequences_denv2.fasta - metadata_denv3.tsv.zst: results/metadata_denv3.tsv - sequences_denv3.fasta.zst: results/sequences_denv3.fasta - metadata_denv4.tsv.zst: results/metadata_denv4.tsv - sequences_denv4.fasta.zst: results/sequences_denv4.fasta - - cloudfront_domain: 'data.nextstrain.org' - diff --git a/ingest/source-data/annotations.tsv b/ingest/defaults/annotations.tsv similarity index 100% rename from ingest/source-data/annotations.tsv rename to ingest/defaults/annotations.tsv diff --git a/ingest/config/config.yaml b/ingest/defaults/config.yaml similarity index 83% rename from ingest/config/config.yaml rename to ingest/defaults/config.yaml index 5c6f0288..6a748aa3 100644 --- a/ingest/config/config.yaml +++ b/ingest/defaults/config.yaml @@ -23,28 +23,27 @@ ncbi_datasets_fields: - submitter-names - submitter-affiliation -# Params for the transform rule -transform: +# Params for the curate rule +curate: # NCBI Fields to rename to Nextstrain field names. 
+rule upload_all:
+    input:
+        uploads=[
+            f"results/upload/{remote_file}.upload"
+            for remote_file in config["files_to_upload"].keys()
+        ],
+    output:
+        touch("results/upload_all.done")
\ No newline at end of file
diff --git a/ingest/config/optional.yaml b/ingest/config/optional.yaml
deleted file mode 100644
index c1b40a83..00000000
--- a/ingest/config/optional.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Optional configs used by Nextstrain team
-# Params for uploads
-upload:
-  # Upload params for AWS S3
-  s3:
-    # AWS S3 Bucket with prefix
-    dst: 's3://nextstrain-data/files/workflows/dengue'
-    # Mapping of files to upload, with key as remote file name and the value
-    # the local file path relative to the ingest directory.
-    files_to_upload:
-      genbank.ndjson.xz: data/genbank.ndjson
-      all_sequences.ndjson.xz: data/sequences.ndjson
-      metadata_all.tsv.zst: results/metadata_all.tsv
-      sequences_all.fasta.zst: results/sequences_all.fasta
-      metadata_denv1.tsv.zst: results/metadata_denv1.tsv
-      sequences_denv1.fasta.zst: results/sequences_denv1.fasta
-      metadata_denv2.tsv.zst: results/metadata_denv2.tsv
-      sequences_denv2.fasta.zst: results/sequences_denv2.fasta
-      metadata_denv3.tsv.zst: results/metadata_denv3.tsv
-      sequences_denv3.fasta.zst: results/sequences_denv3.fasta
-      metadata_denv4.tsv.zst: results/metadata_denv4.tsv
-      sequences_denv4.fasta.zst: results/sequences_denv4.fasta
-
-    cloudfront_domain: 'data.nextstrain.org'
-
diff --git a/ingest/source-data/annotations.tsv b/ingest/defaults/annotations.tsv
similarity index 100%
rename from ingest/source-data/annotations.tsv
rename to ingest/defaults/annotations.tsv
diff --git a/ingest/config/config.yaml b/ingest/defaults/config.yaml
similarity index 83%
rename from ingest/config/config.yaml
rename to ingest/defaults/config.yaml
index 5c6f0288..6a748aa3 100644
--- a/ingest/config/config.yaml
+++ b/ingest/defaults/config.yaml
@@ -23,28 +23,27 @@ ncbi_datasets_fields:
   - submitter-names
   - submitter-affiliation
 
-# Params for the transform rule
-transform:
+# Params for the curate rule
+curate:
   # NCBI Fields to rename to Nextstrain field names.
   # This is the first step in the pipeline, so any references to field names
   # in the configs below should use the new field names
-  field_map: [
-    'accession=genbank_accession',
-    'accession-rev=genbank_accession_rev',
-    'isolate-lineage=strain',
-    'sourcedb=database', # necessary for applying geo location rules
-    'geo-region=region',
-    'geo-location=location',
-    'host-name=host',
-    'isolate-collection-date=date',
-    'release-date=release_date',
-    'update-date=update_date',
-    'virus-tax-id=virus_tax_id',
-    'virus-name=virus_name',
-    'sra-accs=sra_accessions',
-    'submitter-names=authors',
-    'submitter-affiliation=institution',
-  ]
+  field_map:
+    accession: genbank_accession
+    accession-rev: genbank_accession_rev
+    isolate-lineage: strain
+    sourcedb: database
+    geo-region: region
+    geo-location: location
+    host-name: host
+    isolate-collection-date: date
+    release-date: release_date
+    update-date: update_date
+    virus-tax-id: virus_tax_id
+    virus-name: virus_name
+    sra-accs: sra_accessions
+    submitter-names: authors
+    submitter-affiliation: institution
   # Standardized strain name regex
   # Currently accepts any characters because we do not have a clear standard for strain names
   strain_regex: '^.+$'
@@ -77,9 +76,9 @@
   geolocation_rules_url: 'https://raw.githubusercontent.com/nextstrain/ncov-ingest/master/source-data/gisaid_geoLocationRules.tsv'
   # Local geolocation rules that are only applicable to mpox data
   # Local rules can overwrite the general geolocation rules provided above
-  local_geolocation_rules: 'source-data/geolocation-rules.tsv'
+  local_geolocation_rules: 'defaults/geolocation-rules.tsv'
   # User annotations file
-  annotations: 'source-data/annotations.tsv'
+  annotations: 'defaults/annotations.tsv'
   # ID field used to merge annotations
   annotations_id: 'genbank_accession'
   # Field to use as the sequence ID in the FASTA file
diff --git a/ingest/source-data/geolocation-rules.tsv b/ingest/defaults/geolocation-rules.tsv
similarity index 100%
rename from ingest/source-data/geolocation-rules.tsv
rename to ingest/defaults/geolocation-rules.tsv
diff --git a/ingest/workflow/snakemake_rules/transform.smk b/ingest/rules/curate.smk
similarity index 63%
rename from ingest/workflow/snakemake_rules/transform.smk
rename to ingest/rules/curate.smk
index e4e543dd..6d5374d3 100644
--- a/ingest/workflow/snakemake_rules/transform.smk
+++ b/ingest/rules/curate.smk
@@ -1,5 +1,5 @@
 """
-This part of the workflow handles transforming the data into standardized
+This part of the workflow handles curating the data into standardized
 formats and expects input file
 
     sequences_ndjson = "data/sequences.ndjson"
@@ -9,7 +9,7 @@ This will produce output files as
 
     metadata = "results/metadata_all.tsv"
    sequences = "results/sequences_all.fasta"
 
-Parameters are expected to be defined in `config.transform`.
+Parameters are expected to be defined in `config.curate`.
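+
+As a sketch, the relevant config section in `defaults/config.yaml` looks
+roughly like this (abbreviated to a few of the keys used below):
+
+    curate:
+      field_map:
+        accession: genbank_accession
+      strain_regex: '^.+$'
+      annotations: 'defaults/annotations.tsv'
+      annotations_id: 'genbank_accession'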
""" @@ -17,7 +17,7 @@ rule fetch_general_geolocation_rules: output: general_geolocation_rules="data/general-geolocation-rules.tsv", params: - geolocation_rules_url=config["transform"]["geolocation_rules_url"], + geolocation_rules_url=config["curate"]["geolocation_rules_url"], shell: """ curl {params.geolocation_rules_url} > {output.general_geolocation_rules} @@ -27,7 +27,7 @@ rule fetch_general_geolocation_rules: rule concat_geolocation_rules: input: general_geolocation_rules="data/general-geolocation-rules.tsv", - local_geolocation_rules=config["transform"]["local_geolocation_rules"], + local_geolocation_rules=config["curate"]["local_geolocation_rules"], output: all_geolocation_rules="data/all-geolocation-rules.tsv", shell: @@ -36,32 +36,39 @@ rule concat_geolocation_rules: """ -rule transform: +def format_field_map(field_map: dict[str, str]) -> str: + """ + Format dict to `"key1"="value1" "key2"="value2"...` for use in shell commands. + """ + return " ".join([f'"{key}"="{value}"' for key, value in field_map.items()]) + + +rule curate: input: sequences_ndjson="data/sequences.ndjson", all_geolocation_rules="data/all-geolocation-rules.tsv", - annotations=config["transform"]["annotations"], + annotations=config["curate"]["annotations"], output: metadata="data/metadata_all.tsv", sequences="results/sequences_all.fasta", log: - "logs/transform.txt", + "logs/curate.txt", params: - field_map=config["transform"]["field_map"], - strain_regex=config["transform"]["strain_regex"], - strain_backup_fields=config["transform"]["strain_backup_fields"], - date_fields=config["transform"]["date_fields"], - expected_date_formats=config["transform"]["expected_date_formats"], - articles=config["transform"]["titlecase"]["articles"], - abbreviations=config["transform"]["titlecase"]["abbreviations"], - titlecase_fields=config["transform"]["titlecase"]["fields"], - authors_field=config["transform"]["authors_field"], - authors_default_value=config["transform"]["authors_default_value"], - abbr_authors_field=config["transform"]["abbr_authors_field"], - annotations_id=config["transform"]["annotations_id"], - metadata_columns=config["transform"]["metadata_columns"], - id_field=config["transform"]["id_field"], - sequence_field=config["transform"]["sequence_field"], + field_map=format_field_map(config["curate"]["field_map"]), + strain_regex=config["curate"]["strain_regex"], + strain_backup_fields=config["curate"]["strain_backup_fields"], + date_fields=config["curate"]["date_fields"], + expected_date_formats=config["curate"]["expected_date_formats"], + articles=config["curate"]["titlecase"]["articles"], + abbreviations=config["curate"]["titlecase"]["abbreviations"], + titlecase_fields=config["curate"]["titlecase"]["fields"], + authors_field=config["curate"]["authors_field"], + authors_default_value=config["curate"]["authors_default_value"], + abbr_authors_field=config["curate"]["abbr_authors_field"], + annotations_id=config["curate"]["annotations_id"], + metadata_columns=config["curate"]["metadata_columns"], + id_field=config["curate"]["id_field"], + sequence_field=config["curate"]["sequence_field"], shell: """ (cat {input.sequences_ndjson} \ diff --git a/ingest/workflow/snakemake_rules/fetch_sequences.smk b/ingest/rules/fetch_from_ncbi.smk similarity index 100% rename from ingest/workflow/snakemake_rules/fetch_sequences.smk rename to ingest/rules/fetch_from_ncbi.smk diff --git a/ingest/workflow/snakemake_rules/nextclade.smk b/ingest/rules/nextclade.smk similarity index 97% rename from 
+    """
+    return " ".join([f'"{key}"="{value}"' for key, value in field_map.items()])
+
+
+rule curate:
     input:
         sequences_ndjson="data/sequences.ndjson",
         all_geolocation_rules="data/all-geolocation-rules.tsv",
-        annotations=config["transform"]["annotations"],
+        annotations=config["curate"]["annotations"],
     output:
         metadata="data/metadata_all.tsv",
         sequences="results/sequences_all.fasta",
     log:
-        "logs/transform.txt",
+        "logs/curate.txt",
     params:
-        field_map=config["transform"]["field_map"],
-        strain_regex=config["transform"]["strain_regex"],
-        strain_backup_fields=config["transform"]["strain_backup_fields"],
-        date_fields=config["transform"]["date_fields"],
-        expected_date_formats=config["transform"]["expected_date_formats"],
-        articles=config["transform"]["titlecase"]["articles"],
-        abbreviations=config["transform"]["titlecase"]["abbreviations"],
-        titlecase_fields=config["transform"]["titlecase"]["fields"],
-        authors_field=config["transform"]["authors_field"],
-        authors_default_value=config["transform"]["authors_default_value"],
-        abbr_authors_field=config["transform"]["abbr_authors_field"],
-        annotations_id=config["transform"]["annotations_id"],
-        metadata_columns=config["transform"]["metadata_columns"],
-        id_field=config["transform"]["id_field"],
-        sequence_field=config["transform"]["sequence_field"],
+        field_map=format_field_map(config["curate"]["field_map"]),
+        strain_regex=config["curate"]["strain_regex"],
+        strain_backup_fields=config["curate"]["strain_backup_fields"],
+        date_fields=config["curate"]["date_fields"],
+        expected_date_formats=config["curate"]["expected_date_formats"],
+        articles=config["curate"]["titlecase"]["articles"],
+        abbreviations=config["curate"]["titlecase"]["abbreviations"],
+        titlecase_fields=config["curate"]["titlecase"]["fields"],
+        authors_field=config["curate"]["authors_field"],
+        authors_default_value=config["curate"]["authors_default_value"],
+        abbr_authors_field=config["curate"]["abbr_authors_field"],
+        annotations_id=config["curate"]["annotations_id"],
+        metadata_columns=config["curate"]["metadata_columns"],
+        id_field=config["curate"]["id_field"],
+        sequence_field=config["curate"]["sequence_field"],
     shell:
         """
         (cat {input.sequences_ndjson} \
diff --git a/ingest/workflow/snakemake_rules/fetch_sequences.smk b/ingest/rules/fetch_from_ncbi.smk
similarity index 100%
rename from ingest/workflow/snakemake_rules/fetch_sequences.smk
rename to ingest/rules/fetch_from_ncbi.smk
diff --git a/ingest/workflow/snakemake_rules/nextclade.smk b/ingest/rules/nextclade.smk
similarity index 97%
rename from ingest/workflow/snakemake_rules/nextclade.smk
rename to ingest/rules/nextclade.smk
index 112251cc..88e1800f 100644
--- a/ingest/workflow/snakemake_rules/nextclade.smk
+++ b/ingest/rules/nextclade.smk
@@ -52,7 +52,7 @@ rule concat_nextclade_subtype_results:
     output:
         nextclade_subtypes="results/nextclade_subtypes.tsv",
     params:
-        id_field=config["transform"]["id_field"],
+        id_field=config["curate"]["id_field"],
         nextclade_field=config["nextclade"]["nextclade_field"],
     shell:
         """
@@ -75,7 +75,7 @@ rule append_nextclade_columns:
     output:
         metadata_all="results/metadata_all.tsv",
     params:
-        id_field=config["transform"]["id_field"],
+        id_field=config["curate"]["id_field"],
         nextclade_field=config["nextclade"]["nextclade_field"],
     shell:
         """
diff --git a/ingest/workflow/snakemake_rules/split_serotypes.smk b/ingest/rules/split_serotypes.smk
similarity index 89%
rename from ingest/workflow/snakemake_rules/split_serotypes.smk
rename to ingest/rules/split_serotypes.smk
index 0c25d83c..0690d5ef 100644
--- a/ingest/workflow/snakemake_rules/split_serotypes.smk
+++ b/ingest/rules/split_serotypes.smk
@@ -9,7 +9,7 @@ This will produce output files as
 
     sequences_{serotype} = "results/sequences_{serotype}.fasta"
 
-Parameters are expected to be defined in `config.transform`.
+Parameters are expected to be defined in `config.curate`.
 """
 
 rule split_by_ncbi_serotype:
@@ -22,7 +22,7 @@ rule split_by_ncbi_serotype:
     output:
         sequences = "results/sequences_{serotype}.fasta"
     params:
-        id_field = config["transform"]["id_field"]
+        id_field = config["curate"]["id_field"]
     shell:
         """
         augur filter \
diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk
deleted file mode 100644
index 60c5c9b7..00000000
--- a/ingest/workflow/snakemake_rules/upload.smk
+++ /dev/null
@@ -1,64 +0,0 @@
-"""
-This part of the workflow handles uploading files to a specified destination.
-
-Uses predefined wildcard `file_to_upload` determine input and predefined
-wildcard `remote_file_name` as the remote file name in the specified destination.
-
-Produces output files as `data/upload/{upload_target_name}/{remote_file_name}.done`.
-
-Currently only supports uploads to AWS S3, but additional upload rules can
-be easily added as long as they follow the output pattern described above.
-"""
-import os
-
-slack_envvars_defined = "SLACK_CHANNELS" in os.environ and "SLACK_TOKEN" in os.environ
-send_notifications = (
-    config.get("send_slack_notifications", False) and slack_envvars_defined
-)
-
-
-def _get_upload_inputs(wildcards):
-    """
-    If the file_to_upload has Slack notifications that depend on diffs with S3 files,
-    then we want the upload rule to run after the notification rule.
-
-    This function is mostly to keep track of which flag files to expect for
-    the rules in `slack_notifications.smk`, so it only includes flag files if
-    `send_notifications` is True.
- """ - inputs = { - "file_to_upload": config["upload"]["s3"]["files_to_upload"][ - wildcards.remote_file_name - ], - } - - if send_notifications: - flag_file = [] - - if file_to_upload == "data/genbank.ndjson": - flag_file = "data/notify/genbank-record-change.done" - elif file_to_upload == "results/metadata.tsv": - flag_file = "data/notify/metadata-diff.done" - - inputs["notify_flag_file"] = flag_file - - return inputs - - -rule upload_to_s3: - input: - unpack(_get_upload_inputs), - output: - "data/upload/s3/{remote_file_name}.done", - params: - quiet="" if send_notifications else "--quiet", - s3_dst=config["upload"].get("s3", {}).get("dst", ""), - cloudfront_domain=config["upload"].get("s3", {}).get("cloudfront_domain", ""), - shell: - """ - ./vendored/upload-to-s3 \ - {params.quiet} \ - {input.file_to_upload:q} \ - {params.s3_dst:q}/{wildcards.remote_file_name:q} \ - {params.cloudfront_domain} 2>&1 | tee {output} - """