
Add reduce function #5533

Closed
wants to merge 75 commits into from

Conversation


@AJDERS AJDERS commented Feb 15, 2023

This PR closes #5496.

I tried to imitate the reduce method from functools, i.e. the function input must be a binary operation. I assume that the input type has an empty element, i.e. input_type() is defined, since the accumulator is instantiated as this object. I'm not sure whether this is a reasonable assumption?

If batched=True, the reduction of each shard is not returned, but rather the reduction of the entire dataset. I was unsure whether this is an intuitive API, or whether it would make more sense to return the reduction of each shard?
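To illustrate the intended semantics, here is a rough per-column sketch in terms of functools.reduce (the helper name and signature below are just for illustration, not the PR code):

from functools import reduce as functools_reduce
from datasets import Dataset

def column_reduce(dataset, function, column):
    # Hypothetical helper: the accumulator starts out as an "empty" instance of
    # the value type, e.g. int() == 0, assuming the type has a zero-argument constructor.
    values = dataset[column]
    initializer = type(values[0])()
    return functools_reduce(function, values, initializer)

ds = Dataset.from_dict({"x": [1, 2, 3]})
column_reduce(ds, lambda a, b: a + b, "x")
# 6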

Member

@lhoestq lhoestq left a comment


The documentation is starting to take great shape! Thanks a lot :)

I added some suggestions, and also one question regarding the output which IMO should be of the same type as the initializer (maybe I should have noticed this earlier - sorry for the back and forth)

src/datasets/arrow_dataset.py (outdated review thread, resolved)
src/datasets/arrow_dataset.py (outdated review thread, resolved)
Comment on lines +3440 to +3441
>>> result
{'text': Counter({'and': 2, 'compassionately': 1, 'explores': 1, 'the': 1, 'seemingly': 1, 'irreconcilable': 1, 'situation': 1, 'between': 1, 'conservative': 1, 'christian': 1, 'parents': 1, 'their': 1, 'estranged': 1, 'gay': 1, 'lesbian': 1, 'children': 1, '.': 1})}
@lhoestq (Member)

Since the initializer is a Counter(), I think we can expect the output to be a Counter() as well, no?

Author

@AJDERS AJDERS Feb 21, 2023


I see your point! It would also align more with the behaviour of functools.reduce. On the other hand, it's a bit annoying that the output of different parameter variations is not the same type; as an example, r1 and r2 below would be int and dict respectively.

int_ds = Dataset.from_dict({"x": [1, 2, 3], "y": [1, 2, 3]})
add = lambda x, y: x + y
r1 = int_ds.reduce(add, initializer=0)
# 6
r2 = int_ds.reduce(add)
# {"x": 6, "y": 6}

I don't really have a strong opinion one way or the other; either would be confusing/annoying in some way. Which do you prefer?

src/datasets/arrow_dataset.py (outdated review thread, resolved)
src/datasets/arrow_dataset.py (outdated review thread, resolved)
@mariosasko
Collaborator

The proposed API doesn't seem intuitive to me - one can already use functools.reduce or Dataset.map for this purpose (see the Colab with examples), so perhaps we could have a section in the docs that uses these methods to perform reductions rather than introducing a new method (which needs to be maintained later).
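For reference, a small illustration of the functools.reduce route (a hedged sketch, not necessarily the exact code from the Colab):

import functools
from datasets import Dataset

ds = Dataset.from_dict({"text": ["a b", "a c", "b"]})

# Count word occurrences over a column with plain functools.reduce (single process).
def count_words(acc, example):
    for word in example["text"].split():
        acc[word] = acc.get(word, 0) + 1
    return acc

word_counts = functools.reduce(count_words, ds, {})
# {'a': 2, 'b': 2, 'c': 1}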

@lhoestq
Member

lhoestq commented Feb 22, 2023

Thanks for sharing this Google Colab, it has nice examples!

Though I still think functools.reduce with multiprocessing can be a pain - we offer something easier here:

  • no need to use a pool yourself
  • no need to use map just to iterate on the dataset (not its main purpose)
  • native support for lambdas (using dill)
  • the combiner is mandatory for multiprocessing to avoid ending up with an incorrect result as in your example (see the sketch below)
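To make that last point concrete, here is a minimal, sequential sketch of the reduce/combine split (illustrative names, not the PR implementation):

from functools import reduce

def add(a, b):
    return a + b

data = list(range(10))
shards = [data[0::2], data[1::2]]                       # what each worker process would see
partials = [reduce(add, shard, 0) for shard in shards]  # per-shard (per-process) reductions
total = reduce(add, partials, 0)                        # the combiner merges the partial results: 45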

However, I agree that maintaining this can be challenging, especially considering how complex map already is, and if we also have to deal with dataset formatting.

@mariosasko
Collaborator

> native support for lambdas (using dill)

Replacing multiprocessing with multiprocess in the example would allow that.
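For example, something along these lines should work, assuming the multiprocess package is installed (a hedged sketch):

# multiprocess mirrors the multiprocessing API but pickles with dill,
# so lambdas can be sent to worker processes.
import multiprocess as mp

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        squares = pool.map(lambda x: x * x, range(10))
    print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]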

> no need to use map just to iterate on the dataset (not its main purpose)

Not the main purpose, but this was mentioned as a "feature" in the previous docs if I remember correctly.

And all this is related to the multi-processing case, which we can document.

Besides the linked issue, I can't find requests for Dataset.reduce, which makes me think functools.reduce does the job for most users.

@lhoestq
Member

lhoestq commented Feb 22, 2023

> Besides the linked issue, I can't find requests for Dataset.reduce, which makes me think functools.reduce does the job for most users.

I think @srush was looking for a way to do a word count but ended up using a single-process map. I also saw some users on the forum wanting to compute a max.

> Not the main purpose, but this was mentioned as a "feature" in the previous docs if I remember correctly.
>
> And all this is related to the multi-processing case, which we can document.

Yup indeed

@srush
Contributor

srush commented Feb 22, 2023

While counting is one example, I often find I want to compute different statistics over a dataset. This seems like a natural way to do it in a stateless manner.

I guess you could use functools.reduce, but that wouldn't allow batching, right?

@mariosasko
Collaborator

mariosasko commented Feb 22, 2023

I've updated the Colab with an example that reduces batches with map and then computes the final result. It would be nice to have a similar example (explained in detail) in the docs to show the full power of map.

Plus, for simple reductions such as max, one can do pc.max(ds.with_format("arrow")["col"]) to directly get the result (without loading the entire column in RAM).
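A hedged sketch of that batched map + final reduction pattern, using max text length as the statistic (illustrative code, not the exact Colab cells):

from datasets import Dataset

ds = Dataset.from_dict({"text": ["short", "a longer sentence", "mid size"]})

# Each batch is reduced to a single partial result; the original columns are
# dropped so the intermediate dataset only holds the partial results.
partial = ds.map(
    lambda batch: {"max_len": [max(len(t) for t in batch["text"])]},
    batched=True,
    batch_size=2,
    remove_columns=ds.column_names,
)
final = max(partial["max_len"])
# 17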

@srush

> I guess you could use functools.reduce, but that wouldn't allow batching, right?

You can use .iter(batch_size) to get batches.
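For instance, a batched reduction with functools.reduce over Dataset.iter() could look like this (hedged sketch):

import functools
from datasets import Dataset

ds = Dataset.from_dict({"x": list(range(10))})

def add_batch(acc, batch):
    # each batch is a dict of lists, e.g. {"x": [0, 1, 2, 3]}
    return acc + sum(batch["x"])

total = functools.reduce(add_batch, ds.iter(batch_size=4), 0)
# 45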

@srush
Contributor

srush commented Feb 22, 2023

That functools example is clean. I didn't know about .iter(). That would handle my use case.

The stateful map with a global variable is pretty hairy. I don't think we should recommend people do that.

@AJDERS
Author

AJDERS commented Feb 22, 2023

Whenever I wanted to calculate statistics for datasets in the past, I used functools similarly to how it's described in the Colab, but I always felt it was a bit of a hassle to use together with multiprocessing, which is why I picked up the issue: to do it "once and for all".

@AJDERS
Author

AJDERS commented Feb 27, 2023

Should I close this and open another PR with descriptions of how to use map for reduction?

@lhoestq
Member

lhoestq commented Feb 27, 2023

Yes, I think good documentation is the way to go here. @mariosasko's examples are clear and efficient.

Maybe we could have an Aggregations section in the Process page with some guides on how to:

  • use .map() to compute aggregates
  • use .with_format("arrow") for max, min, etc. to save RAM and get max speed
  • use a multiprocessed .map() to get partial results in parallel and combine them (max text length example)
  • (advanced) use multiprocessing with an arbitrary accumulator (word count example; see the sketch after this list)
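As a rough sketch of that last item (illustrative code, assuming partial results are stored as JSON strings to keep the Arrow schema simple; not the final docs):

import json
from collections import Counter
from datasets import Dataset

ds = Dataset.from_dict({"text": ["a b", "a c", "b b", "c"]})

def count_batch(batch):
    # Reduce one batch to a single partial word count, serialized as JSON.
    counts = Counter(word for text in batch["text"] for word in text.split())
    return {"counts": [json.dumps(counts)]}

partials = ds.map(
    count_batch,
    batched=True,
    batch_size=2,
    remove_columns=ds.column_names,
    num_proc=2,
)

# Combine the per-process partial results into the final count.
final = sum((Counter(json.loads(c)) for c in partials["counts"]), Counter())
# Counter({'b': 3, 'a': 2, 'c': 2})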

And also a new conceptual guide on Multiprocessed mapping, explaining that it helps speed up CPU-intensive processing, but also why it may lead to incorrect results when computing aggregates.

cc @stevhliu for visibility and if you have some comments

@stevhliu
Member

To be more exact, I would create a Reduce subsection under Map to demonstrate these examples, since we're showing how they can be done with the Dataset.map function. It'd also be good to add a link to the new concept guide from this section to solidify user understanding :)

@AJDERS
Author

AJDERS commented Feb 28, 2023

Coolio. I'll close this PR and get going on another one adding what we've discussed during the next couple of days!

@AJDERS AJDERS closed this Feb 28, 2023
@mariosasko mariosasko mentioned this pull request Jul 21, 2023
@taha-yassine

Is adding a section to the docs still planned? Couldn't find any related PR.

@lhoestq
Member

lhoestq commented Nov 25, 2024

There is a new integration with polars, which is convenient btw. Here is an example of computing the length of the longest dialogue in a dataset using polars:

>>> from datasets import load_dataset
>>> ds = load_dataset("HuggingFaceTB/smoltalk", "all", split="train")
>>> df = ds.to_polars()
>>> df.head()
shape: (5, 2)
┌─────────────────────────────────┬───────────────────┐
│ messages                        ┆ source            │
│ ---                             ┆ ---               │
│ list[struct[2]]                 ┆ str               │
╞═════════════════════════════════╪═══════════════════╡
│ [{"The function \( g(x) \) sat… ┆ numina-cot-100k   │
│ [{"Ben twice chooses a random … ┆ numina-cot-100k   │
│ [{"Find all values of $x$ that… ┆ numina-cot-100k   │
│ [{"How can you help me? I'm wr… ┆ smol-magpie-ultra │
│ [{"Extract and present the mai… ┆ smol-summarize    │
└─────────────────────────────────┴───────────────────┘
>>> df["messages"].list.len().max()
58

For very large scale datasets it can be worth using map() on batches of data to compute intermediate results, save some memory, and cache the result:

>>> import polars as pl
>>> f = lambda df: pl.DataFrame({"messages_max_length": [df["messages"].list.len().max()]})
>>> intermediate_ds = ds.with_format("polars").map(f, batched=True)  # you can also set batch_size=
>>> intermediate_ds.to_polars()["messages_max_length"].max()
58

This last method can be used to implement a map + intermediate reduce + final reduce approach.

Development

Successfully merging this pull request may close these issues.

Add a reduce method