feat(analysis): add initial version (#1)
Alputer committed Nov 13, 2024
1 parent 19d69d5 commit 9a07769
Showing 5 changed files with 257 additions and 0 deletions.
128 changes: 128 additions & 0 deletions README.md
@@ -4,3 +4,131 @@
[![image](https://img.shields.io/badge/discourse-forum-blue.svg)](https://forum.reana.io)
[![image](https://img.shields.io/github/license/reanahub/reana-demo-dask-coffea.svg)](https://github.com/reanahub/reana-demo-dask-coffea/blob/master/LICENSE)
[![image](https://www.reana.io/static/img/badges/launch-on-reana-at-cern.svg)](https://reana.cern.ch/launch?url=https%3A%2F%2Fgithub.com%2Freanahub%2Freana-demo-dask-coffea&specification=reana.yaml&name=reana-demo-dask-coffea)

## About

This [REANA](http://www.reana.io/) reproducible analysis example demonstrates
how to run Dask workflows using Coffea. It was adapted from the
[Coffea Casa tutorials](https://github.com/CoffeaTeam/coffea-casa-tutorials/blob/master/examples/example1.ipynb)
repository.

## Analysis structure

Making a research data analysis reproducible essentially means providing
"runnable recipes" that address (1) where the input data is, (2) what software
was used to analyse the data, (3) which computing environments were used to run
the software, and (4) which computational workflow steps were taken to run the
analysis. This permits the analysis to be instantiated on the computational
cloud and run to obtain (5) the output results.

### 1. Input data

In this example, we are using a single CMS open data set file
`Run2012B_SingleMu.root`, which is hosted on the EOSPUBLIC XRootD server.
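
For a quick local sanity check (not part of the workflow itself), the input
file can be opened directly over XRootD; a minimal sketch, assuming `uproot`
and an XRootD-capable backend such as `fsspec-xrootd` are installed:

```python
# Hypothetical local inspection of the input file; not part of the workflow.
import uproot

with uproot.open(
    "root://eospublic.cern.ch//eos/root-eos/benchmark/Run2012B_SingleMu.root"
) as f:
    events = f["Events"]
    print(events.num_entries)  # total number of events in the file
    print(events.keys(filter_name="MET*"))  # MET-related branches
```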

### 2. Analysis code

The analysis code consists of a single Python file called `analysis.py`, which
connects to a Dask cluster, conducts the analysis, and produces the MET
histogram.
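
At its core, the script simply connects to the Dask scheduler whose address is
provided via the `DASK_SCHEDULER_URI` environment variable. Here is a condensed
excerpt of the connection logic (the hard-coded address is only a local
fallback used when the variable is unset):

```python
# Condensed excerpt of the connection logic in analysis.py.
import os

from dask.distributed import Client

client = Client(os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080"))
print(client)  # reports the scheduler address and available workers
```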

### 3. Compute environment

In order to be able to rerun the analysis even several years in the future, we
need to "encapsulate the current compute environment". We shall achieve this by
preparing a [Docker](https://www.docker.com/) container image for our analysis
steps.

This example makes use of the Coffea platform image, pinned to the specific
version 0.7.22. The container image can be found on Docker Hub at
[docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049](https://hub.docker.com/r/coffeateam/coffea-dask-cc7).
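
To inspect the same environment locally, one can pull the image and query the
Coffea version inside it; a sketch, assuming a local Docker installation:

```console
$ docker pull docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
$ docker run --rm docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049 \
    python -c 'import coffea; print(coffea.__version__)'
```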

### 4. Analysis workflow

The analysis workflow is simple and consists of a single command:
`python analysis.py`. Behind the scenes, Dask may launch parallel computations.
As users, we do not have to specify the computational graph ourselves; the Dask
library takes care of dispatching computations.
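
For intuition only, Dask's futures-based dispatch pattern looks roughly as
follows; this is a generic sketch unrelated to the analysis code, and
`Client()` with no address starts a throwaway local cluster:

```python
# Generic illustration of Dask's dispatch model; not part of the example.
from dask.distributed import Client

client = Client()  # no address given, so a local cluster is spun up
futures = client.map(lambda x: x**2, range(10))  # one task per element
print(sum(client.gather(futures)))  # gather collects the results (285)
client.close()
```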

### 5. Output results

The example produces the following MET event-level histogram as an output.

![](https://github.com/user-attachments/assets/e52c2391-626d-4556-90ca-75248516cc95)

## Running the example on REANA cloud

There are two ways to execute this analysis example on REANA.

If you would like to simply launch this analysis example on the REANA instance
at CERN and inspect its results using the web interface, please click on the
following badge:

[![Launch on REANA@CERN badge](https://www.reana.io/static/img/badges/launch-on-reana-at-cern.svg)](https://reana.cern.ch/launch?url=https://github.com/reanahub/reana-demo-dask-coffea&specification=reana.yaml&name=reana-demo-dask-coffea)

If you would like a step-by-step guide on how to use the REANA command-line
client to launch this analysis example, please read on.

We start by creating a [reana.yaml](reana.yaml) file describing the above
analysis structure with its inputs, code, runtime environment, computational
workflow steps and expected outputs:

```yaml
inputs:
  files:
    - analysis.py
workflow:
  type: serial
  resources:
    dask:
      image: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
  specification:
    steps:
      - name: process
        environment: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
        commands:
          - python analysis.py
outputs:
  files:
    - histogram.png
tests:
  files:
    - tests/log-messages.feature
    - tests/workspace-files.feature
```

In this example we are using a simple Serial workflow engine to launch our
Dask-based computations.

We can now install the REANA command-line client, run the analysis and
download the resulting plots:

```console
$ # create new virtual environment
$ virtualenv ~/.virtualenvs/reana
$ source ~/.virtualenvs/reana/bin/activate
$ # install REANA client
$ pip install reana-client
$ # connect to some REANA cloud instance
$ export REANA_SERVER_URL=https://reana.cern.ch/
$ export REANA_ACCESS_TOKEN=XXXXXXX
$ # create new workflow
$ reana-client create -n myanalysis
$ export REANA_WORKON=myanalysis
$ # upload input code, data and workflow to the workspace
$ reana-client upload
$ # start computational workflow
$ reana-client start
$ # ... should be finished in about 5 minutes
$ reana-client status
$ # list workspace files
$ reana-client ls
$ # download output results
$ reana-client download
```
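
Before starting the workflow, the specification can also be checked with the
`reana-client validate` command, which verifies that `reana.yaml` is well
formed:

```console
$ # optional: validate the workflow specification
$ reana-client validate
```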

Please see the [REANA-Client](https://reana-client.readthedocs.io/)
documentation for a more detailed explanation of typical `reana-client` usage
scenarios.
76 changes: 76 additions & 0 deletions analysis.py
@@ -0,0 +1,76 @@
import numpy as np
import hist
import coffea.processor as processor
import awkward as ak
from coffea.nanoevents import schemas
import matplotlib.pyplot as plt
from dask.distributed import Client
import os


# This program plots an event-level variable (in this case MET, but switching
# it is as easy as a dict-key change). It also demonstrates an easy use of the
# book-keeping cutflow tool, to keep track of the number of events processed.


# The processor class bundles our data analysis together while giving us some
# helpful tools. It also leaves looping and chunks to the framework instead
# of us.
class Processor(processor.ProcessorABC):
    def __init__(self):
        # Bins and categories for the histogram are defined here. For the
        # axis format, see https://hist.readthedocs.io/
        dataset_axis = hist.axis.StrCategory(
            name="dataset", label="", categories=[], growth=True
        )
        MET_axis = hist.axis.Regular(
            name="MET", label="MET [GeV]", bins=50, start=0, stop=100
        )

        # The accumulator keeps our data chunks together for histogramming.
        # It also gives us cutflow, which can be used to keep track of data.
        self.output = processor.dict_accumulator(
            {
                "MET": hist.Hist(dataset_axis, MET_axis),
                "cutflow": processor.defaultdict_accumulator(int),
            }
        )

    def process(self, events):
        # This is where we do our actual analysis. The dataset has columns
        # similar to the TTree's; events.columns can list them, or
        # events.[object].columns for deeper depth.
        dataset = events.metadata["dataset"]
        MET = events.MET.pt

        # We can define a new key for cutflow (in this case 'all events') and
        # then put values into it. We need += because process() runs per chunk.
        self.output["cutflow"]["all events"] += ak.size(MET)
        self.output["cutflow"]["number of chunks"] += 1

        # This fills our histogram once our data is collected. The fill key
        # ('MET=') matches the axis name defined in __init__.
        self.output["MET"].fill(dataset=dataset, MET=MET)
        return self.output

    def postprocess(self, accumulator):
        pass


DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
client = Client(DASK_SCHEDULER_URI)

fileset = {
    "SingleMu": [
        "root://eospublic.cern.ch//eos/root-eos/benchmark/Run2012B_SingleMu.root"
    ]
}

executor = processor.DaskExecutor(client=client)

run = processor.Runner(
    executor=executor, schema=schemas.NanoAODSchema, savemetrics=True
)

output, metrics = run(fileset, "Events", processor_instance=Processor())

# Generate a 1D histogram from the data filled into the 'MET' key.
output["MET"].plot1d()

# Easy way to print all cutflow dict values; use
# print(output["cutflow"]["KEY_NAME"]) for a single one.
for key, value in output["cutflow"].items():
    print(key, value)

# Save the histogram plot to a file ('histogram.png').
plt.savefig("histogram.png")
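
Outside REANA, the script can be smoke-tested against a throwaway local Dask
cluster matching the script's default scheduler address; a sketch, assuming a
`coffea` 0.7-compatible environment with the classic Dask CLI entry points
installed:

```console
$ # hypothetical local test; both daemons log to the terminal
$ dask-scheduler --port 8080 &
$ dask-worker tcp://127.0.0.1:8080 &
$ python analysis.py
```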
21 changes: 21 additions & 0 deletions reana.yaml
@@ -0,0 +1,21 @@
inputs:
  files:
    - analysis.py
workflow:
  type: serial
  resources:
    dask:
      image: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
  specification:
    steps:
      - name: process
        environment: docker.io/coffeateam/coffea-dask-cc7:0.7.22-py3.10-g7f049
        commands:
          - python analysis.py
outputs:
  files:
    - histogram.png
tests:
  files:
    - tests/log-messages.feature
    - tests/workspace-files.feature
15 changes: 15 additions & 0 deletions tests/log-messages.feature
@@ -0,0 +1,15 @@
# Tests for the expected workflow log messages

Feature: Log messages

    As a researcher,
    I want to be able to see the log messages of my workflow execution,
    so that I can verify that the workflow ran correctly.

    Scenario: The workflow start has produced the expected messages
        When the workflow is finished
        Then the job logs for the "process" step should contain
            """
            all events 53446198
            number of chunks 534
            """
17 changes: 17 additions & 0 deletions tests/workspace-files.feature
@@ -0,0 +1,17 @@
# Tests for the presence of the expected workflow files

Feature: Workspace files

    As a researcher,
    I want to make sure that my workflow produces the expected files,
    so that I can be sure that the workflow outputs are correct.

    Scenario: The workspace contains the expected input files
        When the workflow is finished
        Then the workspace should include "analysis.py"

    Scenario: The workflow generates the final plot
        When the workflow is finished
        Then the workspace should include "histogram.png"
        And the sha256 checksum of the file "histogram.png" should be "c8f87114530c049d587f355cef07280fa5d760910c32638136a713eab1aa72e1"
        And all the outputs should be included in the workspace
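
With a sufficiently recent `reana-client` release (an assumption; the `test`
command is a newer addition), these Gherkin checks can be run against a
finished workflow:

```console
$ reana-client test -w myanalysis
```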
