From 06b1e6abe2c192d928c38735194e9409bf0b2925 Mon Sep 17 00:00:00 2001 From: Yilun Huang Date: Tue, 7 Jan 2025 17:56:22 +0800 Subject: [PATCH] docs for distributed processing (#523) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * + add docs for distributed processing * complete the distribution doc en version * + add deduplication demo for distributed doc * + add Chinese version of distributed doc * * fix wrong header level * Improve some words, and the "split" and "json" subsections according to disscusion --------- Co-authored-by: 道辕 --- demos/process_on_ray/configs/dedup.yaml | 15 +++ demos/process_on_ray/configs/demo.yaml | 5 +- docs/Distributed.md | 149 +++++++++++++++++++++++ docs/Distributed_ZH.md | 150 ++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 demos/process_on_ray/configs/dedup.yaml create mode 100644 docs/Distributed.md create mode 100644 docs/Distributed_ZH.md diff --git a/demos/process_on_ray/configs/dedup.yaml b/demos/process_on_ray/configs/dedup.yaml new file mode 100644 index 000000000..642203249 --- /dev/null +++ b/demos/process_on_ray/configs/dedup.yaml @@ -0,0 +1,15 @@ +# Process config example for dataset + +# global parameters +project_name: 'demo-dedup' +dataset_path: './demos/process_on_ray/data/' +export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed' + +executor_type: 'ray' +ray_address: 'auto' + +# process schedule +# a list of several process operators with their arguments +process: + - ray_bts_minhash_deduplicator: + tokenization: 'character' \ No newline at end of file diff --git a/demos/process_on_ray/configs/demo.yaml b/demos/process_on_ray/configs/demo.yaml index 1e3e4a55a..5154da014 100644 --- a/demos/process_on_ray/configs/demo.yaml +++ b/demos/process_on_ray/configs/demo.yaml @@ -2,11 +2,12 @@ # global parameters project_name: 'ray-demo' -executor_type: 'ray' dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl' # path to your dataset directory or file -ray_address: 'auto' # change to your ray cluster address, e.g., ray://: export_path: './outputs/demo/demo-processed' +executor_type: 'ray' +ray_address: 'auto' # change to your ray cluster address, e.g., ray://: + # process schedule # a list of several process operators with their arguments process: diff --git a/docs/Distributed.md b/docs/Distributed.md new file mode 100644 index 000000000..e70e4ba60 --- /dev/null +++ b/docs/Distributed.md @@ -0,0 +1,149 @@ +# Distributed Data Processing in Data-Juicer + +## Overview + +Data-Juicer supports large-scale distributed data processing based on [Ray](https://github.com/ray-project/ray) and Alibaba's [PAI](https://www.aliyun.com/product/bigdata/learn). + +With a dedicated design, almost all operators of Data-Juicer implemented in standalone mode can be seamlessly executed in Ray distributed mode. We continuously conduct engine-specific optimizations for large-scale scenarios, such as data subset splitting strategies that balance the number of files and workers, and streaming I/O patches for JSON files to Ray and Apache Arrow. + +For reference, in our experiments with 25 to 100 Alibaba Cloud nodes, Data-Juicer in Ray mode processes datasets containing 70 billion samples on 6400 CPU cores in 2 hours and 7 billion samples on 3200 CPU cores in 0.45 hours. Additionally, a MinHash-LSH-based deduplication operator in Ray mode can deduplicate terabyte-sized datasets on 8 nodes with 1280 CPU cores in 3 hours. + +More details can be found in our paper, [Data-Juicer 2.0: Cloud-Scale Adaptive Data Processing for Foundation Models](arXiv_link_coming_soon). + +![Arch-Overview]( +https://img.alicdn.com/imgextra/i4/O1CN01uawwRu1JMSdafy5lF_!!6000000001014-2-tps-4034-4146.png) + +## Implementation and Optimizations + +### Ray Mode in Data-Juicer + +- For most implementations of Data-Juicer [operators](Operators.md), the core processing functions are engine-agnostic. Interoperability is primarily managed in [RayDataset](../data_juicer/core/ray_data.py) and [RayExecutor](../data_juicer/core/ray_executor.py), which are subclasses of the base `DJDataset` and `BaseExecutor`, respectively, and support both Ray [Tasks](https://docs.ray.io/en/latest/ray-core/tasks.html) and [Actors](https://docs.ray.io/en/latest/ray-core/actors.html). +- The exception is the deduplication operators, which are challenging to scale in standalone mode. We provide these operators named as [`ray_xx_deduplicator`](../data_juicer/ops/deduplicator/). + +### Subset Splitting + +When dealing with tens of thousands of nodes but only a few dataset files, Ray would split the dataset files according to available resources and distribute the blocks across all nodes, incurring huge network communication costs and reduces CPU utilization. For more details, see [Ray's autodetect_parallelism](https://github.com/ray-project/ray/blob/2dbd08a46f7f08ea614d8dd20fd0bca5682a3078/python/ray/data/_internal/util.py#L201-L205) and [tuning output blocks for Ray](https://docs.ray.io/en/latest/data/performance-tips.html#tuning-output-blocks-for-read). + +This default execution plan can be quite inefficient especially for scenarios with large number of nodes. To optimize performance for such cases, we automatically splitting the original dataset into smaller files in advance, taking into consideration the features of Ray and Arrow. When users encounter such performance issues, they can utilize this feature or split the dataset according to their own preferences. In our auto-split strategy, the single file size is set to 128MB, and the result should ensure that the number of sub-files after splitting is at least twice the total number of CPU cores available in the cluster. + +### Streaming Reading of JSON Files + +Streaming reading of JSON files is a common requirement in data processing for foundation models, as many datasets are stored in JSONL format and in huge sizes. +However, the current implementation in Ray Datasets, which is rooted in the underlying Arrow library (up to Ray version 2.40 and Arrow version 18.1.0), does not support streaming reading of JSON files. + +To address the lack of native support for streaming JSON data, we have developed a streaming loading interface and contributed an in-house [patch](https://github.com/modelscope/data-juicer/pull/515) for Apache Arrow ([PR to the repo](https://github.com/apache/arrow/pull/45084)). This patch helps alleviate Out-of-Memory issues. With this patch, Data-Juicer in Ray mode will, by default, use the streaming loading interface to load JSON files. +Besides, streaming-read support for CSV and Parquet files is already enabled. + + +### Deduplication + +An optimized MinHash-LSH-based Deduplicator is provided in Ray mode. We implement a multiprocess Union-Find set in Ray Actors and a load-balanced distributed algorithm, [BTS](https://ieeexplore.ieee.org/document/10598116), to complete equivalence class merging. This operator can deduplicate terabyte-sized datasets on 1280 CPU cores in 3 hours. Our ablation study shows 2x to 3x speedups with our dedicated optimizations for Ray mode compared to the vanilla version of this deduplication operator. + +## Performance Results + +### Data Processing with Varied Scales + +We conducted experiments on datasets with billions of samples. We prepared a 560k-sample multimodal dataset and expanded it by different factors (1x to 125000x) to create datasets of varying sizes. The experimental results, shown in the figure below, demonstrate good scalability. + +![Overview](https://img.alicdn.com/imgextra/i3/O1CN01JV8wcC1oxn0G2xnBT_!!6000000005292-0-tps-1328-1742.jpg) + +### Distributed Deduplication on Large-Scale Datasets + +We tested the MinHash-based RayDeduplicator on datasets sized at 200GB, 1TB, and 5TB, using CPU counts ranging from 640 to 1280 cores. As the table below shows, when the data size increases by 5x, the processing time increases by 4.02x to 5.62x. When the number of CPU cores doubles, the processing time decreases to 58.9% to 67.1% of the original time. + +| # CPU | 200GB Time | 1TB Time | 5TB Time | +|---------|------------|-----------|------------| +| 4 * 160 | 11.13 min | 50.83 min | 285.43 min | +| 8 * 160 | 7.47 min | 30.08 min | 168.10 min | + +## Quick Start + +Before starting, you should install Data-Juicer and its `dist` requirements: + +```shell +pip install -v -e . # Install the minimal requirements of Data-Juicer +pip install -v -e ".[dist]" # Include dependencies on Ray and other distributed libraries +``` + +Then start a Ray cluster (ref to the [Ray doc](https://docs.ray.io/en/latest/ray-core/starting-ray.html) for more details): + +```shell +# Start a cluster as the head node +ray start --head + +# (Optional) Connect to the cluster on other nodes/machines. +ray start --address='{head_ip}:6379' +``` + +We provide simple demos in the directory `demos/process_on_ray/`, which includes two config files and two test datasets. + +```text +demos/process_on_ray +├── configs +│ ├── demo.yaml +│ └── dedup.yaml +└── data + ├── demo-dataset.json + └── demo-dataset.jsonl +``` + +> [!Important] +> If you run these demos on multiple nodes, you need to put the demo dataset to a shared disk (e.g. NAS) and export the result dataset to it as well by modifying the `dataset_path` and `export_path` in the config files. + +### Running Example of Ray Mode + +In the `demo.yaml` config file, we set the executor type to "ray" and specify an automatic Ray address. + +```yaml +... +dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl' +export_path: './outputs/demo/demo-processed' + +executor_type: 'ray' # Set the executor type to "ray" +ray_address: 'auto' # Set an automatic Ray address +... +``` + +Run the demo to process the dataset with 12 regular OPs: + +```shell +# Run the tool from source +python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml + +# Use the command-line tool +dj-process --config demos/process_on_ray/configs/demo.yaml +``` + +Data-Juicer will process the demo dataset with the demo config file and export the result datasets to the directory specified by the `export_path` argument in the config file. + +### Running Example of Distributed Deduplication + +In the `dedup.yaml` config file, we set the executor type to "ray" and specify an automatic Ray address. +And we use a dedicated distributed version of MinHash Deduplicator to deduplicate the dataset. + +```yaml +project_name: 'demo-dedup' +dataset_path: './demos/process_on_ray/data/' +export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed' + +executor_type: 'ray' # Set the executor type to "ray" +ray_address: 'auto' # Set an automatic Ray address + +# process schedule +# a list of several process operators with their arguments +process: + - ray_bts_minhash_deduplicator: # a distributed version of minhash deduplicator + tokenization: 'character' +``` + +Run the demo to deduplicate the dataset: + +```shell +# Run the tool from source +python tools/process_data.py --config demos/process_on_ray/configs/dedup.yaml + +# Use the command-line tool +dj-process --config demos/process_on_ray/configs/dedup.yaml +``` + +Data-Juicer will dedup the demo dataset with the demo config file and export the result datasets to the directory specified by the `export_path` argument in the config file. diff --git a/docs/Distributed_ZH.md b/docs/Distributed_ZH.md new file mode 100644 index 000000000..027e40bba --- /dev/null +++ b/docs/Distributed_ZH.md @@ -0,0 +1,150 @@ +# Data-Juicer 分布式数据处理 + +## 概览 + +Data-Juicer 支持基于 [Ray](https://github.com/ray-project/ray) 和阿里巴巴 [PAI](https://www.aliyun.com/product/bigdata/learn) 的大规模分布式数据处理。 + +经过专门的设计后,几乎所有在单机模式下实现的 Data-Juicer 算子都可以无缝地运行在 Ray 的分布式模式下。对于大规模场景,我们继续进行了针对计算引擎的特定优化,例如用于平衡文件和进程数目的数据子集分割策略,针对 Ray 和 Apache Arrow的 JSON 文件流式 I/O 补丁等。 + +作为参考,我们在 25 到 100 个阿里云节点上进行了实验,使用 Ray 模式下的 Data-Juicer 处理不同的数据集。在 6,400 个 CPU 核上处理包含 700 亿条样本的数据集只需要花费 2 小时,在 3,200 个 CPU 核上处理包含 70 亿条样本的数据集只需要花费 0.45 小时。此外,在 Ray 模式下,对 TB 大小级别的数据集,Data-Juicer 的 MinHash-LSH 去重算子在 1,280 个 CPU 核的 8 节点集群上进行去重只需 3 小时。 + +更多细节请参考我们的论文:[Data-Juicer 2.0: Cloud-Scale Adaptive Data Processing for Foundation Models](arXiv_link_coming_soon) 。 + +![Arch-Overview]( +https://img.alicdn.com/imgextra/i4/O1CN01uawwRu1JMSdafy5lF_!!6000000001014-2-tps-4034-4146.png) + +## 实现与优化 + +### Data-Juicer 的 Ray 处理模式 + +- 对于 Data-Juicer 的大部分[算子](Operators.md)实现,其核心处理函数是引擎无关的。[RayDataset](../data_juicer/core/ray_data.py) 和 [RayExecutor](../data_juicer/core/ray_executor.py) 封装了与Ray引擎的具体互操作,它们分别是基类 `DJDataset` 和 `BaseExecutor` 的子类,并且都支持 Ray [Tasks](https://docs.ray.io/en/latest/ray-core/tasks.html) 和 [Actors](https://docs.ray.io/en/latest/ray-core/actors.html) 。 +- 其中,去重算子是例外。它们在单机模式下很难规模化。因此我们提供了针对它们的 Ray 优化版本算子,并以特殊前缀开头:[`ray_xx_deduplicator`](../data_juicer/ops/deduplicator/) 。 + +### 数据子集分割 + +当在上万个节点中处理仅有若干个文件的数据集时, Ray 会根据可用资源分割数据集文件,并将它们分发到所有节点上,这可能带来极大的网络通信开销并减少 CPU 利用率。更多细节可以参考文档 [Ray's autodetect_parallelism](https://github.com/ray-project/ray/blob/2dbd08a46f7f08ea614d8dd20fd0bca5682a3078/python/ray/data/_internal/util.py#L201-L205) 和 [tuning output blocks for Ray](https://docs.ray.io/en/latest/data/performance-tips.html#tuning-output-blocks-for-read) 。 + +这种默认执行计划可能非常低效,尤其是在节点数量较多的情况下。为了优化此类情况的性能,我们考虑到 Ray 和 Arrow 的特性,提前将原始数据集自动拆分为较小的文件。当用户遇到此类性能问题时,他们可以利用此功能或根据偏好自己拆分数据集。在我们的自动拆分策略中,单个文件大小设置为 128MB,且结果应确保 拆分后的子文件数量 至少是 集群中可用CPU核心总数 的两倍。 + + +### JSON 文件的流式读取 + +为了解决 Ray Dataset 类底层框架 Arrow 对流式读取 JSON 数据的原生支持的缺失,我们开发了一个流式载入的接口并贡献到了一个针对 Apache Arrow 的内部 [补丁](https://github.com/modelscope/data-juicer/pull/515)( [相关 PR](https://github.com/apache/arrow/pull/45084) ) 。这个补丁可以缓解内存不够的问题。 + + +流式读取 JSON 文件是基础模型数据处理中的常见要求,因为许多数据集都以 JSONL 格式存储,并且尺寸巨大。 +但是,Ray Datasets 中当前的实现不支持流式读取 JSON 文件,根因来源于其底层 Arrow 库(截至 Ray 版本 2.40 和 Arrow 版本 18.1.0)。 + +为了解决不支持流式 JSON 数据的原生读取问题,我们开发了一个流式加载接口,并为 Apache Arrow 贡献了一个第三方 [补丁](https://github.com/modelscope/data-juicer/pull/515)([PR 到 repo](https://github.com/apache/arrow/pull/45084))。这将有助于缓解内存不足问题。使用此补丁后, Data-Juicer 的Ray模式将默认使用流式加载接口加载 JSON 文件。此外,如果输入变为 CSV 和 Parquet 文件,Ray模式下流式读取已经会自动开启。 + +### 去重 + +在 Ray 模式下,我们提供了一个优化过的基于 MinHash-LSH 的去重算子。我们使用 Ray Actors 实现了一个多进程的并查集和一个负载均衡的分布式算法 [BTS](https://ieeexplore.ieee.org/document/10598116) 来完成等价类合并操作。这个算子在 1,280 个CPU核上对 TB 大小级别的数据集去重只需要 3 个小时。我们的消融实验还表明相比于这个去重算子的初始实现版本,这些专门的优化项可以带来 2-3 倍的提速。 + +## 性能结果 + +### 不同数据规模的数据处理 + +我们在十亿样本规模的数据集上进行了实验。我们先准备了一个 56 万条样本的多模态数据集,并用不同的倍数(1-125,000倍)将其扩展来创建不同大小的数据集。下图的实验结果展示出了 Data-Juicer 的高扩展性。 + +![Overview](https://img.alicdn.com/imgextra/i3/O1CN01JV8wcC1oxn0G2xnBT_!!6000000005292-0-tps-1328-1742.jpg) + +### 大规模数据集分布式去重 + +我们在 200GB、1TB、5TB 的数据集上测试了我们的基于 MinHash 的 Ray 去重算子,测试机器的 CPU 核数从 640 核到 1280 核。如下表所示,当数据集大小增长 5 倍,处理时间增长 4.02 到 5.62 倍。当 CPU 核数翻倍,处理时间较原来减少了 58.9% 到 67.1%。 + +| CPU 核数 | 200GB 耗时 | 1TB 耗时 | 5TB 耗时 | +|---------|----------|----------|-----------| +| 4 * 160 | 11.13 分钟 | 50.83 分钟 | 285.43 分钟 | +| 8 * 160 | 7.47 分钟 | 30.08 分钟 | 168.10 分钟 | + +## 快速开始 + +在开始前,你应该安装 Data-Juicer 以及它的 `dist` 依赖需求: + +```shell +pip install -v -e . # 安装 Data-Juicer 的最小依赖需求 +pip install -v -e ".[dist]" # 包括 Ray 以及其他分布式相关的依赖库 +``` + +然后启动一个 Ray 集群(参考 [Ray 文档](https://docs.ray.io/en/latest/ray-core/starting-ray.html) ): + +```shell +# 启动一个集群并作为头节点 +ray start --head + +# (可选)在其他节点或机器上连接集群 +ray start --address='{head_ip}:6379' +``` + +我们在目录 `demos/process_on_ray/` 中准备了简单的例子,包括 2 个配置文件和 2 个测试数据集。 + +```text +demos/process_on_ray +├── configs +│ ├── demo.yaml +│ └── dedup.yaml +└── data + ├── demo-dataset.json + └── demo-dataset.jsonl +``` + +> [!Important] +> 如果你要在多个节点上运行这些例子,你需要将示例数据集放置与一个共享磁盘(如 NAS)上,并且将结果数据集导出到那里。你可以通过修改配置文件中的 `dataset_path` 和 `export_path` 参数来实现。 + +### 运行 Ray 模式样例 + +在配置文件 `demo.yaml` 中,我们将执行器类型设置为 "ray" 并且指定了自动的 Ray 地址。 + +```yaml +... +dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl' +export_path: './outputs/demo/demo-processed' + +executor_type: 'ray' # 将执行器类型设置为 "ray" +ray_address: 'auto' # 设置为自动 Ray 地址 +... +``` + +运行这个例子,以使用 12 个常规算子处理测试数据集: + +```shell +# 从源码运行处理工具 +python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml + +# 使用命令行工具 +dj-process --config demos/process_on_ray/configs/demo.yaml +``` + +Data-Juicer 会使用示例配置文件处理示例数据集,并将结果数据集导出到配置文件中 `export_path` 参数指定的目录中。 + +### 运行分布式去重样例 + +在配置文件 `dedup.yaml` 中,我们将执行器类型设置为 "ray" 并且指定了自动的 Ray 地址。我们使用了 MinHash 去重算子专门的分布式版本来对数据集去重。 + +```yaml +project_name: 'demo-dedup' +dataset_path: './demos/process_on_ray/data/' +export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed' + +executor_type: 'ray' # 将执行器类型设置为 "ray" +ray_address: 'auto' # 设置为自动 Ray 地址 + +# process schedule +# a list of several process operators with their arguments +process: + - ray_bts_minhash_deduplicator: # minhash 去重算子的分布式版本 + tokenization: 'character' +``` + +运行该实例来对数据集去重: + +```shell +# 从源码运行处理工具 +python tools/process_data.py --config demos/process_on_ray/configs/dedup.yaml + +# 使用命令行工具 +dj-process --config demos/process_on_ray/configs/dedup.yaml +``` + +Data-Juicer 会使用示例配置文件对示例数据集去重,并将结果数据集导出到配置文件中 `export_path` 参数指定的目录中。