Process files directly from cloud (gcs) (#43)
* changes to avoid copying from gcs
tif files are now read directly from the cloud, both for metadata
extraction and for dask array creation.
akhanf authored Aug 23, 2024
1 parent c3a711f commit 407827c
Showing 8 changed files with 337 additions and 42 deletions.
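In short, the workflow no longer stages a local copy of the dataset with `gcloud storage cp`; it opens objects in the bucket directly. The primitive behind this is roughly the following (a minimal sketch assuming gcsfs and tifffile, which are not shown in this diff; the bucket path and credentials file are placeholders):

```python
# Minimal sketch (assumed libraries): read one TIFF straight from GCS
# into memory, with no local staging copy.
import gcsfs
from tifffile import imread

# Authenticate with a service-account key, as the workflow's
# remote_creds config suggests.
fs = gcsfs.GCSFileSystem(token="path/to/creds.json")
with fs.open("my-bucket/dataset/tile_x00_y00_ch0.tif", "rb") as f:
    image = imread(f)  # a 2D numpy array fetched over the network
```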
2 changes: 1 addition & 1 deletion workflow/rules/bigstitcher.smk
@@ -9,7 +9,7 @@ rule zarr_to_bdv:
desc="{desc}",
suffix="SPIM.zarr",
),
metadata_json=rules.blaze_to_metadata.output.metadata_json,
metadata_json=rules.copy_blaze_metadata.output.metadata_json,
params:
max_downsampling_layers=5,
temp_h5=str(
15 changes: 15 additions & 0 deletions workflow/rules/common.smk
@@ -129,6 +129,10 @@ def get_bids_toplevel_targets():
return targets


def dataset_is_remote(wildcards):
return is_remote_gcs(Path(get_dataset_path(wildcards)))


def get_input_dataset(wildcards):
"""returns path to extracted dataset or path to provided input folder"""
dataset_path = Path(get_dataset_path(wildcards))
@@ -148,6 +152,17 @@ def get_input_dataset(wildcards):
print(f"unsupported input: {dataset_path}")


def get_metadata_json(wildcards):
"""returns path to metadata, extracted from local or gcs"""
dataset_path = Path(get_dataset_path(wildcards))
suffix = dataset_path.suffix

if is_remote_gcs(dataset_path):
return rules.blaze_to_metadata_gcs.output.metadata_json.format(**wildcards)
else:
return rules.blaze_to_metadata.output.metadata_json.format(**wildcards)
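The `is_remote_gcs` helper these functions rely on is defined elsewhere in the repository, not in this diff; a plausible implementation just inspects the URI scheme, for example:

```python
# Hypothetical sketch of is_remote_gcs() (its definition is not part
# of this diff).
from pathlib import Path

def is_remote_gcs(path: Path) -> bool:
    # Path() collapses the "//" in "gs://bucket", so test only the
    # scheme prefix rather than the full "gs://" string.
    return str(path).startswith(("gcs:", "gs:"))
```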


# import
def cmd_extract_dataset(wildcards, input, output):
cmds = []
12 changes: 6 additions & 6 deletions workflow/rules/flatfield_corr.smk
@@ -2,15 +2,15 @@
rule fit_basic_flatfield_corr:
""" BaSiC flatfield correction"""
input:
zarr=bids(
zarr=lambda wildcards: bids(
root=work,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq}",
desc="raw",
desc="rawfromgcs" if dataset_is_remote(wildcards) else "raw",
suffix="SPIM.zarr",
),
).format(**wildcards),
params:
channel=lambda wildcards: get_stains(wildcards).index(wildcards.stain),
max_n_images=config["basic_flatfield_corr"]["max_n_images"],
@@ -64,15 +64,15 @@ rule fit_basic_flatfield_corr:
rule apply_basic_flatfield_corr:
""" apply BaSiC flatfield correction """
input:
zarr=bids(
zarr=lambda wildcards: bids(
root=work,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq}",
desc="raw",
desc="rawfromgcs" if dataset_is_remote(wildcards) else "raw",
suffix="SPIM.zarr",
),
).format(**wildcards),
model_dirs=lambda wildcards: expand(
rules.fit_basic_flatfield_corr.output.model_dir,
stain=get_stains(wildcards),
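The pattern used in both rules above deserves a brief note: replacing a plain `bids(...)` input with `lambda wildcards: bids(...).format(**wildcards)` turns the input into a Snakemake input function, which lets the `desc` entity depend on whether the dataset is remote. Because `bids(...)` returns a template string still containing `{subject}`-style placeholders, the lambda must substitute the wildcards itself, hence the trailing `.format(**wildcards)`. A stripped-down equivalent (the path template here is a simplified stand-in for what `bids(...)` returns):

```python
# Stripped-down equivalent of the input lambda above; dataset_is_remote
# is the helper added to common.smk in this commit.
def zarr_input(wildcards):
    desc = "rawfromgcs" if dataset_is_remote(wildcards) else "raw"
    template = (
        "work/sub-{subject}/micr/"
        "sub-{subject}_sample-{sample}_acq-{acq}_desc-" + desc + "_SPIM.zarr"
    )
    return template.format(**wildcards)
```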
148 changes: 117 additions & 31 deletions workflow/rules/import.smk
@@ -33,42 +33,47 @@ rule extract_dataset:
"{params.cmd}"


rule cp_from_gcs:
rule blaze_to_metadata_gcs:
input:
creds=os.path.expanduser(config["remote_creds"]),
params:
dataset_path=get_dataset_path_gs,
in_tif_pattern=lambda wildcards: config["import_blaze"]["raw_tif_pattern"],
storage_provider_settings=workflow.storage_provider_settings,
output:
ome_dir=temp(
directory(
bids(
root=work,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq}",
desc="rawfromgcs",
suffix="SPIM",
)
)
metadata_json=bids(
root=root,
desc="gcs",
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq,[a-zA-Z0-9]*blaze[a-zA-Z0-9]*}",
suffix="SPIM.json",
),
threads: config["cores_per_rule"]
group:
"preproc"
benchmark:
bids(
root="benchmarks",
datatype="blaze_to_metadata_gcs",
subject="{subject}",
sample="{sample}",
acq="{acq}",
suffix="benchmark.tsv",
)
log:
bids(
root="logs",
datatype="blaze_to_metadata_gcs",
subject="{subject}",
datatype="cp_from_gcs",
sample="{sample}",
acq="{acq}",
desc="raw",
suffix="log.txt",
),
group:
"preproc"
container:
None
conda:
"../envs/google_cloud.yaml"
shell:
"mkdir -p {output} && gcloud storage cp --recursive {params.dataset_path}/* {output}"
config["containers"]["spimprep"]
script:
"../scripts/blaze_to_metadata_gcs.py"


rule blaze_to_metadata:
@@ -80,13 +85,16 @@ rule blaze_to_metadata:
config["import_blaze"]["raw_tif_pattern"],
),
output:
metadata_json=bids(
root=root,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq,[a-zA-Z0-9]*blaze[a-zA-Z0-9]*}",
suffix="SPIM.json",
metadata_json=temp(
bids(
root=work,
subject="{subject}",
desc="local",
datatype="micr",
sample="{sample}",
acq="{acq,[a-zA-Z0-9]*blaze[a-zA-Z0-9]*}",
suffix="SPIM.json",
)
),
benchmark:
bids(
@@ -114,6 +122,31 @@ rule blaze_to_metadata:
"../scripts/blaze_to_metadata.py"


rule copy_blaze_metadata:
input:
json=get_metadata_json,
output:
metadata_json=bids(
root=root,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq,[a-zA-Z0-9]*blaze[a-zA-Z0-9]*}",
suffix="SPIM.json",
),
log:
bids(
root="logs",
datatype="copy_blaze_metadata",
subject="{subject}",
sample="{sample}",
acq="{acq}",
suffix="log.txt",
),
shell:
"cp {input} {output} &> {log}"


rule prestitched_to_metadata:
input:
ome_dir=get_input_dataset,
@@ -162,7 +195,7 @@ rule tif_to_zarr:
images as the chunks"""
input:
ome_dir=get_input_dataset,
metadata_json=rules.blaze_to_metadata.output.metadata_json,
metadata_json=rules.copy_blaze_metadata.output.metadata_json,
params:
in_tif_pattern=lambda wildcards, input: os.path.join(
input.ome_dir,
@@ -208,3 +241,56 @@ rule tif_to_zarr:
config["containers"]["spimprep"]
script:
"../scripts/tif_to_zarr.py"


rule tif_to_zarr_gcs:
""" use dask to load tifs in parallel and write to zarr
output shape is (tiles,channels,z,y,x), with the 2d
images as the chunks"""
input:
metadata_json=rules.copy_blaze_metadata.output.metadata_json,
creds=os.path.expanduser(config["remote_creds"]),
params:
dataset_path=get_dataset_path_gs,
in_tif_pattern=lambda wildcards: config["import_blaze"]["raw_tif_pattern"],
intensity_rescaling=config["import_blaze"]["intensity_rescaling"],
storage_provider_settings=workflow.storage_provider_settings,
output:
zarr=temp(
directory(
bids(
root=work,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq}",
desc="rawfromgcs",
suffix="SPIM.zarr",
)
)
),
benchmark:
bids(
root="benchmarks",
datatype="tif_to_zarr",
subject="{subject}",
sample="{sample}",
acq="{acq}",
suffix="benchmark.tsv",
)
log:
bids(
root="logs",
datatype="tif_to_zarr",
subject="{subject}",
sample="{sample}",
acq="{acq}",
suffix="log.txt",
),
group:
"preproc"
threads: config["cores_per_rule"]
container:
config["containers"]["spimprep"]
script:
"../scripts/tif_to_zarr_gcs.py"
2 changes: 1 addition & 1 deletion workflow/rules/ome_zarr.smk
@@ -16,7 +16,7 @@ rule zarr_to_ome_zarr:
desc=config["ome_zarr"]["desc"],
allow_missing=True,
),
metadata_json=rules.blaze_to_metadata.output.metadata_json,
metadata_json=rules.copy_blaze_metadata.output.metadata_json,
params:
max_downsampling_layers=config["ome_zarr"]["max_downsampling_layers"],
rechunk_size=config["ome_zarr"]["rechunk_size"],
Expand Down
6 changes: 3 additions & 3 deletions workflow/rules/qc.smk
@@ -1,15 +1,15 @@
rule generate_flatfield_qc:
"Generates an html file for comparing before and after flatfield correction"
input:
uncorr=bids(
uncorr=lambda wildcards: bids(
root=work,
subject="{subject}",
datatype="micr",
sample="{sample}",
acq="{acq}",
desc="raw",
desc="rawfromgcs" if dataset_is_remote(wildcards) else "raw",
suffix="SPIM.zarr",
),
).format(**wildcards),
corr=bids(
root=work,
subject="{subject}",
