
Test that the container running on the UChicago AF JupyterHub is able to control worker nodes through Dask #4

Open
matthewfeickert opened this issue Nov 8, 2023 · 45 comments

@matthewfeickert
Member

We want to be able to run a workflow that runs a distributed analysis with dask.distributed. This means that a

from dask.distributed import Client
client = Client()

run inside a Jupyter notebook in the interactive JupyterLab session should be able to send jobs to the worker nodes on the AF.

@lukasheinrich

lukasheinrich commented Nov 10, 2023

Just pasting here some of the next steps we should be targeting

@matthewfeickert
Member Author

matthewfeickert commented Nov 10, 2023

For tracking purposes, @mvigl can you also comment here what the data sets are that are needed for https://gitlab.cern.ch/gstark/pycolumnarprototype (not sure what branch is being used for dev at the moment) to run the notebook demos and where those are located on the MWT2 at the moment?

@matthewfeickert
Member Author

compile @mvigl's code in standalone mode (i.e. what he's been working on for a while)

So right away we run into some permission issues with Git in the container (we'll have to think about the best way to sort that out):

(venv) [bash][feickert AnalysisBase-24.2.26]:workarea > . /release_setup.sh
(venv) [bash][feickert AnalysisBase-24.2.26]:workarea > mkdir container_testing
(venv) [bash][feickert AnalysisBase-24.2.26]:workarea > cd container_testing/
(venv) [bash][feickert AnalysisBase-24.2.26]:container_testing > git clone --recursive ssh://git@gitlab.cern.ch:7999/gstark/pycolumnarprototype.git
Cloning into 'pycolumnarprototype'...
Warning: Permanently added the ECDSA host key for IP address '[188.185.35.37]:7999' to the list of known hosts.
Permission denied (publickey,keyboard-interactive).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.
(venv) [bash][feickert AnalysisBase-24.2.26]:container_testing > git clone -v --recursive https://gitlab.cern.ch/gstark/pycolumnarprototype.git
Cloning into 'pycolumnarprototype'...
# This hangs forever

But if we sidestep that for the moment and clone from a login node on the UChicago AF

$ ssh [email protected]
[16:19] login02.af.uchicago.edu:~ $ cd workarea/container_testing/
[16:20] login02.af.uchicago.edu:~/workarea/container_testing $ git clone --branch py_el_tool_test --recursive ssh://git@gitlab.cern.ch:7999/gstark/pycolumnarprototype.git

and then switch back to JupyterLab, we're able to build as expected

(venv) [bash][feickert AnalysisBase-24.2.26]:pycolumnarprototype > cmake -S src -B build
(venv) [bash][feickert AnalysisBase-24.2.26]:pycolumnarprototype > cmake --build build --clean-first
-- Setting ATLAS specific build flags
-- checker_gccplugins library not found
-- Using the LCG modules without setting up a release
-- $<BUILD_INTERFACE:/usr/AnalysisBaseExternals/24.2.26/InstallArea/x86_64-centos7-gcc11-opt/lib/libpython3.9.so>;$<INSTALL_INTERFACE:/usr/AnalysisBaseExternals/24.2.26/InstallArea/x86_64-centos7-gcc11-opt/lib/libpython3.9.so>
-- Configuring ATLAS project with name "PyColumnarPrototypeDemo" and version "1.0.0"
...
[100%] Linking CXX shared library ../x86_64-centos7-gcc11-opt/lib/libColumnarPrototypeDict.so
Detaching debug info of libColumnarPrototypeDict.so into libColumnarPrototypeDict.so.dbg
[100%] Built target ColumnarPrototypeDict
[100%] Built target Package_ColumnarPrototype
(venv) [bash][feickert AnalysisBase-24.2.26]:pycolumnarprototype > PYTHONPATH="$(dirname $(find . -type f -iname "PyColumnarPrototype*.so")):${PYTHONPATH}" python3 -c 'import PyColumnarPrototype; print(f"{PyColumnarPrototype.column_maker()=}")'
PyColumnarPrototype.column_maker()=array([1543080.6, 7524391.5], dtype=float32)

I'll count that as "good enough" until we can figure out how to manage Git permissions here, and I think the SSL team can give us plenty of pointers given that they know how to do this for Coffea-casa. 👍

@mvigl

mvigl commented Nov 10, 2023

For tracking purposes, @mvigl can you also comment here what the data sets are that are needed for https://gitlab.cern.ch/gstark/pycolumnarprototype (not sure what branch is being used for dev at the moment) to run the notebook demos and where those are located on the MWT2 at the moment?

The dev branch is https://gitlab.cern.ch/gstark/pycolumnarprototype/-/tree/py_el_tool_test?ref_type=heads. The PHYSLITE data in Zee_demo.ipynb is quite old and I believe it can now only be found here:

  • /afs/cern.ch/user/e/ekourlit/public/mc20_13TeV.700322.Sh_2211_Zee_maxHTpTV2_CVetoBVeto.deriv.DAOD_PHYSLITE.e8351_s3681_r13167_p5511/mc20_13TeV/DAOD_PHYSLITE.32059997._000049.pool.root.1
  • /eos/user/e/ekourlit/public/data17_13TeV.periodAllYear.physics_Main.PhysCont.DAOD_PHYSLITE.grp17_v01_p5631/DAOD_PHYSLITE.29681342._000190.pool.root.1

As for the datasets we will move to, I'll paste Vangelis' comment from another thread here:

So the MC dataset for the phase 1 testing can be: mc20_13TeV.700322.Sh_2211_Zee_maxHTpTV2_CVetoBVeto.deriv.DAOD_PHYSLITE.e8351_s3681_r13167_p5855
which is ~1 TB.

For the phase 2 testing we would need both data and MC:

  • data18_13TeV.periodAllYear.physics_Main.PhysCont.DAOD_PHYSLITE.grp18_v01_p5855
  • mc20_13TeV.700320.Sh_2211_Zee_maxHTpTV2_BFilter.deriv.DAOD_PHYSLITE.e8351_s3681_r13167_p5855
  • mc20_13TeV.700321.Sh_2211_Zee_maxHTpTV2_CFilterBVeto.deriv.DAOD_PHYSLITE.e8351_s3681_r13167_p5855

which is ~45 TB (data) + ~1.5 TB (MC).

@matthewfeickert
Member Author

matthewfeickert commented Nov 13, 2023

run Nikolai's notebook in local dask worker mode

This is now done via https://gitlab.cern.ch/gstark/pycolumnarprototype/-/issues/1#note_7305996. I'll let @mvigl double check this, but I'm calling it complete as it ran from top to bottom with a "restart and run all" in the container on the UChicago AF.

@ivukotic
Contributor

I have already made sure that data can be found at MWT2_LOCALGROUPDISK and MC partly at MWT2 and all of it at BNL.

@matthewfeickert
Member Author

matthewfeickert commented Nov 13, 2023

try to setup a dask cluster with 2 nodes or similar & repeat

Okay, so @ivukotic has kindly agreed to work on setting up a Kubernetes cluster for us this week, where he'll do some tests and see if he can get the notebook to scale out across all the data that he's transferred to MWT2_LOCALGROUPDISK. 🚀 He'll comment back here if he has questions for us, but once he's done that we should be able to try this out generically and do some scaling tests on the cluster.

So this might also address

possibly to a full scale test (e.g. entire dataset)

We can then take these results to ATLAS and further motivate

look into integrating pybind/nanobind into athena & adjust cmake

👍

@matthewfeickert
Member Author

matthewfeickert commented Nov 14, 2023

Recursive cloning without authentication

git clone --branch py_el_tool_test --recurse-submodules https://gitlab.cern.ch/gstark/pycolumnarprototype.git

has now been fixed.

Example:
(venv) [bash][atlas AnalysisBase-24.2.26]:workdir > git clone --branch py_el_tool_test --recurse-submodules https://gitlab.cern.ch/gstark/pycolumnarprototype.git
Cloning into 'pycolumnarprototype'...
remote: Enumerating objects: 515, done.
remote: Counting objects: 100% (152/152), done.
remote: Compressing objects: 100% (146/146), done.
remote: Total 515 (delta 67), reused 16 (delta 6), pack-reused 363
Receiving objects: 100% (515/515), 2.26 MiB | 1.67 MiB/s, done.
Resolving deltas: 100% (283/283), done.
Submodule 'src/columnarprototype' (https://gitlab.cern.ch/krumnack/columnarprototype.git) registered for path 'src/ColumnarPrototype'
Submodule 'src/nanobind' (https://github.com/wjakob/nanobind) registered for path 'src/nanobind'
Cloning into 'src/ColumnarPrototype'...
remote: Enumerating objects: 370, done.
remote: Counting objects: 100% (308/308), done.
remote: Compressing objects: 100% (109/109), done.
remote: Total 370 (delta 199), reused 307 (delta 199), pack-reused 62
Receiving objects: 100% (370/370), 104.55 KiB | 0 bytes/s, done.
Resolving deltas: 100% (229/229), done.
Submodule path 'src/ColumnarPrototype': checked out '1e1537fc9669fe7425e74313200dd8bd3e4e3c64'
Cloning into 'src/nanobind'...
remote: Enumerating objects: 5502, done.
remote: Counting objects: 100% (1855/1855), done.
remote: Compressing objects: 100% (268/268), done.
remote: Total 5502 (delta 1646), reused 1681 (delta 1568), pack-reused 3647
Receiving objects: 100% (5502/5502), 2.00 MiB | 0 bytes/s, done.
Resolving deltas: 100% (3981/3981), done.
Submodule path 'src/nanobind': checked out 'c7bd406ef758c933eaf4b2d03d6d81b54bd9ad03'
Submodule 'ext/robin_map' (https://github.com/Tessil/robin-map) registered for path 'ext/robin_map'
Cloning into 'ext/robin_map'...
remote: Enumerating objects: 1098, done.
remote: Counting objects: 100% (152/152), done.
remote: Compressing objects: 100% (57/57), done.
remote: Total 1098 (delta 105), reused 115 (delta 82), pack-reused 946
Receiving objects: 100% (1098/1098), 875.43 KiB | 0 bytes/s, done.
Resolving deltas: 100% (752/752), done.
Submodule path 'src/nanobind/ext/robin_map': checked out '68ff7325b3898fca267a103bad5c509e8861144d'
(venv) [bash][atlas AnalysisBase-24.2.26]:workdir > 

c.f. https://gitlab.cern.ch/gstark/pycolumnarprototype/-/issues/2#note_7306579 for more details.

Using this, I've added PyColumnarPrototype to the container image in PR #11 so that it is importable. If a user wants to build ColumnarPrototype themselves from https://gitlab.cern.ch/gstark/pycolumnarprototype/, they can still use their own build by patching either PYTHONPATH

PYTHONPATH="$(dirname $(find . -type f -iname "PyColumnarPrototype*.so")):${PYTHONPATH}"

or sys.path

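# Ensure importable
try:
    import PyColumnarPrototype
except ModuleNotFoundError:
    import sys
    from pathlib import Path

    # Look for a locally built extension module below the current working directory
    so_file = next(Path.cwd().glob("**/PyColumnarPrototype*.so"), None)
    if so_file is None:
        raise  # no local build found either, so re-raise the original error
    # position 1 of sys.path will be after cwd and before the activated virtual environment's site-packages
    sys.path.insert(1, str(so_file.parent))
    import PyColumnarPrototype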

with the built version at import time.

@ivukotic
Contributor

@mvigl can you please let me know how do I change Z->ee notebook in order to run across more than 1 mc and 1 data file?

@mvigl

mvigl commented Nov 14, 2023

@mvigl can you please let me know how do I change Z->ee notebook in order to run across more than 1 mc and 1 data file?

I don't have a clear answer yet, sorry; I need to look at it in more detail tomorrow. Changing the 'Get data' section to something like this could be a starting point:

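import uproot

# Glob over the local PHYSLITE files; the ":CollectionTree" suffix selects the TTree
MC_Files = "/data/alheld/ATLAS/mc20_13TeV/DAOD_PHYSLITE.*.pool.root.1:CollectionTree"
variables = [
    "AnalysisElectronsAuxDyn.pt",
    "AnalysisElectronsAuxDyn.eta",
    "AnalysisElectronsAuxDyn.phi",
    "AnalysisElectronsAuxDyn.charge",
]
# Lazily read only the listed branches as Awkward Arrays, without opening every file up front
events = uproot.dask(MC_Files, library="ak", filter_name=variables, open_files=False)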

We don't want to specify the variables that are accessed (which is a longer list) but for some reason it doesn't work if I don't.

UnknownInterpretation: none of the rules matched
in file /eos/user/e/ekourlit/public/mc20_13TeV/DAOD_PHYSLITE.34869306._000049.pool.root.1
in object /CollectionTree;1:xTrigDecisionAux./xTrigDecisionAux.xAOD::AuxInfoBase

@ivukotic
Contributor

It is not realistic to have 50 TB locally. I need a way to give it a lot of files accessible via XRootD.

@ekourlit

I think that should be doable according to documentation: https://uproot.readthedocs.io/en/latest/uproot._dask.dask.html

@matthewfeickert
Member Author

@alexander-held points out that instead of accessing local files you can use a list of XRootD-accessible URIs, as is done in the IRIS-HEP Analysis Grand Challenge notebooks. Probably the most relevant bit is utils.file_input.construct_fileset.
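A minimal sketch of that approach, assuming the XCache-in-front-of-MWT2 URI layout used elsewhere in this thread (`root://xcache.af.uchicago.edu:1094//` followed by the `root://fax.mwt2.org:1094//` origin). The helper name `make_file_mapping` is hypothetical; it only builds the `{uri: tree_name}` dict that `uproot.iterate` and `uproot.dask` accept as their files argument, and it does not check that the files exist.

```python
# Hypothetical helper: turn storage paths into the {uri: tree_name} mapping
# accepted as the files argument of uproot.iterate / uproot.dask.
XCACHE_PREFIX = "root://xcache.af.uchicago.edu:1094//"
ORIGIN = "root://fax.mwt2.org:1094//"


def make_file_mapping(paths, tree_name="CollectionTree", xcache=XCACHE_PREFIX, origin=ORIGIN):
    """Return {xcache + origin + path: tree_name} for every path, preserving order."""
    return {f"{xcache}{origin}{path.lstrip('/')}": tree_name for path in paths}


paths = [
    "/pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1",
    "/pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/6c/67/DAOD_PHYSLITE.34858087._000002.pool.root.1",
]
files = make_file_mapping(paths)
for uri, tree in files.items():
    print(uri, "->", tree)
```

The resulting `files` dict could then be passed as `uproot.iterate(files, expressions=branches, step_size=1000)` or handed to `uproot.dask` in the same way; in a real run the `paths` list would come from the full dataset listing rather than being typed by hand.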

@matthewfeickert
Member Author

The issue of being able to open and read files over xrootd with uproot (c.f. scikit-hep/uproot5#1038) has been temporarily dealt with in PR #21. (waiting now for uproot v5.2.0 to finalize)

@matthewfeickert
Member Author

matthewfeickert commented Nov 22, 2023

We don't want to specify the variables that are accessed (which is a longer list) but for some reason it doesn't work if I don't.

@mvigl @ivukotic The variables are required to be specified as not all of the tree branches in the xAOD can be converted to arrays using Awkward (c.f. scikit-hep/uproot5#1040 (comment)).

So this means that you'd need something like the following modified example that @ivukotic gave (also from scikit-hep/uproot5#1040 (comment)):

import uproot

xc = "root://xcache.af.uchicago.edu:1094//"
fname_data = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1"
)
fname_dat1 = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/6c/67/DAOD_PHYSLITE.34858087._000002.pool.root.1"
)

tree_name = "CollectionTree"
branches = [
    "AnalysisElectronsAuxDyn.pt",
    "AnalysisElectronsAuxDyn.eta",
    "AnalysisElectronsAuxDyn.phi",
    "AnalysisElectronsAuxDyn.charge",
]

tree_data = uproot.iterate(
    {fname_data: tree_name, fname_dat1: tree_name},
    expressions=branches,
    step_size=1000,  # step_size here just as a throwaway example
)
print(next(tree_data))  # example to show the iterator works

which you would then generalize further with different uproot.iterate files parameter choices.
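One way to generalize the files argument beyond hand-written paths is to derive each storage path from the dataset scope and file name. The sketch below assumes that MWT2_LOCALGROUPDISK follows Rucio's deterministic path convention, `<scope>/<md5[0:2]>/<md5[2:4]>/<name>` with the MD5 taken over `"<scope>:<name>"`, which is what the two-level hex directories such as `df/a4` suggest. `deterministic_path` and `RSE_PREFIX` are hypothetical names, and the file names themselves still have to come from the dataset listing.

```python
import hashlib

# Assumption: the storage element uses Rucio's deterministic path convention
#   <prefix><scope>/<md5[0:2]>/<md5[2:4]>/<name>, with md5 over "<scope>:<name>".
RSE_PREFIX = "/pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/"


def deterministic_path(scope, name, prefix=RSE_PREFIX):
    """Return the expected storage path of file `name` in `scope`."""
    digest = hashlib.md5(f"{scope}:{name}".encode("utf-8")).hexdigest()
    # user.* / group.* scopes become nested directories (user.jdoe -> user/jdoe)
    if scope.startswith(("user", "group")):
        scope = scope.replace(".", "/")
    return f"{prefix}{scope}/{digest[0:2]}/{digest[2:4]}/{name}"


# The two data18 PHYSLITE file names quoted above
names = [f"DAOD_PHYSLITE.34858087._{i:06d}.pool.root.1" for i in (1, 2)]
paths = [deterministic_path("data18_13TeV", name) for name in names]
print(paths)
```

If the assumption holds, `paths[0]` should reproduce the `df/a4` directory of the first file above, which is a cheap check to run before building a larger file list this way.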

@alexander-held

The variables are required to be specified as not all of the tree branches in the xAOD can be converted to arrays using Awkward

I think (hope) that we would not need to say ahead of time what we need: in principle that info is in the graph (determined by what we want to ultimately obtain) and ideally uproot should only try to access what is strictly required?

Is the issue that we cannot read a PHYSLITE file at all because somehow uproot trips over some branches, even if they need to not be read technically?

@matthewfeickert
Member Author

Is the issue that we cannot read a PHYSLITE file at all because somehow uproot trips over some branches, even if they need to not be read technically?

It appears so. The uproot.iterate API documentation notes that with the default expressions=None

uproot.behaviors.TBranch.iterate(files, expressions=None, ...)

it reads the whole tree

expressions (None, str, or list of str) – Names of TBranches or aliases to convert to arrays or mathematical expressions of them. Uses the language to evaluate. If None, all TBranches selected by the filters are included.

Revising @ivukotic's original example, and taking some of the points I demonstrated in scikit-hep/uproot5#1040 (comment), we get

# test.py
import uproot

xc = "root://xcache.af.uchicago.edu:1094//"
fname_data = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1"
)
fname_dat1 = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/6c/67/DAOD_PHYSLITE.34858087._000002.pool.root.1"
)

tree_data = uproot.iterate({fname_data: "CollectionTree", fname_dat1: "CollectionTree"})
next(tree_data)  # trigger error

and by poking at things interactively with python -i after we hit errors

(venv) [bash][atlas AnalysisBase-24.2.26]:analysis > python -i test.py 
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 2478, in _awkward_check
    interpretation.awkward_form(self.file)
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/objects.py", line 111, in awkward_form
    return self._model.awkward_form(self._branch.file, context)
  File "/venv/lib/python3.9/site-packages/uproot/model.py", line 684, in awkward_form
    raise uproot.interpretation.objects.CannotBeAwkward(
uproot.interpretation.objects.CannotBeAwkward: xAOD::MissingETAssociationMap_v1

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/analysis/test.py", line 14, in <module>
    next(tree_data)  # trigger error
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 191, in iterate
    for item in hasbranches.iterate(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 1076, in iterate
    _ranges_or_baskets_to_arrays(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 3041, in _ranges_or_baskets_to_arrays
    branchid_to_branch[cache_key]._awkward_check(interpretation)
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 2480, in _awkward_check
    raise ValueError(
ValueError: cannot produce Awkward Arrays for interpretation AsObjects(Unknown_xAOD_3a3a_MissingETAssociationMap_5f_v1) because

    xAOD::MissingETAssociationMap_v1

instead, try library="np" rather than library="ak" or globally set uproot.default_library

in file root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1
in object /CollectionTree;1:METAssoc_AnalysisMET

we can see that just by trying to get the Awkward array representation of the offending METAssoc_AnalysisMET branch we recover the same error (as expected)

>>> file = uproot.open(fname_data)
>>> tree = file["CollectionTree"]
>>> branch_names = tree.keys()
>>> "METAssoc_AnalysisMET" in branch_names
True
>>> tree["METAssoc_AnalysisMET"]
<TBranchElement 'METAssoc_AnalysisMET' at 0x7ff948c1af40>
>>> tree["METAssoc_AnalysisMET"].array()
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 2478, in _awkward_check
    interpretation.awkward_form(self.file)
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/objects.py", line 111, in awkward_form
    return self._model.awkward_form(self._branch.file, context)
  File "/venv/lib/python3.9/site-packages/uproot/model.py", line 684, in awkward_form
    raise uproot.interpretation.objects.CannotBeAwkward(
uproot.interpretation.objects.CannotBeAwkward: xAOD::MissingETAssociationMap_v1

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 1811, in array
    _ranges_or_baskets_to_arrays(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 3041, in _ranges_or_baskets_to_arrays
    branchid_to_branch[cache_key]._awkward_check(interpretation)
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 2480, in _awkward_check
    raise ValueError(
ValueError: cannot produce Awkward Arrays for interpretation AsObjects(Unknown_xAOD_3a3a_MissingETAssociationMap_5f_v1) because

    xAOD::MissingETAssociationMap_v1

instead, try library="np" rather than library="ak" or globally set uproot.default_library

in file root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1
in object /CollectionTree;1:METAssoc_AnalysisMET
>>> 

So I think we need to go and look at this and then talk with the Awkward team about what can be done here if anything.

@matthewfeickert
Member Author

Even with some helpful tips from @lgray

in coffea we pass this: https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/factory.py#L29-L55
into uproot.dask https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/factory.py#L313

import warnings

import uproot


def _remove_not_interpretable(branch):
    if isinstance(
        branch.interpretation, uproot.interpretation.identify.uproot.AsGrouped
    ):
        for name, interpretation in branch.interpretation.subbranches.items():
            if isinstance(
                interpretation, uproot.interpretation.identify.UnknownInterpretation
            ):
                warnings.warn(
                    f"Skipping {branch.name} as it is not interpretable by Uproot"
                )
                return False
    if isinstance(
        branch.interpretation, uproot.interpretation.identify.UnknownInterpretation
    ):
        warnings.warn(f"Skipping {branch.name} as it is not interpretable by Uproot")
        return False

    try:
        _ = branch.interpretation.awkward_form(None)
    except uproot.interpretation.objects.CannotBeAwkward:
        warnings.warn(
            f"Skipping {branch.name} as it cannot be represented as an Awkward array"
        )
        return False
    else:
        return True


xc = "root://xcache.af.uchicago.edu:1094//"
fname_data = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1"
)
fname_dat1 = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/6c/67/DAOD_PHYSLITE.34858087._000002.pool.root.1"
)

tree_data = uproot.iterate(
    {fname_data: "CollectionTree", fname_dat1: "CollectionTree"},
    filter_branch=_remove_not_interpretable,
)
next(tree_data)  # trigger error

we still hit errors

...
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/objects.py", line 742, in basket_array
    output = data.view(dtype).reshape((-1, *shape))
ValueError: When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/analysis/test.py", line 49, in <module>
    next(tree_data)  # trigger error
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 191, in iterate
    for item in hasbranches.iterate(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 1076, in iterate
    _ranges_or_baskets_to_arrays(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 3139, in _ranges_or_baskets_to_arrays
    uproot.source.futures.delayed_raise(*obj)
  File "/venv/lib/python3.9/site-packages/uproot/source/futures.py", line 38, in delayed_raise
    raise exception_value.with_traceback(traceback)
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 3081, in basket_to_array
    basket_arrays[basket.basket_num] = interpretation.basket_array(
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/jagged.py", line 196, in basket_array
    content = self._content.basket_array(
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/objects.py", line 745, in basket_array
    raise ValueError(
ValueError: basket 0 in tree/branch /CollectionTree;1:METAssoc_AnalysisMETAux./METAssoc_AnalysisMETAux.jetLink has the wrong number of bytes (25086) for interpretation AsStridedObjects(Model_ElementLink_3c_DataVector_3c_xAOD_3a3a_Jet_5f_v1_3e3e__v1)
in file root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1

@alexander-held

alexander-held commented Nov 22, 2023

Trying out _remove_not_interpretable with uproot.dask, I can get this to work:

import uproot

tree = uproot.dask(
    {"DAOD_PHYSLITE.34857549._000351.pool.root.1": "CollectionTree"},
    filter_branch=_remove_not_interpretable
)
delayed_arr = tree["AnalysisElectronsAuxDyn.pt"]
print(delayed_arr.compute())

Full file with XRootD URI:
import warnings

import uproot


def _remove_not_interpretable(branch):
    if isinstance(
        branch.interpretation, uproot.interpretation.identify.uproot.AsGrouped
    ):
        for name, interpretation in branch.interpretation.subbranches.items():
            if isinstance(
                interpretation, uproot.interpretation.identify.UnknownInterpretation
            ):
                warnings.warn(
                    f"Skipping {branch.name} as it is not interpretable by Uproot"
                )
                return False
    if isinstance(
        branch.interpretation, uproot.interpretation.identify.UnknownInterpretation
    ):
        warnings.warn(f"Skipping {branch.name} as it is not interpretable by Uproot")
        return False

    try:
        _ = branch.interpretation.awkward_form(None)
    except uproot.interpretation.objects.CannotBeAwkward:
        warnings.warn(
            f"Skipping {branch.name} as it cannot be represented as an Awkward array"
        )
        return False
    else:
        return True


xc = "root://xcache.af.uchicago.edu:1094//"
file_uri = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1"
)

tree = uproot.dask(
    {file_uri: "CollectionTree"}, filter_branch=_remove_not_interpretable
)
delayed_arr = tree["AnalysisElectronsAuxDyn.pt"]
print(delayed_arr.compute())

Running:

(venv) [bash][atlas AnalysisBase-24.2.26]:analysis >  python test.py
...
[[], [], [], [], [], [], ..., [], [], [7.25e+03], [3.03e+04], [1.49e+04], []]

@matthewfeickert
Member Author

@alexander-held Hm. But if we use coffea.nanoevents.NanoEventsFactory we recover the ValueError again:

# example.py
import uproot
from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema

xc = "root://xcache.af.uchicago.edu:1094//"
file_uri = (
    xc
    + "root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1"
)

factory = NanoEventsFactory.from_root(
    {file_uri: "CollectionTree"}, schemaclass=PHYSLITESchema, permit_dask=True
)
events = factory.events()
events.compute()  # ValueError

(venv) [bash][atlas AnalysisBase-24.2.26]:analysis > python example.py
...
/venv/lib/python3.9/site-packages/coffea/nanoevents/factory.py:51: UserWarning: Skipping EventInfoAuxDyn.hardScatterVertexLink as it is not interpretable by Uproot
  warnings.warn(f"Skipping {branch.name} as it is not interpretable by Uproot")
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/objects.py", line 742, in basket_array
    output = data.view(dtype).reshape((-1, *shape))
ValueError: When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/analysis/example.py", line 15, in <module>
    events.compute()  # ValueError
  File "/venv/lib/python3.9/site-packages/dask/base.py", line 342, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/venv/lib/python3.9/site-packages/dask/base.py", line 628, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/venv/lib/python3.9/site-packages/uproot/_dask.py", line 1101, in __call__
    return self.read_tree(ttree, start, stop)
  File "/venv/lib/python3.9/site-packages/uproot/_dask.py", line 919, in read_tree
    container[buffer_key] = mapping[buffer_key]
  File "/venv/lib/python3.9/site-packages/coffea/nanoevents/factory.py", line 121, in __getitem__
    return self._mapping[self._func(index)]
  File "/venv/lib/python3.9/site-packages/coffea/nanoevents/mapping/base.py", line 98, in __getitem__
    self.extract_column(
  File "/venv/lib/python3.9/site-packages/coffea/nanoevents/mapping/uproot.py", line 161, in extract_column
    return columnhandle.array(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 1811, in array
    _ranges_or_baskets_to_arrays(
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 3139, in _ranges_or_baskets_to_arrays
    uproot.source.futures.delayed_raise(*obj)
  File "/venv/lib/python3.9/site-packages/uproot/source/futures.py", line 38, in delayed_raise
    raise exception_value.with_traceback(traceback)
  File "/venv/lib/python3.9/site-packages/uproot/behaviors/TBranch.py", line 3081, in basket_to_array
    basket_arrays[basket.basket_num] = interpretation.basket_array(
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/jagged.py", line 196, in basket_array
    content = self._content.basket_array(
  File "/venv/lib/python3.9/site-packages/uproot/interpretation/objects.py", line 745, in basket_array
    raise ValueError(
ValueError: basket 3 in tree/branch /CollectionTree;1:METAssoc_AnalysisMETAux./METAssoc_AnalysisMETAux.jetLink has the wrong number of bytes (12196) for interpretation AsStridedObjects(Model_ElementLink_3c_DataVector_3c_xAOD_3a3a_Jet_5f_v1_3e3e__v1)
in file root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/data18_13TeV/df/a4/DAOD_PHYSLITE.34858087._000001.pool.root.1
(venv) [bash][atlas AnalysisBase-24.2.26]:analysis >

@nikoladze

nikoladze commented Nov 28, 2023

I think a lot of these errors are related to the broken reading of ElementLink branches in AwkwardForth (scikit-hep/uproot5#951). With the PHYSLITE schema it can also happen that it reads an ElementLink branch even if you request something different: if an ElementLink branch appears first in the list, it will be used to get the offsets (so you may see errors even when requesting e.g. only Electrons.pt). Regardless of fixing the reading of ElementLinks, we could add an optimization so that it tries to avoid getting offsets from ElementLink branches (I think I have some code flying around, but I never integrated it into coffea).
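A minimal sketch of that optimization, assuming each branch of a collection is described by its name and its C++ type name (with uproot these could come from `branch.name` and `branch.typename`). The function `pick_offsets_branch` and the example branch list are hypothetical illustrations, not coffea's code: it prefers the first branch whose type does not mention `ElementLink` and only falls back to a link branch when nothing else is available.

```python
def pick_offsets_branch(branches):
    """Choose which branch of a collection to read the jagged offsets from.

    `branches` is an ordered list of (name, typename) pairs. Branches whose
    type mentions ElementLink are skipped whenever an alternative exists.
    """
    for name, typename in branches:
        if "ElementLink" not in typename:
            return name
    # Every branch is an ElementLink (or the list is empty)
    return branches[0][0] if branches else None


# Hypothetical electron collection where a link branch happens to come first
electron_branches = [
    ("AnalysisElectronsAuxDyn.trackParticleLinks",
     "std::vector<std::vector<ElementLink<DataVector<xAOD::TrackParticle_v1>>>>"),
    ("AnalysisElectronsAuxDyn.pt", "std::vector<float>"),
    ("AnalysisElectronsAuxDyn.eta", "std::vector<float>"),
]
print(pick_offsets_branch(electron_branches))  # prints AnalysisElectronsAuxDyn.pt
```

Branches of the same collection should share the same per-event counts, so taking the offsets from a plain numeric branch avoids touching the ElementLink deserialization at all when only kinematic quantities are requested.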

@matthewfeickert
Member Author

Regardless of fixing the reading of ElementLinks we could put an optimization in for this such that it would try to avoid getting offsets from ElementLink branches (i think i have some code flying around, but never integrated it into coffea).

@nikoladze Can you either share this with us or help contribute this to Coffea? This is blocking for us, so @alexander-held and I would both like to follow up on getting this working (sooner rather than later).

@alexander-held

If we just want to scale Dask + PHYSLITE (and not do much physics with it), I think we can use uproot.dask as in this example https://gist.github.com/alexander-held/56e203690c0d7f67218a4c67d46c586f (from scikit-hep/uproot5#1048). To make the example work, we just need to add (d)ak.flatten() versions of the calls that are currently commented out. The first uproot.dask argument can be a list of dicts for multiple files. We can presumably also wrap everything into four-vector objects and still do things like a Z mass reconstruction; that just requires a bit more boilerplate.
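Building that list-of-dicts argument from a plain list of file paths could look like the sketch below. `uproot_files_arg` is a hypothetical helper, the second file name is made up for illustration, and the optional `prefix` (e.g. an XCache redirector) is an assumption:

```python
# Hypothetical helper: build the list-of-dicts form of the first uproot.dask
# argument, one {file_path: tree_name} entry per file.


def uproot_files_arg(paths, tree_name="CollectionTree", prefix=""):
    files = []
    for path in paths:
        path = path.strip()
        if path:  # skip blank lines, e.g. from a file list read with readlines()
            files.append({prefix + path: tree_name})
    return files


files = uproot_files_arg(
    [
        "DAOD_PHYSLITE.34857549._000351.pool.root.1",
        "DAOD_PHYSLITE.34857549._000352.pool.root.1",  # hypothetical second file
        "",
    ]
)
print(files)
# then, as in the gist: uproot.dask(files, step_size=..., filter_branch=...)
```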

@alexander-held

alexander-held commented Nov 28, 2023

Here is a version that just uses uproot.dask, manually builds 4-vector objects, selects di-electron events and plots the Z candidate mass for them:

import dask_awkward as dak
import uproot
import hist.dask
import coffea.nanoevents
import vector
import warnings

warnings.filterwarnings("ignore")
vector.register_awkward()

delayed_hist = hist.dask.Hist.new.Reg(120, 0, 120, label="mass [GeV]").Weight()

tree = uproot.dask(
    [{"DAOD_PHYSLITE.34857549._000351.pool.root.1": "CollectionTree"}],
    step_size=20_000,
    filter_branch=coffea.nanoevents.factory._remove_not_interpretable,
)

# build electron object
el_p4 = dak.zip(
    {
        "pt": tree["AnalysisElectronsAuxDyn.pt"],
        "eta": tree["AnalysisElectronsAuxDyn.eta"],
        "phi": tree["AnalysisElectronsAuxDyn.phi"],
        "mass": tree["AnalysisElectronsAuxDyn.m"],
    },
    with_name="Momentum4D",
)

# select 2-electron events
evt_filter = dak.num(el_p4) == 2
el_p4 = el_p4[evt_filter]

# fill histogram with di-electron system invariant mass and plot
delayed_hist.fill(dak.sum(el_p4, axis=-1).mass / 1_000)
delayed_hist.compute().plot()

This should scale fine to multiple files (step_size might need tuning and might need to be much larger).
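To get a feel for what tuning step_size does, a rough estimate of the number of Dask partitions is sketched below, assuming each file is cut into chunks of at most step_size entries and that chunks never span two files. The entry counts are hypothetical. Smaller steps give more, smaller tasks (more scheduling overhead); larger steps give fewer, bigger tasks (less parallelism).

```python
import math


def approx_partitions(entries_per_file, step_size):
    """Rough partition count if each file is cut into chunks of at most
    `step_size` entries and chunks never span two files (an assumption)."""
    if step_size <= 0:
        raise ValueError("step_size must be positive")
    return sum(math.ceil(n / step_size) for n in entries_per_file if n > 0)


# Hypothetical per-file entry counts, for illustration only
entries = [50_000, 120_000, 8_000]
print(approx_partitions(entries, 20_000))   # 3 + 6 + 1 = 10
print(approx_partitions(entries, 100_000))  # 1 + 2 + 1 = 4
```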

@nikoladze

The coffea version of that would be:

import warnings

import awkward as ak
import hist.dask
from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema

warnings.filterwarnings("ignore")

delayed_hist = hist.dask.Hist.new.Reg(120, 0, 120, label="mass [GeV]").Weight()


def filter_name(name):
    return name in [
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
    ]


events = factory = NanoEventsFactory.from_root(
    {"DAOD_PHYSLITE.34857549._000351.pool.root.1": "CollectionTree"},
    schemaclass=PHYSLITESchema,
    permit_dask=True,
    uproot_options=dict(filter_name=filter_name),
).events()

el_p4 = events.Electrons

# select 2-electron events
evt_filter = ak.num(el_p4) == 2
el_p4 = el_p4[evt_filter]

# fill histogram with di-electron system invariant mass and plot
delayed_hist.fill((el_p4[:, 0] + el_p4[:, 1]).mass / 1_000)
delayed_hist.compute().plot()

By using filter_name, which can be passed through via the uproot_options argument to NanoEventsFactory.from_root, you can restrict which branches will be considered. The only other difference is that the ak.sum that can be used with vector is not supported by the vector implementation that's currently used in coffea, so the pair is summed explicitly as el_p4[:, 0] + el_p4[:, 1].
Note: it is supported to use ak. instead of dak. also for dask_awkward arrays.
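For reference, (el_p4[:, 0] + el_p4[:, 1]).mass is the invariant mass of the summed four-vectors, and the snippets divide by 1_000 to go from MeV to GeV. The plain-Python sketch below does the same arithmetic for a single pair given as (pt, eta, phi, m); it is only an illustration of the formula, not how vector or coffea implement it (they do it vectorized over arrays):

```python
import math


def to_cartesian(pt, eta, phi, mass):
    """(pt, eta, phi, m) -> (E, px, py, pz), all in the same energy unit."""
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    energy = math.sqrt(px * px + py * py + pz * pz + mass * mass)
    return energy, px, py, pz


def pair_mass(p1, p2):
    """Invariant mass of the sum of two four-vectors given as (pt, eta, phi, m)."""
    e1, px1, py1, pz1 = to_cartesian(*p1)
    e2, px2, py2, pz2 = to_cartesian(*p2)
    e, px, py, pz = e1 + e2, px1 + px2, py1 + py2, pz1 + pz2
    m2 = e * e - (px * px + py * py + pz * pz)
    return math.sqrt(max(m2, 0.0))  # clip tiny negative values from rounding


# Two back-to-back electrons with pt = 45 GeV, expressed in MeV
el1 = (45_000.0, 0.0, 0.0, 0.511)
el2 = (45_000.0, 0.0, math.pi, 0.511)
print(pair_mass(el1, el2) / 1_000)  # ~90 GeV
```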

@ivukotic
Contributor

That works too. Both versions take a bit more than 4 min over 50 files. That's very slow given the amount of data read...
Any ideas why so slow?

@alexander-held

Is this 4 minutes for a single thread or parallelized across many cores? I could imagine that there is a non-negligible constant overhead of uproot parsing PHYSLITE files, but this feels like too much time even for that.

@matthewfeickert
Member Author

matthewfeickert commented Dec 13, 2023

@ivukotic @alexander-held @nikoladze to circle back to this, @ivukotic what are you running exactly? Can you share the code snippet so that we can look at it? This is 4 minutes for 50 files with a local Dask client?

@lgray

lgray commented Dec 13, 2023

If you're executing exactly the code given by @nikoladze above, then it will be using the threaded scheduler (the default). Uproot is very GIL-heavy and not great with the threaded executor.

You should spawn a distributed.Client instance for local scale testing.

@lgray

lgray commented Dec 13, 2023

That's 100% why it's so slow if you're using more than one core.

@lgray

lgray commented Dec 13, 2023

Modified version:

import warnings

import awkward as ak
import hist.dask
from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema
from distributed import Client

warnings.filterwarnings("ignore")

delayed_hist = hist.dask.Hist.new.Reg(120, 0, 120, label="mass [GeV]").Weight()


def filter_name(name):
    return name in [
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
    ]

if __name__ == "__main__":
    client = Client() # or do with Client() as client:
    events = factory = NanoEventsFactory.from_root(
        {"DAOD_PHYSLITE.34857549._000351.pool.root.1": "CollectionTree"},
        schemaclass=PHYSLITESchema,
        permit_dask=True,
        uproot_options=dict(filter_name=filter_name),
    ).events()

    el_p4 = events.Electrons

    # select 2-electron events
    evt_filter = ak.num(el_p4) == 2
    el_p4 = el_p4[evt_filter]

    # fill histogram with di-electron system invariant mass and plot
    delayed_hist.fill((el_p4[:, 0] + el_p4[:, 1]).mass / 1_000)
    delayed_hist.compute().plot()

@ivukotic
Contributor

ivukotic commented Dec 13, 2023 via email

@lgray

lgray commented Dec 13, 2023

That's happening on the client side, so it has nothing to do with k8s. Does it work with a local client?

@usatlas usatlas deleted a comment from lgray Dec 14, 2023
@ivukotic
Contributor

ivukotic commented Dec 14, 2023

Sorry for the bad formatting; it should be fine now. Code using a LocalCluster:

from dask.distributed import Client
import warnings
import awkward as ak
import hist.dask
from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema

client=Client()

tree_name = "CollectionTree"
xc='root://xcache.af.uchicago.edu:1094//'

def get_data_dict(n=10):
    # data18_13TeV:data18_13TeV.00348885.physics_Main.deriv.DAOD_PHYSLITE.r13286_p4910_p5855_tid34857549_00
    r={}
    with open("data.txt",'r') as mc:
        ls=mc.readlines()
        print(len(ls))
        for i in range(0, min(n,len(ls))):
            r[xc+ls[i].strip()]=tree_name
    return r



warnings.filterwarnings("ignore")

delayed_hist = hist.dask.Hist.new.Reg(120, 0, 120, label="mass [GeV]").Weight()

def filter_name(name):
    return name in [
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
    ]
    
infile=get_data_dict(1)
print(infile)

events = factory = NanoEventsFactory.from_root(
    infile,
    schemaclass=PHYSLITESchema,
    permit_dask=True,
    uproot_options=dict(filter_name=filter_name),
).events()

el_p4 = events.Electrons

# select 2-electron events
evt_filter = ak.num(el_p4) == 2
el_p4 = el_p4[evt_filter]

# fill histogram with di-electron system invariant mass and plot
delayed_hist.fill((el_p4[:, 0] + el_p4[:, 1]).mass / 1_000)
delayed_hist.compute().plot()

gives:

332
{'root://xcache.af.uchicago.edu:1094//root://dcgftp.usatlas.bnl.gov:1094//pnfs/usatlas.bnl.gov/LOCALGROUPDISK/rucio/data18_13TeV/04/9a/DAOD_PHYSLITE.34857549._000001.pool.root.1': 'CollectionTree'}

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[1], line 46
     37 print(infile)
     39 events = factory = NanoEventsFactory.from_root(
     40     infile,
     41     schemaclass=PHYSLITESchema,
     42     permit_dask=True,
     43     uproot_options=dict(filter_name=filter_name),
     44 ).events()
---> 46 el_p4 = events.Electrons
     48 # select 2-electron events
     49 evt_filter = ak.num(el_p4) == 2

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:1309, in Array.__getattr__(self, attr)
   1306     elif self._maybe_behavior_property(attr):
   1307         return self._call_behavior_property(attr)
-> 1309     raise AttributeError(f"{attr} not in fields.")
   1310 try:
   1311     # at this point attr is either a field or we'll have to
   1312     # raise an exception.
   1313     return self.__getitem__(attr)

AttributeError: Electrons not in fields.

@lgray

lgray commented Dec 14, 2023

@usatlas why are you deleting my posts?

@lgray

lgray commented Dec 14, 2023

@ivukotic you appear to be using a very old version of dask if it is not complaining at you about the code not being in an if __name__ == "__main__": guarded section.

To say anything more I'd have to have access to the file you're using - the tests we run in coffea that check this collection specifically do pass on the test sample that we have.

It may be prudent to upgrade to the (very) recent release of coffea 2023 (will be on pypi in ~30 minutes), just so all the versions of dependencies are lined up well.

@lgray

lgray commented Dec 14, 2023

Why are you applying your own filter for branches? Does coffea's built-in removal of uninterpretable branches fail for your PHYSLITE files? If so, please submit an issue and provide relevant testing data (even a 40-event file, which I'm sure is shareable, is enough); we would like to keep making sure our code works for a variety of experiments.

You should not need to downselect so heavily since reading is delayed.
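A toy illustration of why a delayed read makes heavy manual downselection unnecessary: the graph is first traced to record which columns the analysis actually touches, and only those are read. dask-awkward does this with awkward's typetracer backend; the ColumnTracker class below is a hypothetical stand-in that only mimics the bookkeeping, and the jet branch name is just an example of an unused column:

```python
# Toy illustration only: records which branches an analysis touches instead of
# reading them. This is not how dask-awkward is implemented internally.


class ColumnTracker:
    def __init__(self, available):
        self._available = set(available)
        self.touched = set()

    def __getitem__(self, name):
        if name not in self._available:
            raise KeyError(name)
        self.touched.add(name)
        return name  # stands in for a column that has not been read yet


def dielectron_inputs(tree):
    # Same four branches as the filter_name functions used in this thread
    return [tree[f"AnalysisElectronsAuxDyn.{field}"] for field in ("pt", "eta", "phi", "m")]


tree = ColumnTracker(
    [
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
        "AnalysisJetsAuxDyn.pt",  # present in the file but never used
    ]
)
dielectron_inputs(tree)
print(sorted(tree.touched))
```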

@ivukotic
Contributor

@lgray I deleted part of the thread with badly formatted messages. This was executed in JupyterLab, and the versions are:
+---------+----------------+-----------------+---------+
| Package | Client         | Scheduler       | Workers |
+---------+----------------+-----------------+---------+
| msgpack | 1.0.7          | 1.0.6           | None    |
| python  | 3.9.12.final.0 | 3.10.12.final.0 | None    |
+---------+----------------+-----------------+---------+
@lgray

lgray commented Dec 14, 2023

@ivukotic I know this may sound like a strange request, but please do not delete my posts in the future. I find it extremely distasteful (even if I understand why you did it in this case).

@ekourlit

ekourlit commented Dec 19, 2023

Hi @lgray and thanks for jumping in to help! I'm not sure what @ivukotic is after here, but I can give you some feedback on a few things I tried with the snippets @alexander-held and @nikoladze posted above, the latest (released) version of coffea, and a PHYSLITE file. Also tagging @jackharrison111, who's working on a related project with me.

So the snippet from @alexander-held (without coffea) is working out of the box and the snippet from @nikoladze (with coffea) is also working well if one changes permit_dask to delayed for the latest version of coffea. Wrapping the code under with Client() as client: is also working.

Why are you applying your own filter for branches?

We do that because some of the PHYSLITE branches have broken reading in AwkwardForth (scikit-hep/uproot5#951). This option should also help load less data from disk into memory, essentially only the columns needed, which are user-defined here by the filter_name function, unless coffea/dask is able to figure out on its own which columns are needed and load only those from the files. Let us know if this is the case; maybe we can use coffea.nanoevents.factory._remove_not_interpretable somehow to get rid of the broken branches. BTW, I tried to run NanoEventsFactory.from_root without any uproot_options or with uproot_options=dict(filter_branch=coffea.nanoevents.factory._remove_not_interpretable), and both fail.
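If the hand-written filter_name functions are kept, they can be generated from a mapping of collection prefix to field names instead of listing every branch by hand. make_filter_name is a hypothetical convenience helper; the branch names are the ones used in this thread:

```python
# Hypothetical convenience helper: build a filter_name predicate like the
# hand-written ones in this thread from a {collection: fields} mapping.


def make_filter_name(columns):
    wanted = frozenset(
        f"{collection}.{field}"
        for collection, fields in columns.items()
        for field in fields
    )

    def filter_name(name):
        return name in wanted

    return filter_name


filter_name = make_filter_name(
    {
        "AnalysisElectronsAuxDyn": ["pt", "eta", "phi", "m", "caloClusterLinks"],
        "egammaClustersAuxDyn": ["calE", "calEta"],
    }
)
print(filter_name("AnalysisElectronsAuxDyn.pt"))  # True
print(filter_name("AnalysisJetsAuxDyn.pt"))  # False
```

The resulting predicate is passed exactly like the hand-written one, e.g. uproot_options=dict(filter_name=filter_name) in delayed mode.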

Another weird behaviour I find is that NanoEventsFactory.from_root fails when I turn delayed=False. For instance:

# works
events = factory = NanoEventsFactory.from_root(
    {file_path: "CollectionTree"},
    schemaclass=PHYSLITESchema,
    delayed=True,
    uproot_options=dict(filter_name=filter_name)
).events()

# fails
events = factory = NanoEventsFactory.from_root(
    {file_path: "CollectionTree"},
    schemaclass=PHYSLITESchema,
    delayed=False,
    uproot_options=dict(filter_name=filter_name)
).events()

# works
events = factory = NanoEventsFactory.from_root(
    {file_path: "CollectionTree"},
    schemaclass=PHYSLITESchema,
    delayed=True,
    uproot_options=dict(filter_name=filter_name)
).events().compute()

Any input here is much appreciated and I can of course follow up opening a coffea issue if you think it's more appropriate.

@nikoladze

nikoladze commented Dec 19, 2023

# fails
events = factory = NanoEventsFactory.from_root(
    {file_path: "CollectionTree"},
    schemaclass=PHYSLITESchema,
    delayed=False,
    uproot_options=dict(filter_name=filter_name)
).events()

I believe this fails because uproot_options are passed to uproot.open in non-dask mode, which does not have the filter_name option (since there is nothing to be read yet). Probably it would be nice to also be able to pass filters through here - in non-dask mode it probably rarely makes sense to read everything. Maybe a discussion to continue in the coffea github project.

@nikoladze

The current workaround would be to replace uproot_options=dict(filter_name=filter_name) with iteritems_options=dict(filter_name=filter_name) in non-dask mode.
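One way to avoid switching keyword arguments by hand between the two modes is a small dispatch helper like the sketch below. filter_kwargs is hypothetical; it only encodes the workaround described above, and since iteritems_options is undocumented, it may behave differently in other coffea versions:

```python
# Hypothetical helper: route filter_name to the keyword argument that the
# workaround above uses for each mode of NanoEventsFactory.from_root.


def filter_kwargs(filter_name, delayed):
    # delayed=True: uproot_options presumably reach uproot.dask, which accepts filter_name.
    # delayed=False: uproot_options reach uproot.open, which does not, so use
    # the (undocumented) iteritems_options instead.
    key = "uproot_options" if delayed else "iteritems_options"
    return {key: dict(filter_name=filter_name)}


def filter_name(name):
    return name.startswith("AnalysisElectronsAuxDyn.")


print(filter_kwargs(filter_name, delayed=False).keys())
# Usage (not run here):
# events = NanoEventsFactory.from_root(
#     {file_path: "CollectionTree"},
#     schemaclass=PHYSLITESchema,
#     delayed=False,
#     **filter_kwargs(filter_name, delayed=False),
# ).events()
```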

@ekourlit

current workaround would be to replace uproot_options=dict(filter_name=filter_name) by iteritems_options=dict(filter_name=filter_name) in non-dask mode.

Thank you @nikoladze, this works for me! I didn't know about this option as it's not documented. I will keep an eye on the issue you opened.

@matthewfeickert
Member Author

matthewfeickert commented Dec 20, 2023

Okay, so while I am still confused about a few things (adaptive autoscaling, how to connect to cluster dashboards that I create myself, why we still need the filter_name, and how/if we can fix this in coffea's PHYSLITESchema), I have scaling working on the UChicago Analysis Facility JupyterHub system using this repo's container image (hub.opensciencegrid.org/usatlas/analysis-dask-base:latest) as the scheduler and worker container image.

What helped was when @fengpinghu pointed out to me:

my understanding is that users should not use that existing dask cluster because the dask clusters is not meant to be shared(users would step into each other's toes). I think users are suppose to create a dask cluster themself with the dask operator with the requirements specified by the users (e.g. container image for the workers). Here's some documents about customizing your own dask clusters -- https://kubernetes.dask.org/en/latest/operator_kubecluster.html.
...
on the AF we run the notebook with a serviceAccount which have a rolebinding that allow it to create daskclusters.

👍

The following Python should work for anyone who has access to the UChicago JupyterLab cluster and uses the mc.txt from @ivukotic's comment #4 (comment). The computation part we actually care about takes about 10-11 seconds for all 50 mc.txt files (which I think is ~1 TB in total) when scaled across 50 workers.

import time
import warnings

import awkward as ak
import hist.dask
from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from distributed import Client

warnings.filterwarnings("ignore")

# Setup a KubeCluster
spec = make_cluster_spec(
    name="analysis-base",
    image="hub.opensciencegrid.org/usatlas/analysis-dask-base:latest",
)

cluster = KubeCluster(custom_cluster_spec=spec)

# This doesn't seem to work as expected and scale up as work starts
cluster.adapt(minimum=1, maximum=50)
print(f"Dashboard: {cluster.dashboard_link}")  # Dashboard link won't open (404s)

client = Client(cluster)


# Without filter_name then delayed_hist.compute() will error with
# AttributeError: 'NoneType' object has no attribute 'reset_active_node'
def filter_name(name):
    return name in (
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
    )


def get_data_dict(
    n=10,
    read_file="mc.txt",
    tree_name="CollectionTree",
    xc="root://xcache.af.uchicago.edu:1094//",
):
    r = {}
    with open(read_file, "r") as readfile:
        ls = readfile.readlines()
        _range_max = min(n, len(ls))
        print(f"Processing {_range_max} out of {len(ls)} files")
        for i in range(0, _range_max):
            r[xc + ls[i].strip()] = tree_name
    return r


file_uris = get_data_dict(100)

events = NanoEventsFactory.from_root(
    file_uris,
    schemaclass=PHYSLITESchema,
    uproot_options=dict(filter_name=filter_name),
    delayed=True,
).events()

# Lay out the event selection logic
el_p4 = events.Electrons

# select 2-electron events
evt_filter = ak.num(el_p4) == 2
el_p4 = el_p4[evt_filter]

# Now scale across the KubeCluster to multiple workers
cluster.scale(50)
# ensure cluster has finished scaling
time.sleep(30)
print(cluster)

# fill histogram with di-electron system invariant mass and plot
delayed_hist = hist.dask.Hist.new.Reg(120, 0, 120, label="mass [GeV]").Weight()
delayed_hist.fill((el_p4[:, 0] + el_p4[:, 1]).mass / 1_000)

# This takes about:
# 24 seconds for 50 files and 10 workers
# 13 seconds for 50 files and 25 workers
# 11 seconds for 50 files and 50 workers
# 12 seconds for 50 files and 100 workers
_start = time.time()
result_hist = delayed_hist.compute()
_stop = time.time()

print(
    f"Cluster with {cluster.n_workers} workers finished in {_stop-_start:.2f} seconds."
)
delayed_hist.visualize()

artists = result_hist.plot()

fig = artists[0][0].get_figure()
ax = fig.get_axes()[0]

ax.set_ylabel("Count")
fig.savefig("mass.png")

client.close()
cluster.close()

[Figure: mass.png, the di-electron invariant mass histogram produced by the script above]

This further addresses points in @lukasheinrich's outline comment #4 (comment). So I think we can move forward with taking @mvigl's notebook and trying it in a distributed manner (cf. https://gitlab.cern.ch/gstark/pycolumnarprototype/-/issues/3 for the start of this).
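The timings quoted in the comments of the script above can be turned into speedup and parallel efficiency relative to the 10-worker run. The sketch uses only those four reported numbers; the flattening beyond about 50 workers is consistent with the constant overhead suggested earlier in the thread, though four data points cannot pin down the cause.

```python
# Timings reported in the comments of the script above (50 files), in seconds
timings = {10: 24.0, 25: 13.0, 50: 11.0, 100: 12.0}


def scaling_table(timings):
    """Speedup and efficiency of each run relative to the smallest worker count."""
    base_workers = min(timings)
    base_time = timings[base_workers]
    rows = []
    for workers in sorted(timings):
        speedup = base_time / timings[workers]
        ideal = workers / base_workers  # perfect linear scaling
        rows.append((workers, speedup, speedup / ideal))
    return rows


for workers, speedup, efficiency in scaling_table(timings):
    print(f"{workers:>4} workers: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
#   10 workers: speedup 1.00x, efficiency 100%
#   25 workers: speedup 1.85x, efficiency 74%
#   50 workers: speedup 2.18x, efficiency 44%
#  100 workers: speedup 2.00x, efficiency 20%
```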

@mvigl

mvigl commented Feb 2, 2024

Hi all, I can run the code above on the UChicago JupyterLab cluster, but if one tries to access Electrons.caloClusters via element links by adding these few lines to the code

import coffea.nanoevents
from coffea.nanoevents import PHYSLITESchema
from coffea.nanoevents.methods.physlite import _element_link_method

PHYSLITESchema.mixins["egammaClusters"] = "NanoCollection"
@property
def caloClusters(self, _dask_array_=None):
    return _element_link_method(
        self, "caloClusterLinks", "egammaClusters", _dask_array_
    )
coffea.nanoevents.methods.physlite.Electron.caloClusters = caloClusters

def filter_name(name):
    return name in (
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
        "AnalysisElectronsAuxDyn.caloClusterLinks",
        "egammaClustersAuxDyn.calE",
        "egammaClustersAuxDyn.calEta"

    )

one gets the following error at clusters = events.Electrons.caloClusters:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/awkward/_dispatch.py:56, in named_high_level_function.<locals>.dispatch(*args, **kwargs)
     55 if result is NotImplemented:
---> 56     raise NotImplementedError
     57 else:

NotImplementedError: 

The above exception was the direct cause of the following exception:

NotImplementedError                       Traceback (most recent call last)
Cell In[3], line 81
     73 events = NanoEventsFactory.from_root(
     74     file_uris,
     75     schemaclass=PHYSLITESchema,
     76     uproot_options=dict(filter_name=filter_name),
     77     delayed=True,
     78 ).events()
     80 # Lay out the event selection logic
---> 81 clusters = events.Electrons.caloClusters #fails!
     82 el_p4 = events.Electrons
     84 # select 2-electron events

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:1526, in Array.__getattr__(self, attr)
   1524             return wrapper
   1525         else:
-> 1526             return self.map_partitions(
   1527                 _BehaviorPropertyFn(attr),
   1528                 label=hyphenize(attr),
   1529             )
   1530 try:
   1531     # at this point attr is either a field or we'll have to
   1532     # raise an exception.
   1533     return self.__getitem__(attr)

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:1570, in Array.map_partitions(self, func, traverse, *args, **kwargs)
   1537 def map_partitions(
   1538     self,
   1539     func: Callable,
   (...)
   1542     **kwargs: Any,
   1543 ) -> Array:
   1544     """Map a function across all partitions of the collection.
   1545 
   1546     Parameters
   (...)
   1568 
   1569     """
-> 1570     return map_partitions(func, self, *args, traverse=traverse, **kwargs)

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:2024, in map_partitions(base_fn, label, token, meta, output_divisions, traverse, *args, **kwargs)
   2016 lay = partitionwise_layer(
   2017     fn,
   2018     name,
   2019     *arg_flat_deps_expanded,
   2020     *kwarg_flat_deps,
   2021 )
   2023 if meta is None:
-> 2024     meta = map_meta(fn, *arg_flat_deps_expanded, *kwarg_flat_deps)
   2026 hlg = HighLevelGraph.from_collections(
   2027     name,
   2028     lay,
   2029     dependencies=flat_deps,
   2030 )
   2032 dak_arrays = tuple(filter(lambda x: isinstance(x, Array), flat_deps))

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:2412, in map_meta(fn, *deps)
   2408 def map_meta(fn: ArgsKwargsPackedFunction, *deps: Any) -> ak.Array | None:
   2409     # NOTE: fn is assumed to be a *packed* function
   2410     #       as defined up in map_partitions. be careful!
   2411     try:
-> 2412         meta = fn(*to_meta(deps))
   2413         return meta
   2414     except Exception as err:
   2415         # if compute-unknown-meta is False then we don't care about
   2416         # this failure and we return None.

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:1885, in ArgsKwargsPackedFunction.__call__(self, *args_deps_expanded)
   1883     len_args += n_args
   1884 kwargs = self.kwarg_repacker(args_deps_expanded[len_args:])[0]
-> 1885 return self.fn(*args, **kwargs)

File /venv/lib/python3.9/site-packages/dask_awkward/lib/core.py:2520, in _BehaviorPropertyFn.__call__(self, coll)
   2519 def __call__(self, coll: ak.Array) -> ak.Array:
-> 2520     return getattr(coll, self.attr)

Cell In[3], line 17, in caloClusters(self, _dask_array_)
     15 @property
     16 def caloClusters(self, _dask_array_=None):
---> 17     return _element_link_method(
     18         self, "caloClusterLinks", "egammaClusters", _dask_array_
     19     )

File /venv/lib/python3.9/site-packages/coffea/nanoevents/methods/physlite.py:54, in _element_link_method(self, link_name, target_name, _dask_array_)
     47     return _element_link(
     48         target,
     49         _dask_array_._eventindex,
     50         links.m_persIndex,
     51         links.m_persKey,
     52     )
     53 links = self[link_name]
---> 54 return _element_link(
     55     self._events()[target_name],
     56     self._eventindex,
     57     links.m_persIndex,
     58     links.m_persKey,
     59 )

File /venv/lib/python3.9/site-packages/coffea/nanoevents/methods/physlite.py:38, in _element_link(target_collection, eventindex, index, key)
     37 def _element_link(target_collection, eventindex, index, key):
---> 38     global_index = _get_global_index(target_collection, eventindex, index)
     39     global_index = awkward.where(key != 0, global_index, -1)
     40     return target_collection._apply_global_index(global_index)

File /venv/lib/python3.9/site-packages/coffea/nanoevents/methods/physlite.py:131, in _get_global_index(target, eventindex, index)
    129     if load_column.ndim < 3:
    130         break
--> 131 target_offsets = _get_target_offsets(load_column, eventindex)
    132 return target_offsets + index

File /venv/lib/python3.9/site-packages/coffea/nanoevents/methods/physlite.py:108, in _get_target_offsets(load_column, event_index)
    104     return offsets[event_index]
    106 # let the necessary column optimization know that we need to load this
    107 # column to get the offsets
--> 108 if awkward.backend(load_column) == "typetracer":
    109     awkward.typetracer.touch_data(load_column)
    111 # necessary to stick it into the `NumpyArray` constructor
    112 # if typetracer is passed through

File /venv/lib/python3.9/site-packages/awkward/_dispatch.py:70, in named_high_level_function.<locals>.dispatch(*args, **kwargs)
     65     else:
     66         raise AssertionError(
     67             "high-level functions should only implement a single yield statement"
     68         )
---> 70 return gen_or_result

File /venv/lib/python3.9/site-packages/awkward/_errors.py:85, in ErrorContext.__exit__(self, exception_type, exception_value, traceback)
     78 try:
     79     # Handle caught exception
     80     if (
     81         exception_type is not None
     82         and issubclass(exception_type, Exception)
     83         and self.primary() is self
     84     ):
---> 85         self.handle_exception(exception_type, exception_value)
     86 finally:
     87     # Step out of the way so that another ErrorContext can become primary.
     88     if self.primary() is self:

File /venv/lib/python3.9/site-packages/awkward/_errors.py:95, in ErrorContext.handle_exception(self, cls, exception)
     93     self.decorate_exception(cls, exception)
     94 else:
---> 95     raise self.decorate_exception(cls, exception)

NotImplementedError: 

See if this has been reported at https://github.com/scikit-hep/awkward/issues

which is the same error seen in https://gitlab.cern.ch/gstark/pycolumnarprototype/-/issues/3 when running the tools, and the whole reason why they fail. So I think this is what we should try to figure out.
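For context, the traceback shows what coffea's _element_link does with the link branches: it computes a global index as target offsets[event_index] + m_persIndex, sets it to -1 wherever m_persKey == 0, and then applies that index to the target collection. The NotImplementedError is raised by the awkward.backend(load_column) call inside _get_target_offsets, i.e. while getting those offsets. A plain-Python sketch of the same index arithmetic on hypothetical toy data (not the coffea code itself):

```python
# Plain-Python sketch of the index arithmetic in coffea's _element_link as it
# appears in the traceback: global = offsets[event] + index, or -1 if key == 0.


def element_link_global_index(target_counts, event_index, index, key):
    """target_counts: number of target objects (e.g. egammaClusters) per event.
    event_index, index, key: one entry per link, already flattened."""
    offsets = [0]
    for n in target_counts:
        offsets.append(offsets[-1] + n)
    return [
        offsets[evt] + idx if k != 0 else -1
        for evt, idx, k in zip(event_index, index, key)
    ]


# Hypothetical toy data: 3 events with 2, 3 and 1 clusters
cluster_energies = [10.0, 11.0, 20.0, 21.0, 22.0, 30.0]  # flattened target collection
global_index = element_link_global_index(
    target_counts=[2, 3, 1],
    event_index=[0, 1, 1, 2],
    index=[1, 0, 2, 0],  # plays the role of m_persIndex
    key=[7, 7, 7, 0],    # plays the role of m_persKey; 0 marks an invalid link
)
print(global_index)  # [1, 2, 4, -1]
print([cluster_energies[g] if g >= 0 else None for g in global_index])
# [11.0, 20.0, 22.0, None]
```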


8 participants