diff --git a/_toc.yml b/_toc.yml index 45ae83e..fba8d6f 100644 --- a/_toc.yml +++ b/_toc.yml @@ -44,8 +44,8 @@ subtrees: - file: ch-ray-cluster/index entries: - file: ch-ray-cluster/ray-cluster - - file: ch-ray-cluster/ray-job - file: ch-ray-cluster/ray-resource + - file: ch-ray-cluster/ray-job - file: ch-ray-data/index entries: - file: ch-ray-data/ray-data-intro diff --git a/ch-dask-dataframe/map-partitions.ipynb b/ch-dask-dataframe/map-partitions.ipynb index 7be5ad1..616702e 100644 --- a/ch-dask-dataframe/map-partitions.ipynb +++ b/ch-dask-dataframe/map-partitions.ipynb @@ -280,9 +280,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Dask DataFrame 的某些 API 的计算模式是 Embarrassingly Parallel,它的底层很可能就是使用 `map_partitions()` 实现的。\n", + "Dask DataFrame 的某些 API 是 Embarrassingly Parallel 的,它的底层就是使用 `map_partitions()` 实现的。\n", "\n", - "{numref}`sec-dask-dataframe-indexing` 提到过,Dask DataFrame 会在某个列上进行切分,但如果对这些切分的列做了改动,需要 `clear_divisions()` 或者重新 `set_index()`。" + "{numref}`sec-dask-dataframe-indexing` 提到过,Dask DataFrame 会在某个列(索引列)上进行切分,但如果 `map_partitions()` 对这些索引列做了改动,需要 `clear_divisions()` 或者重新 `set_index()`。" ] }, { diff --git a/ch-dask-dataframe/read-write.ipynb b/ch-dask-dataframe/read-write.ipynb index 15ac679..e5dbdbc 100644 --- a/ch-dask-dataframe/read-write.ipynb +++ b/ch-dask-dataframe/read-write.ipynb @@ -1750,7 +1750,9 @@ "* 内嵌表模式\n", "* 数据压缩\n", "\n", - "具体而言,列式存储按照列进行存储,而不是 CSV 那样按行存储。数据分析时,我们可能只关心特定的列,而不是所有的列,因此在读取数据时,Parquet 允许很方便地过滤掉不需要的列,而不必读取整个行。因为减少了读取的数据量,Parquet 可以显著提高性能,也被广泛应用在 Apache Spark、Apache Hive 和 Apache Flink 等大数据生态。Parquet 自带了表模式,每个 Parquet 文件里嵌入了每个列的列名、数据类型等元数据,也就避免了 Dask DataFrame 进行表模式推测时推测不准确的问题。Parquet 中的数据是经过压缩的,相比 CSV,Parquet 更节省持久化存储的空间。\n", + "具体而言,列式存储按照列进行存储,而不是 CSV 那样按行存储。数据分析时,我们可能只关心特定的列,而不是所有的列,因此在读取数据时,Parquet 允许很方便地过滤掉不需要的列,而不必读取整个行,即列裁剪(Column Pruning)。除了列裁剪,行裁剪(Row Pruning)是另一种减少数据读取的技术。Parquet 自带了表模式,每个 Parquet 文件里嵌入了每个列的列名、数据类型等元数据,也就避免了 Dask DataFrame 进行表模式推测时推测不准确的问题。Parquet 中的数据是经过压缩的,相比 CSV,Parquet 更节省持久化存储的空间。\n", + "\n", + "Parquet 被广泛应用在 Apache Spark、Apache Hive 和 Apache Flink 等大数据生态。\n", "\n", "```{figure} ../img/ch-dask-dataframe/parquet.svg\n", "---\n", diff --git a/ch-dask-dataframe/shuffle.ipynb b/ch-dask-dataframe/shuffle.ipynb index 9456359..8ae1bdf 100644 --- a/ch-dask-dataframe/shuffle.ipynb +++ b/ch-dask-dataframe/shuffle.ipynb @@ -15,7 +15,7 @@ "\n", "为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 {numref}`fig-shuffle-tasks-p2p` 右侧所示,`p2p` 在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。\n", "\n", - "```{figure} ../img/ch-dask-dataframe/shuffle-tasks-p2p.png\n", + "```{figure} ../img/ch-dask-dataframe/shuffle-tasks-p2p.svg\n", "---\n", "width: 800px\n", "name: fig-shuffle-tasks-p2p\n", @@ -105,19 +105,19 @@ "source": [ "## 数据重分布\n", "\n", - "Dask 提供了三种数据重分布方法:`set_index`,`repartition` 和 `shuffle`,这三种都可能在全局层面对数据进行重分布。\n", + "Dask DataFrame 提供了三种数据重分布方法:`set_index()`,`repartition()` 和 `shuffle()`,这三种都可能在全局层面对数据进行重分布。\n", "\n", "```{table} Dask 三种数据重分布方法\n", ":name: tab-dask-repartition\n", "\n", "| 方法名 | 用途 | 是否修改索引 | 是否可以修改 Partition 数量 |\n", "|---\t|---\t|---\t|---\t|\n", - "| [`DataFrame.set_index`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.set_index.html) | 修改索引列,加速后续基于索引列的计算\t| 是 | 是\t|\n", - "| [`DataFrame.repartition`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.repartition.html) | 修改 Partition 数量,多用于数据倾斜场景 | 否\t| 是 |\n", - "| [`DataFrame.shuffle`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.shuffle.html) | 将相同的值归结到同一个 Partition | 否 | 是 |\n", + "| [`DataFrame.set_index()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.set_index.html) | 修改索引列,加速后续基于索引列的计算\t| 是 | 是\t|\n", + "| [`DataFrame.repartition()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.repartition.html) | 修改 Partition 数量,多用于数据倾斜场景 | 否\t| 是 |\n", + "| [`DataFrame.shuffle()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.shuffle.html) | 将相同的值归结到同一个 Partition | 否 | 是 |\n", "```\n", "\n", - "在 {numref}`sec-dask-dataframe-indexing` 我们提过,`set_index` 将某字段设置为索引列,后续一系列计算非常依赖这个字段,`set_index` 能显著加速后续计算。`repartition` 主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。" + "在 {numref}`sec-dask-dataframe-indexing` 我们提过,`set_index()` 将某字段设置为索引列,后续一系列计算非常依赖这个字段,`set_index()` 能显著加速后续计算。`repartition()` 主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。" ] }, { @@ -140,11 +140,11 @@ "\n", "* 分组:按照 `by` 指定的分组字段进行分组,相同的分组字段被分到一起,这里涉及到大量 Shuffle 操作。\n", "* 组内聚合:组内聚合的 Shuffle 操作相对比较少。\n", - "* 组间聚合:组间聚合的 Shuffle 操作相对比较小。\n", + "* 组间聚合:组间聚合的 Shuffle 操作相对比较少。\n", "\n", "根据 Shuffle 操作的数量,不难得出结论:\n", "\n", - "* `groupby(by=indexed_columns).agg()` 和 `groupby(by=indexed_columns).apply(user_def_fn)` 性能最好。`indexed_columns` 指的是分组字段 `by` 在索引列({numref}`sec-dask-dataframe-indexing` 中 `set_index` 的列);`agg` 指的是 Dask DataFrame 提供的官方的 `sum`,`mean` 等聚合方法。因为 `indexed_columns` 是排过序的了,可以很快地对 `indexed_columns` 进行分组和数据分发。\n", + "* `groupby(by=indexed_columns).agg()` 和 `groupby(by=indexed_columns).apply(user_def_fn)` 性能最好。`indexed_columns` 指的是分组字段 `by` 是索引列({numref}`sec-dask-dataframe-indexing` 中 `set_index` 的列);`agg` 指的是 Dask DataFrame 提供的官方的 `sum`,`mean` 等聚合方法。因为 `indexed_columns` 是排过序的了,可以很快地对 `indexed_columns` 进行分组和数据分发。\n", "* `groupby(by=non_indexed_columns).agg()` 的数据交换量要更大一些,Dask 官方提供的 `agg` 方法做过一些优化。\n", "* `groupby(by=non_indexed_columns).apply(user_def_fn)` 的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,用户自定义函数的效率比 Dask 官方的低。" ] diff --git a/ch-dask-ml/distributed-training.ipynb b/ch-dask-ml/distributed-training.ipynb index a8287d6..7c22127 100644 --- a/ch-dask-ml/distributed-training.ipynb +++ b/ch-dask-ml/distributed-training.ipynb @@ -438,7 +438,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "调用 `fit` 方法(与 scikit-learn 类似):" + "调用 `fit()` 方法(与 scikit-learn 类似):" ] }, { @@ -2118,9 +2118,9 @@ "\n", "XGBoost 和 LightGBM 是两种决策树模型的实现,他们本身就对分布式训练友好,且集成了 Dask 的分布式能力。下面以 XGBoost 为例,介绍 XGBoost 如何基于 Dask 实现分布式训练,LightGBM 与之类似。\n", "\n", - "在 XGBoost 中,训练一个模型既可以使用 `train` 方法,也可以使用 scikit-learn 式的 `fit()` 方法。这两种方式都支持 Dask 分布式训练。\n", + "在 XGBoost 中,训练一个模型既可以使用 `train()` 方法,也可以使用 scikit-learn 式的 `fit()` 方法。这两种方式都支持 Dask 分布式训练。\n", "\n", - "下面的代码对单机的 XGBoost 和 Dask 分布式训练两种方式进行了性能对比。如果使用 Dask,需要将 [`xgboost.DMatrix`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.DMatrix) 修改为 [`xgboost.dask.DaskDMatrix`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.dask.DaskDMatrix),`xgboost.dask.DaskDMatrix` 可以将分布式的 Dask Array 或 Dask DataFrame 转化为 XGBoost 所需要的数据格式;再将 [`xgboost.train`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.train) 修改为 [`xgboost.dask.train`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.dask.train);并传入 Dask 集群客户端 `client`。" + "下面的代码对单机的 XGBoost 和 Dask 分布式训练两种方式进行了性能对比。如果使用 Dask,用户需要将 [`xgboost.DMatrix`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.DMatrix) 修改为 [`xgboost.dask.DaskDMatrix`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.dask.DaskDMatrix),`xgboost.dask.DaskDMatrix` 可以将分布式的 Dask Array 或 Dask DataFrame 转化为 XGBoost 所需要的数据格式;用户还需要将 [`xgboost.train()`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.train) 修改为 [`xgboost.dask.train()`](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.dask.train);并传入 Dask 集群客户端 `client`。" ] }, { diff --git a/ch-dask-ml/hyperparameter.ipynb b/ch-dask-ml/hyperparameter.ipynb index 2ad541c..4ba4b87 100644 --- a/ch-dask-ml/hyperparameter.ipynb +++ b/ch-dask-ml/hyperparameter.ipynb @@ -12,6 +12,8 @@ "* 基于 scikit-learn 的 joblib 后端,将多个超参数调优任务分布到 Dask 集群\n", "* 使用 Dask-ML 提供的超参数调优 API\n", "\n", + "这两种方式都是针对训练数据量可放到单机内存中的场景。\n", + "\n", "## scikit-learn joblib\n", "\n", "单机的 scikit-learn 已经提供了丰富易用的模型训练和超参数调优接口,它默认使用 joblib 在单机多核之间并行。像随机搜索和网格搜索等超参数调优任务容易并行,任务之间没有依赖关系,很容易并行起来。\n", @@ -185,7 +187,7 @@ "\n", "Dask-ML 的超参数调优算法要求输入为 Dask DataFrame 或 Dask Array 等可被切分的数据,而非 pandas DataFrame,因此数据预处理部分需要改为 Dask。\n", "\n", - "值得注意的是,Dask-ML 提供的 `SuccessiveHalvingSearchCV` 和 `HyperbandSearchCV` 等算法要求模型必须支持 `partial_fit()` 和 `score()`。`partial_fit()` 是 scikit-learn 中迭代式算法(比如梯度下降法)的一次迭代过程。连续减半算法和 Hyperband 算法先分配一些算力额度,不是完成试验的所有迭代,而只做一定次数的迭代(对 `partial_fit()` 进行一定次数的调用),评估性能(在验证集上调用 `score()` 方法),淘汰性能较差的试验。" + "值得注意的是,Dask-ML 提供的 `SuccessiveHalvingSearchCV` 和 `HyperbandSearchCV` 等算法要求模型必须支持 `partial_fit()` 和 `score()`。`partial_fit()` 是 scikit-learn 中迭代式算法(比如梯度下降法)的一次迭代过程。连续减半算法和 Hyperband 算法先分配一些算力额度,不是完成试验的所有迭代,而只做一定次数的迭代(对 `partial_fit()` 调用有限次数),评估性能(在验证集上调用 `score()` 方法),淘汰性能较差的试验。" ] }, { @@ -775,7 +777,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "本书还会介绍 Ray 的超参数调优,相比 Dask,Ray 的兼容性和功能完善程度更好,读者可以根据自身需求选择适合自己的框架。" + "本书还会介绍 Ray 的超参数调优,相比 Dask,Ray 在超参数调优上的兼容性和功能完善程度更好,读者可以根据自身需求选择适合自己的框架。" ] } ], diff --git a/ch-dask/dask-distributed.ipynb b/ch-dask/dask-distributed.ipynb index 6ae93e0..dae00ae 100644 --- a/ch-dask/dask-distributed.ipynb +++ b/ch-dask/dask-distributed.ipynb @@ -490,7 +490,7 @@ "\n", "## Dask Nanny\n", "\n", - "Dask 在启动集群时,除了启动 Dask Scheduler 和 Dask Worker 外,还启动了一个叫做 Dask Nanny 的监控服务。Nanny 英文意为保姆,Dask Nanny 就是在执行保姆的作用,它监控着 Dask Worker 的 CPU 和内存使用情况,避免 Dask Worker 超出资源上限;Dask Worker 宕机时重启 Dask Worker。如果某个 Dask Worker 被 Dask Nanny 重启了,该 Dask Worker 上的计算任务被重新执行,其他 Dask Worker 一直等待该 Dask Worker 恢复到刚刚宕机的时间点;但这会给其他 Dask Worker 增加很多负担。" + "Dask 在启动集群时,除了启动 Dask Scheduler 和 Dask Worker 外,还启动了一个叫做 Dask Nanny 的监控服务。Nanny 英文意为保姆,Dask Nanny 就是在执行保姆的作用,它监控着 Dask Worker 的 CPU 和内存使用情况,避免 Dask Worker 超出资源上限;Dask Worker 宕机时,Dask Nanny 负责重启 Dask Worker。如果某个 Dask Worker 被 Dask Nanny 重启了,该 Dask Worker 上的计算任务被重新执行,其他 Dask Worker 保留当时状态和数据,并一直等待该 Dask Worker 恢复到刚刚宕机的时间点;这会给其他 Dask Worker 增加很多负担。如果 Dask Worker 频繁重启,可能需要考虑用 `rechunk()` 或 `repartition()` 调整 Partition 的数据大小。" ] }, { diff --git a/ch-dask/task-graph-partitioning.ipynb b/ch-dask/task-graph-partitioning.ipynb index 63b988e..e412e2a 100644 --- a/ch-dask/task-graph-partitioning.ipynb +++ b/ch-dask/task-graph-partitioning.ipynb @@ -86,7 +86,7 @@ "\n", "## 数据切分\n", "\n", - "Dask 将大数据切分成可被很多个小数据,Dask Array 将切分的小数据称为块(Chunk);Dask DataFrame 将切分的小数据称为分区(Partition)。虽然 Chunk 和 Partition 名词不同,但本质上都是数据的切分。\n", + "Dask 将大数据切分成很多个小数据,Dask Array 将切分的小数据称为块(Chunk);Dask DataFrame 将切分的小数据称为分区(Partition)。虽然 Chunk 和 Partition 名词不同,但本质上都是数据的切分。\n", "\n", "下面的例子模拟了这样的计算:一个 $10 \\times 10$ 矩阵切分为 4 个 $5 \\times 5$ 矩阵。" ] @@ -522,13 +522,13 @@ "\n", "数据块过大,则 Dask Worker 很容易内存耗尽(Out of Memory,OOM),因为所切分的数据块无法被单个 Dask Worker 所处理。Dask 遇到 OOM 时,会将部分数据卸载到(Spill)硬盘,如果 Spill 之后仍无法完成计算,Dask Worker 进程可能被重启,甚至反复重启。\n", "\n", - "### 迭代式算法\n", + "## 迭代式算法\n", "\n", "迭代式算法通常会使用循环,循环的当前迭代依赖之前迭代的数据。Dask 的 Task Graph 对于这类迭代式算法处理得并不好,每个数据依赖都会在 Task Graph 中增加有向边,进而会使得 Task Graph 非常庞大,导致执行效率很低。比如,很多机器学习算法、SQL JOIN 都是基于循环的迭代式算法,用户需要对这些操作有心理准备。\n", "\n", "## 设置正确的数据块大小\n", "\n", - "总之,在做数据块切分时,不应过大,也不应过小。Dask 没有一个简单通用的设置原则,需要开发者根据自身数据的情况和 Dask 的仪表盘或日志来不断调整。Dask Array 中使用 `rechunk(chunks=...)` 设置数据块大小,`chunks` 参数可以是 `int` 表示一个切分成多少个数据块,也可以是 `(5, 10, 20)` 这样的 `tuple`,表示单个数据块的维度大小。Dask DataFrame 中使用 `repartition(npartitions=...)` 设置数据块大小。 \n", + "总之,在做数据块切分时,不应过大,也不应过小。Dask 没有一个简单通用的设置原则,需要开发者根据自身数据的情况和 Dask 的仪表盘或日志来不断调整。\n", "\n", "### 仪表盘\n", "\n", @@ -603,7 +603,7 @@ "\n", "Dask Array 和 Dask DataFrame 都提供了设置数据块的方式。\n", "\n", - "可以在初始化时就设定每个数据块的大小,比如 `x = da.ones((10, 10), chunks=(5, 5))`,`chunks` 参数用来设置每个数据块大小。也可以在程序运行过程中调整,比如 Dask Array 的 [`rechunk()`](https://docs.dask.org/en/latest/generated/dask.array.rechunk.html) 和 Dask DataFrame 的 [`repartition()`](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html)。\n" + "可以在初始化时就设定每个数据块的大小,比如 `x = da.ones((10, 10), chunks=(5, 5))`,`chunks` 参数用来设置每个数据块大小。也可以在程序运行过程中调整,比如 Dask Array 的 [`rechunk()`](https://docs.dask.org/en/latest/generated/dask.array.rechunk.html) 和 Dask DataFrame 的 [`repartition()`](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html)。Dask Array 的 `rechunk(chunks=...)` 在程序运行过程中调整数据块大小,`chunks` 参数可以是 `int` 表示切分成多少个数据块,也可以是 `(5, 10, 20)` 这样的 `tuple`,表示单个矩阵的维度大小;Dask DataFrame 的 `repartition(npartitions=...)` 将数据切分成多少个 Partition。 \n" ] }, { diff --git a/ch-data-science/hyperparameter.md b/ch-data-science/hyperparameter.md index 9b9fb33..fca03d8 100644 --- a/ch-data-science/hyperparameter.md +++ b/ch-data-science/hyperparameter.md @@ -11,12 +11,6 @@ 确定这些超参数的方式是开启多个试验(Trial),每个试验测试超参数的某个值,根据模型训练结果的好坏来做选择,这个过程称为超参数调优。寻找最优超参数的过程这个过程可以手动进行,手动费时费力,效率低下,所以业界提出一些自动化的方法。常见的自动化的搜索方法有如下几种,{numref}`fig-tune-algorithms` 展示了在二维搜索空间中进行超参数搜索,每个点表示一种超参数组合,颜色越暖,表示性能越好。迭代式的算法从初始点开始,后续试验依赖之前试验的结果,最后向性能较好的方向收敛。 -* 网格搜索(Grid Search):网格搜索是一种穷举搜索方法,它通过遍历所有可能的超参数组合来寻找最优解,这些组合会逐一被用来训练和评估模型。网格搜索简单直观,但当超参数空间很大时,所需的计算成本会急剧增加。 -* 随机搜索(Random Search):随机搜索不是遍历所有可能的组合,而是在解空间中随机选择超参数组合进行评估。这种方法的效率通常高于网格搜索,因为它不需要评估所有可能的组合,而是通过随机抽样来探索参数空间。随机搜索尤其适用于超参数空间非常大或维度很高的情况下,它可以在较少的尝试中发现性能良好的超参数配置。然而,由于随机性的存在,随机搜索可能会错过一些局部最优解,因此可能需要更多的尝试次数来确保找到一个好的解。 -* 贝叶斯优化(Bayesian Optimization):贝叶斯优化是一种**迭代式**超参数搜索技术。它基于贝叶斯定理的技术,它利用概率模型来指导搜索最优超参数的过程。这种方法的核心思想是构建一个贝叶斯模型,通常是高斯过程(Gaussian Process),来近似评估目标函数的未知部分。贝叶斯优化能够在有限的评估次数内,智能地选择最有希望的超参数组合进行尝试,特别适用于计算成本高昂的场景。 - -超参数调优是一种黑盒优化,所谓黑盒优化,指的是目标函数是一个黑盒,我们只能通过观察其输入和输出来推断其行为。黑盒的概念比较难以理解,但是我们可以相比梯度下降算法,梯度下降算法**不是**一种黑盒优化算法,我们可以得到目标函数的梯度(或近似值),并用梯度来指导搜索方向,最终找到目标函数的(局部)最优解。黑盒优化算法一般无法找到目标函数的数学表达式和梯度,也无法使用基于梯度的优化技术。贝叶斯优化、遗传算法、模拟退火等都是黑盒优化,这些算法通常在超参数搜索空间中选择一些候选解,运行目标函数,得到超参数组合的实际性能,基于实际性能,不断迭代调整,即重复上述过程,直到满足条件。 - ```{figure} ../img/ch-data-science/tune-algorithms.svg --- width: 800px @@ -25,6 +19,12 @@ name: fig-tune-algorithms 在一个二维搜索空间中进行超参数搜索,每个点表示一种超参数组合,暖色表示性能较好,冷色表示性能较差。 ``` +* 网格搜索(Grid Search):网格搜索是一种穷举搜索方法,它通过遍历所有可能的超参数组合来寻找最优解,这些组合会逐一被用来训练和评估模型。网格搜索简单直观,但当超参数空间很大时,所需的计算成本会急剧增加。 +* 随机搜索(Random Search):随机搜索不是遍历所有可能的组合,而是在解空间中随机选择超参数组合进行评估。这种方法的效率通常高于网格搜索,因为它不需要评估所有可能的组合,而是通过随机抽样来探索参数空间。随机搜索尤其适用于超参数空间非常大或维度很高的情况,它可以在较少的尝试中发现性能良好的超参数配置。然而,由于随机性的存在,随机搜索可能会错过一些局部最优解,因此可能需要更多的尝试次数来确保找到一个好的解。 +* 贝叶斯优化(Bayesian Optimization):贝叶斯优化是一种**迭代式**超参数搜索技术。它基于贝叶斯定理的技术,它利用概率模型来指导搜索最优超参数的过程。这种方法的核心思想是构建一个贝叶斯模型,通常是高斯过程(Gaussian Process),来近似评估目标函数的未知部分。贝叶斯优化能够在有限的评估次数内,智能地选择最有希望的超参数组合进行尝试,特别适用于计算成本高昂的场景。 + +超参数调优是一种黑盒优化,所谓黑盒优化,指的是目标函数是一个黑盒,我们只能通过观察其输入和输出来推断其行为。黑盒的概念比较难以理解,但是我们可以相比梯度下降算法,梯度下降算法**不是**一种黑盒优化算法,我们可以得到目标函数的梯度(或近似值),并用梯度来指导搜索方向,最终找到目标函数的(局部)最优解。黑盒优化算法一般无法找到目标函数的数学表达式和梯度,也无法使用基于梯度的优化技术。贝叶斯优化、遗传算法、模拟退火等都是黑盒优化,这些算法通常在超参数搜索空间中选择一些候选解,运行目标函数,得到超参数组合的实际性能,基于实际性能,不断迭代调整,即重复上述过程,直到满足条件。 + ### 贝叶斯优化 贝叶斯优化基于贝叶斯定理,这里不深入探讨详细的数学公式。简单来说,它需要先掌握搜索空间中几个观测样本点(Observation)的实际性能,构建概率模型,描述每个超参数在每个取值点上模型性能指标的**均值**和**方差**。其中,均值代表这个点最终的期望效果,均值越大表示模型最终性能指标越大,方差表示这个点的不确定性,方差越大表示这个点不确定,值得去探索。{numref}`fig-bayesian-optimization-explained` 在一个 1 维超参数搜索空间中迭代 3 步的过程,虚线是目标函数的真实值,实线是预测值(或者叫后验概率分布均值),实线上下的蓝色区域为置信区间。贝叶斯优化利用了高斯回归过程,即目标函数是由一系列观测样本点所构成的随机过程,通过高斯概率模型来描述这个随机过程的概率分布。贝叶斯优化通过不断地收集观测样本点来更新目标函数的后验分布,直到后验分布基本贴合真实分布。对应 {numref}`fig-bayesian-optimization-explained` 中,进行迭代 3 之前只有两个观测样本点,经过迭代 3 和迭代 4 之后中增加了新的观测样本点,这几个样本点附近的预测值逐渐接近真实值。 diff --git a/ch-ray-cluster/ray-cluster.md b/ch-ray-cluster/ray-cluster.md index 5e20eb3..016b3bc 100644 --- a/ch-ray-cluster/ray-cluster.md +++ b/ch-ray-cluster/ray-cluster.md @@ -3,14 +3,14 @@ ## Ray 集群 -如 {numref}`fig-ray-cluster` 所示,Ray 集群由一系列计算节点组成,其中两类关键的节点:头节点(Head node)和工作节点(Worker node)。这些节点可以部署在虚拟机、容器或者是裸金属服务器上。 +如 {numref}`fig-ray-cluster` 所示,Ray 集群由一系列计算节点组成,其中两类关键的节点:头节点(Head)和工作节点(Worker)。这些节点可以部署在虚拟机、容器或者是裸金属服务器上。 ```{figure} ../img/ch-ray-cluster/ray-cluster.svg --- width: 800px name: fig-ray-cluster --- -Ray 集群由头节点和多个工作节点组成,头节点上运行着一些管理类的进程。 +Ray 集群由头节点和多个工作节点组成,头节点上运行着一些管理进程。 ``` 所有节点上都运行着一些进程: @@ -43,7 +43,7 @@ Ray 的头节点还运行着其他一些管理类的服务,比如计算资源 ray start --head --port=6379 ``` -它会在该物理节点启动一个头节点进程,默认端口号是 6379,也可以用 `--port` 来指定端口号。执行完上述命令后,命令行会有一些提示,包括当前节点的地址,如何关停,如何启动其他工作节点: +它会在该物理节点启动一个头节点进程,默认端口号是 6379,也可以用 `--port` 来指定端口号。执行完上述命令后,命令行会有一些提示,包括当前节点的地址,如何关停。启动工作节点: ```bash ray start --address=: diff --git a/ch-ray-cluster/ray-job.md b/ch-ray-cluster/ray-job.md index 9357933..ab022a0 100644 --- a/ch-ray-cluster/ray-job.md +++ b/ch-ray-cluster/ray-job.md @@ -1,7 +1,7 @@ (sec-ray-job)= # Ray 作业 -部署好一个 Ray 集群后,我们就可以向集群上提交作业了。Ray 作业指的是用户编写的,基于 Task、Actor 或者 Ray 各类生态(Ray Train、Ray Tune、Ray Serve、RLlib 等)的具体的计算任务。Ray 集群提供了多租户的服务,可以同时运行多个用户多种不同类型的计算任务。由于 Ray 集群提供多租户服务的特点,不同的 Ray 作业的源代码、配置文件和软件包环境不一样,因此,在提交作业时除了需要指定当前作业的 `__main__` 函数的入口外,还需要: +部署好一个 Ray 集群后,我们就可以向集群上提交作业了。Ray 作业指的是用户编写的,基于 Task、Actor 或者 Ray 各类生态(Ray Train、Ray Tune、Ray Serve、RLlib 等)的具体的计算任务。Ray 集群正在尝试提供多租户的服务,可以同时运行多个用户多种不同类型的计算任务。由于 Ray 集群提供多租户服务的特点,不同的 Ray 作业的源代码、配置文件和软件包环境不一样,因此,在提交作业时除了需要指定当前作业的 `__main__` 函数的入口外,还需要: * 工作目录:这个作业所需要的 Python 源代码和配置文件 * 软件环境:这个作业所依赖的 Python 软件包和环境变量 @@ -110,12 +110,12 @@ ray.get(gpu_task.remote()) 调用 Actor 和 Task 之前,Ray 分配了一个 GPU 给程序的入口。调用 Actor 和 Task 之后,又分别给 `gpu_actor` 和 `gpu_task` 分配了 1 个 GPU。 :::{note} -将提交作业到一个已有的 Ray 集群上,`ray.init()` 中不能设置 `num_cpus` 和 `num_gpus` 参数。 +提交作业到一个已有的 Ray 集群上时,`ray.init()` 中不能设置 `num_cpus` 和 `num_gpus` 参数。 ::: ### 依赖管理 -Ray 集群是多租户的,上面可能运行着不同的用户的作业,不同作业对 Python 各个依赖的版本要求不同,Ray 提供了运行时环境的功能,比如在启动这个作业时,设置 `--runtime-env-json`,他是一个 JSON,包括:需要 `pip` 安装的 Python 包,或环境变量(`env_vars`),或工作目录(`working_dir`)。Ray 集群的运行时环境大概原理是为每个作业创建一个独立的虚拟环境([virtualenv](https://virtualenv.pypa.io/))。 +Ray 集群是多租户的,上面可能运行着不同用户的作业,不同作业对 Python 各个依赖的版本要求不同,Ray 提供了运行时环境的功能,比如在启动这个作业时,设置 `--runtime-env-json`,他是一个 JSON,包括:需要 `pip` 安装的 Python 包,或环境变量(`env_vars`),或工作目录(`working_dir`)。Ray 集群的运行时环境大概原理是为每个作业创建一个独立的虚拟环境([virtualenv](https://virtualenv.pypa.io/))。 ```json { diff --git a/ch-ray-cluster/ray-resource.md b/ch-ray-cluster/ray-resource.md index 9401359..bcd8742 100644 --- a/ch-ray-cluster/ray-resource.md +++ b/ch-ray-cluster/ray-resource.md @@ -3,10 +3,10 @@ ## 计算资源 -Ray 可以管理计算资源,包括 CPU、内存和 GPU 等各类加速器。这里的计算资源是逻辑上的,逻辑资源与物理上的计算资源相对应。Ray 集群的各个节点启动时会探测物理计算资源,并根据一定规则映射为逻辑上的计算资源: +Ray 可以管理计算资源,包括 CPU、内存和 GPU 等各类加速器。这里的计算资源是逻辑上的,逻辑资源与物理上的计算资源相对应。Ray 集群的各个节点启动时会探测物理计算资源,并根据一定规则映射为逻辑上的计算资源。Ray 集群的各个节点可以是虚拟机、容器或者裸金属服务器。 -* CPU:每个节点(容器)中的物理 CPU 个数(`num_cpus`) -* GPU:每个节点(容器)中的物理 GPU 个数(`num_gpus`) +* CPU:每个节点中的物理 CPU 个数(`num_cpus`) +* GPU:每个节点中的物理 GPU 个数(`num_gpus`) * 内存:每个节点可用内存的 70%(`memory`) 以上为默认的规则。也可以在启动 Ray 集群时,手动指定这些资源。比如某台物理节点上有 64 个 CPU 核心,8 个 GPU,启动 Ray 工作节点时只注册一部分计算资源。 @@ -37,7 +37,7 @@ func.options(num_cpus=4).remote() ## 其他资源 -除了通用的 CPU、GPU 外,Ray 也支持很多其他各类计算资源,比如各类加速器。可以使用 `--resources={"special_hardware": 1}` 这样的键值对来管理这些计算资源。使用方式与 `num_gpus` 管理 GPU 资源相似。比如 Google 的 TPU (比如:`resources={"TPU": 2}`)和华为的昇腾(`resources={"NPU": 2}`)。或者某集群得 CPU 既有 x86 架构,也有 ARM 架构,也可以这样定义:`resources={"arm64": 1}`。 +除了通用的 CPU、GPU 外,Ray 也支持很多其他类型计算资源,比如各类加速器。可以使用 `--resources={"special_hardware": 1}` 这样的键值对来管理这些计算资源。使用方式与 `num_gpus` 管理 GPU 资源相似。比如 Google 的 TPU:`resources={"TPU": 2}`和华为的昇腾:`resources={"NPU": 2}`。某集群 CPU 既有 x86 架构,也有 ARM 架构,对于 ARM 的节点可以这样定义:`resources={"arm64": 1}`。 ## 自动缩放 @@ -46,19 +46,19 @@ Ray 集群可以自动缩放,主要面向以下场景: * 当 Ray 集群的资源不够时,创建新的工作节点。 * 当某个工作节点闲置或者无法启动,将该工作节点关闭。 -自动缩放主要满足 Task 或 Actor 中对计算资源需求,而不是根据计算节点的资源利用情况自动缩放。 +自动缩放主要满足 Task 或 Actor 代码中定义的计算资源请求(比如,`task.options()` 请求的计算资源),而不是根据计算节点的资源实际利用情况自动缩放。 ## Placement Group -基于计算资源和集群,Ray 提供了 Placement Group,中文可以理解成资源组。Placement Groups 允许用户**原子地**使用集群上多个节点的计算资源,所谓原子地(Atomically),是指这些资源或者都分配给该用户,或者完全不分配,不会出现只分配一部分的情况。 +基于计算资源和集群,Ray 提供了 Placement Group,中文可以理解成资源组。Placement Group 允许用户**原子地**使用集群上多个节点的计算资源,所谓原子地(Atomically),是指这些资源或者都分配给该用户,或者完全不分配,不会出现只分配一部分的情况。 -Placement Groups 主要针对的场景案例有: +Placement Group 主要针对的场景案例有: * 一个作业需要一组资源,这些资源需要协同工作以完成任务,给这个作业分配一部分资源,无法完成任务。这种场景在集群调度中又被称为组调度(Gang Scheduling)。比如,大规模分布式训练中需要多台计算节点和多块 GPU,需要在 Ray 集群中申请并分配这些资源。 -* 作业需要在多个节点上负载均衡,每个节点承担一小部分任务。Placement Groups 使得这个作业尽量分摊到多个计算节点上。比如,在一个分布式推理场景,一个作业需要 8 块 GPU,每个 GPU 加载模型,独立地进行推理。为了负载均衡,应该将作业调度到 8 个计算节点上,每个节点占用 1 块 GPU;而不是将这个作业调度到 1 个计算节点的 8 块 GPU 上。因为都调度到 1 个计算节点,节点故障后,整个推理服务不可用。 +* 作业需要在多个节点上负载均衡,每个节点承担一小部分任务。Placement Group 使得这个作业尽量分摊到多个计算节点上。比如,在一个分布式推理场景,一个作业需要 8 块 GPU,每个 GPU 加载模型,独立地进行推理。为了负载均衡,应该将作业调度到 8 个计算节点上,每个节点占用 1 块 GPU;而不是将这个作业调度到 1 个计算节点的 8 块 GPU 上。因为都调度到 1 个计算节点,节点故障后,整个推理服务不可用。 -Placement Groups 有几个关键概念: +Placement Group 有几个关键概念: * 资源包(Bundle):Bundle 一个键值对,用来定义所需的计算资源,比如 `{"CPU": 2}`,或 `{"CPU": 8, "GPU": 4}`。一个 Bundle 必须可以调度到单个计算节点;比如,一个计算节点只有 8 块 GPU,`{"GPU": 10}` 这样的 Bundle 是不合理的。 * 资源组(Placement Group):Placement Group 是一组 Bundle。比如,`{"CPU": 8} * 4` 会向 Ray 集群申请 4 个 Bundle,每个 Bundle 预留 8 个 CPU。多个 Bundle 的调度会遵循一些调度策略。Placement Group 被 Ray 集群创建后,可被用来运行 Ray Task 和 Ray Actor。 @@ -108,7 +108,7 @@ remove_placement_group(pg) 创建 Placement Group 的 `placement_group()` 方法还接收 `strategy` 参数,用来设定不同的调度策略:或者是让这些预留资源尽量集中到少数计算节点上,或者是让这些预留资源尽量分散到多个计算节点。共有如下策略: * `STRICT_PACK`:所有 Bundle 都必须调度到单个计算节点。 -* `PACK`:所有 Bundle 优先调度到单个计算节点,如果无法满足条件,再调度到其他计算节点,如 {numref}`fig-ray-pg-pack` 所示。如果不做设置,是默认的调度策略。 +* `PACK`:所有 Bundle 优先调度到单个计算节点,如果无法满足条件,再调度到其他计算节点,如 {numref}`fig-ray-pg-pack` 所示。`PACK` 是默认的调度策略。 * `STRICT_SPREAD`:每个 Bundle 必须调度到不同的计算节点。 * `SPREAD`:每个 Bundle 优先调度到不同的计算节点,如果无法满足条件,有些 Bundle 可以共用一个计算节点,如 {numref}`fig-ray-pg-spread` 所示。 @@ -120,7 +120,7 @@ name: fig-ray-pg-pack `PACK` 策略优先将所有 Bundle 调度到单个计算节点。 ``` -由于计算尽量调度到了少数计算节点,`STRICT_PACK` 和 `PACK` 的调度策略保证了数据的局部性(Data Locality),计算任务可以快速访问本地的数据。 +由于计算尽量调度到了少数计算节点,`STRICT_PACK` 和 `PACK` 的调度策略保证了数据的局部性(Locality),计算任务可以快速访问本地的数据。 ```{figure} ../img/ch-ray-cluster/pg-spread.svg --- diff --git a/ch-ray-core/ray-intro.md b/ch-ray-core/ray-intro.md index c6dcb08..a79a78b 100644 --- a/ch-ray-core/ray-intro.md +++ b/ch-ray-core/ray-intro.md @@ -1,7 +1,7 @@ # Ray 简介 Ray 是一个可扩展的计算框架。它最初为强化学习设计,之后逐渐演变成一个面向数据科学和人工智能的框架。 -如 {numref}`fig-ray-ecosystem` 所示,当前 Ray 主要由底层的 Ray Core 和上层的各类 Ray AIR (Artificial Intelligence Runtime) 生态组成:Ray Core 是一系列底层 API, 可以将 Python 函数或者 Python 类等计算任务横向扩展到多个计算节点上;在 Ray Core 之上,Ray 封装了一些面向数据科学和人工智能的库(Ray AIR),可以进行数据的处理(Ray Data)、模型训练(Ray Train)、模型的超参数调优(Ray Tune),模型推理服务(Ray Serve),强化学习(RLib)等。 +如 {numref}`fig-ray-ecosystem` 所示,当前 Ray 主要由底层的 Ray Core 和上层的各类 Ray AI (Artificial Intelligence) 生态组成:Ray Core 是一系列底层 API, 可以将 Python 函数或者 Python 类等计算任务横向扩展到多个计算节点上;在 Ray Core 之上,Ray 封装了一些面向数据科学和人工智能的库(Ray AI Libraries),可以进行数据的处理(Ray Data)、模型训练(Ray Train)、模型的超参数调优(Ray Tune),模型推理服务(Ray Serve),强化学习(RLib)等。 ```{figure} ../img/ch-ray-core/ray.svg --- diff --git a/ch-ray-core/remote-class.ipynb b/ch-ray-core/remote-class.ipynb index ab318bb..9d0f596 100644 --- a/ch-ray-core/remote-class.ipynb +++ b/ch-ray-core/remote-class.ipynb @@ -212,7 +212,7 @@ "origin_pos": 8 }, "source": [ - "我们可以用同一个类创建不同的 Actor 实例,不同 Actor 之间的成员函数调用可以被并行化执行,但同一个 Actor 的成员函数调用是顺序执行的。" + "我们可以用同一个类创建不同的 Actor 实例,不同 Actor 实例的成员函数调用可以被并行化执行,但同一个 Actor 的成员函数调用是顺序执行的。" ] }, { @@ -257,7 +257,7 @@ "origin_pos": 10 }, "source": [ - "同一个 Actor 实例是互相共享状态的,所谓共享状态是指,Actor 可能被分布式地调度,无论调度到哪个计算节点,对 Actor 实例的任何操作都像对单机 Python 类和实例的操作一样,对象实例的成员变量的数据是可被访问、修改以及实时更新的。\n" + "同一个 Actor 实例是互相共享状态的,所谓共享状态是指,Actor 可能被分布式地调度,无论调度到哪个计算节点,对 Actor 实例的任何操作都像对单机 Python 类和实例的操作一样,对象实例的成员变量是可被访问、修改以及实时更新的。" ] }, { @@ -301,7 +301,7 @@ "source": [ "## Actor 编程模型\n", "\n", - "Actor 编程模型是一种分布式编程的范式,每门编程语言或框架有自己的实现。Actor 编程模型的基本要素是 Actor 实例,即每个 Actor 对象都是唯一的。我们可以把单个 Actor 实例理解成单个带地址信息的进程。每个 Actor 都拥有地址信息,我们就可以从别的 Actor 向这个 Actor 发送信息,就像我们通过手机号或电子邮件地址互相发送信息一样。一个 Actor 可以有一个地址,也可以有多个地址,多个 Actor 可以共享同一个地址,拥有多少个地址主要取决于我们想以怎样的方式收发数据。多个 Actor 共享同一个地址,就像公司里有一个群组邮箱,群组包含了多个人,有个对外的公共的地址,向这个群组发邮件,群组中的每个人都可以收到消息。\n", + "Actor 编程模型是一种分布式编程的范式,每门编程语言或框架有自己的实现。Actor 编程模型的基本要素是 Actor 实例,即每个 Actor 对象都是唯一的。我们可以把单个 Actor 实例理解成单个带地址信息的进程。每个 Actor 都拥有地址信息,我们就可以从别的 Actor 向这个 Actor 发送信息,就像我们通过手机号或电子邮件地址互相发送信息一样。一个 Actor 可以有一个地址,也可以有多个地址,多个 Actor 可以共享同一个地址,拥有多少个地址主要取决于我们想以怎样的方式收发数据。多个 Actor 共享同一个地址,就像公司里有一个群组邮箱,群组包含了多个人,有个对外的公共地址,向这个群组发邮件,群组中的每个人都可以收到消息。\n", "\n", "拥有地址和内存空间,Actor 可以做以下事情:\n", "\n", @@ -312,7 +312,7 @@ "\n", "Actor 存储的状态数据只能由 Actor 自己来管理,不能被其他 Actor 修改。这有点像面向对象编程语言中类的实例,如果想修改实例的数据,一般通过实例的成员函数。如果我们想修改 Actor 里面存储的状态数据,应该向 Actor 发送消息,Actor 接收到消息,并基于自己存储的数据,做出决策:决定修改状态数据,或者再向其他 Actor 发送消息。比如,刚才的计数器案例中,Actor 收到 `increment()` 的消息,并根据自己存储的状态,做自增操作。\n", "\n", - "为了保证 Actor 编程模型分布式环境下状态的一致性,对同一个 Actor 多次发送同样请求,多次请求是顺序执行的。就像计数器案例中,对同一个 Actor 进行5次 `increment()` 操作,这5次操作是顺序执行的。\n", + "为了保证 Actor 编程模型分布式环境下状态的一致性,对同一个 Actor 多次发送同样请求,多次请求是顺序执行的。就像计数器案例中,对同一个 Actor 进行 5 次 `increment()` 操作,这 5 次操作是顺序执行的。\n", "\n", "Actor 编程模型是消息驱动的,给某个 Actor 发送消息,它就会对该消息进行响应,修改自身的状态或者继续给其他 Actor 发送消息。Actor 编程模型不需要显式地在多个进程之间同步数据,因此也没有锁的问题以及同步等待的时间。Actor 编程模型可被用于大量异步操作的场景。\n", "\n", @@ -420,7 +420,7 @@ "source": [ "这里的 `ranking` 是一个 Actor 的引用(Actor Handle),有点像 `ObjectRef`,我们用 `ranking` 这个 Actor Handle 来管理这个 Actor。一旦 Actor Handle 被销毁,对应的 Actor 以及其状态也被销毁。\n", "\n", - "我们可以创建多个 Actor 实例,每个实例管理自己的状态。还可以用 [`ActorClass.options`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html) 给这些 Actor 实例设置一些选项,起名字,设置 CPU、GPU 计算资源等。" + "我们可以创建多个 Actor 实例,每个实例管理自己的状态。还可以用 [`ActorClass.options()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html) 给这些 Actor 实例设置一些选项,起名字,设置 CPU、GPU 计算资源等。" ] }, { @@ -646,9 +646,7 @@ "origin_pos": 25 }, "source": [ - "如果我们想调用加入到 `ActorPool` 中的 Actor,可以使用 [`map(fn, values)`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.util.ActorPool.map.html) 和 [`submit(fn, value)`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.util.ActorPool.submit.html) 方法。这两个方法非常相似,所接收的参数是一个函数 `fn` 和参数 `value` 或者参数列表 `values`。`map()` 的 `values` 是一个列表,让函数并行地分发给多个 Actor 去处理;`submit()` 的 `value` 是单个值,每次从 `ActorPool` 中选择一个 Actor 去执行。`fn` 是一个 Lambda 表达式,或者说是一个匿名函数。这个 Lambda 表达式有两个参数:`actor` 和 `value`,`actor` 就是我们定义的单个 Actor 的函数调用,`value` 是这个函数的参数。\n", - "\n", - "函数的第一个参数是 `ActorPool` 中的 Actor,第二个参数是函数的参数。" + "如果我们想调用 `ActorPool` 中的 Actor,可以使用 [`map(fn, values)`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.util.ActorPool.map.html) 和 [`submit(fn, value)`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.util.ActorPool.submit.html) 方法。这两个方法非常相似,所接收的参数是一个函数 `fn` 和参数 `value` 或者参数列表 `values`。`map()` 的 `values` 是一个列表,让函数并行地分发给多个 Actor 去处理;`submit()` 的 `value` 是单个值,每次从 `ActorPool` 中选择一个 Actor 去执行。`fn` 是一个 Lambda 表达式,或者说是一个匿名函数。这个 Lambda 表达式有两个参数:`actor` 和 `value`,`actor` 就是我们定义的单个 Actor 的函数调用,`value` 是这个函数的参数。匿名函数 `fn` 的第一个参数是 `ActorPool` 中的 Actor,第二个参数是函数的参数。" ] }, { diff --git a/ch-ray-core/remote-function.ipynb b/ch-ray-core/remote-function.ipynb index 36704da..a996ffb 100644 --- a/ch-ray-core/remote-function.ipynb +++ b/ch-ray-core/remote-function.ipynb @@ -8,7 +8,7 @@ "(sec-remote-function)=\n", "# 分布式函数\n", "\n", - "被 Ray 加速的函数可以被运行在远程的 Ray 集群上,被称为远程函数(Remote Function)又被称为任务(Task)。Remote Function 是无状态的,无状态的意思是的执行只依赖函数的输入和输出,不依赖其他第三方的中间变量。\n", + "被 Ray 加速的函数可以被运行在远程的 Ray 集群上,被称为远程函数(Remote Function)又被称为任务(Task)。Remote Function 是无状态的,无状态的意思是的执行只依赖函数的输入和输出,不依赖函数作用域之外的中间变量。\n", "\n", "接下来将以三个案例来演示如何将 Python 函数横向扩展到 Ray 集群上:\n", "\n", @@ -72,7 +72,7 @@ "source": [ "## 启动 Ray 集群\n", "\n", - "在正式使用 Ray 的分布式功能之前,首先要启动一个 Ray 集群。启动 Ray 集群的方式有很多种,我们可以使用 [`ray.init()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html) 函数,先启动一个单节点的 Ray 集群,以便后续的演示。这个单节点的 Ray 集群运行在执行这个 Python 任务的电脑上。" + "在正式使用 Ray 的分布式功能之前,首先要启动一个 Ray 集群。启动 Ray 集群的方式有很多种,我们可以使用 [`ray.init()`](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html) 函数,先启动一个单节点的 Ray 集群,以便后续的演示。这个单节点的 Ray 集群运行在执行这个 Python 任务的计算机上。" ] }, { @@ -320,7 +320,7 @@ "id": "c4f72f6c", "metadata": {}, "source": [ - "这个例子中,计算复杂度不够大,你可以试着将 `SEQUENCE_SIZE` 改为更大的值,对比一下性能。\n", + "这个例子中,计算复杂度不够大,且计算本身不易被并行,你可以试着将 `SEQUENCE_SIZE` 改为更大的值,对比一下性能。\n", "\n", "## 原生 Python 函数与 Ray 的区别\n", "\n", @@ -336,11 +336,11 @@ "\n", "* 执行方式\n", "\n", - "原生 Python 函数 `func_name()` 的调用是同步执行的,或者说等待结果返回才进行后续计算,又或者说这个调用是阻塞的。一个 Ray 函数`func_name.remote()` 是异步执行的,或者说调用者不需要等待这个函数的计算真正执行完, Ray 就立即返回了一个 `ray.ObjectRef`,函数的计算是在后台某个计算节点上执行的。`ray.get(ObjectRef)` 会等待后台计算结果执行完,将结果返回给调用者。`ray.get(ObjectRef)` 是一个一个阻塞调用。\n", + "原生 Python 函数 `func_name()` 的调用是同步执行的,或者说等待结果返回才进行后续计算,又或者说这个调用是阻塞的。一个 Ray 函数`func_name.remote()` 是异步执行的,或者说调用者不需要等待这个函数的计算真正执行完, Ray 就立即返回了一个 `ray.ObjectRef`,函数的计算是在后台某个计算节点上执行的。`ray.get(ObjectRef)` 会等待后台计算结果执行完,将结果返回给调用者。`ray.get(ObjectRef)` 是一种阻塞调用。\n", "\n", "### 案例:蒙特卡洛估计 $\\pi$\n", "\n", - "接下来我们使用蒙特卡洛方法来估计 $\\pi$。如 {ref}`fig-square-circle`: 我们在一个 $2 \\times 2$ 的正方形中随机撒点,正方形内有一个半径为1的圆。所撒的点以一定概率落在圆内,假定我们已知落在圆内的概率是 $\\frac{\\pi}{4}$,我们可以根据随机撒点的概率情况推算出 $\\pi$ 的值。根据概率论相关知识,撒的点越多,概率越接近真实值。\n", + "接下来我们使用蒙特卡洛方法来估计 $\\pi$。如 {numref}`fig-square-circle`: 我们在一个 $2 \\times 2$ 的正方形中随机撒点,正方形内有一个半径为 1 的圆。所撒的点以一定概率落在圆内,假定我们已知落在圆内的概率是 $\\frac{\\pi}{4}$,我们可以根据随机撒点的概率情况推算出 $\\pi$ 的值。根据概率论相关知识,撒的点越多,概率越接近真实值。\n", "\n", "```{figure} ../img/ch-ray-core/square-circle.svg\n", "---\n", @@ -748,7 +748,7 @@ "id": "e61af064", "metadata": {}, "source": [ - "三个案例运行结束我们比较一下 Ray 的分布式执行效率。这三个任务都有各自特色,其中斐波那契数列的计算相对简单,主要使用 Python 提供的原生功能(列表、加法);模拟 $\\pi$ 任务调用了 `random` 和 `math` 两个 Python 标准库;图片处理任务调用了更加复杂的第三方库 `torch` ,`torchvision` 和 `pillow`,其中 `pillow` 和 `torch` 的底层实现都更加复杂。" + "三个案例运行结束我们比较一下 Ray 的分布式执行效率。这三个任务都有各自特色,其中斐波那契数列的计算相对简单,计算模式不易被并行,实现时主要使用 Python 提供的原生功能(列表、加法);模拟 $\\pi$ 任务调用了 `random` 和 `math` 两个 Python 标准库;图片处理任务调用了更加复杂的第三方库 `torch` ,`torchvision` 和 `pillow`,其中 `torch` 和 `pillow` 都是非常庞大的 Python 库。" ] }, { diff --git a/ch-ray-core/remote-object.ipynb b/ch-ray-core/remote-object.ipynb index 782201c..ca5cc67 100644 --- a/ch-ray-core/remote-object.ipynb +++ b/ch-ray-core/remote-object.ipynb @@ -108,11 +108,11 @@ "origin_pos": 2 }, "source": [ - "如 {numref}`fig-put-get-object-store` 所示,操作 Remote Object 主要有 `ray.put()` 和 `ray.get()` 两个 API:`ray.put()` 与 `ray.get()` 。\n", + "如 {numref}`fig-put-get-object-store` 所示,操作 Remote Object 主要有 `ray.put()` 和 `ray.get()` 两个 API。\n", "\n", "* `ray.put()` 把某个计算节点中的对象数据进行序列化,并将其写入到 Ray 集群的分布式对象存储中,返回一个 `RefObjectID`,`RefObjectID` 是指向这个 Remote Object 的指针。我们可以通过引用这个 `RefObjectID`,在 Remote Function 或 Remote Class 中分布式地使用这个数据对象。\n", "\n", - "* `ray.get()` 使用 `RefObjectID` 从把数据从分布式对象存储中拉取回来,并进行反序列化。\n", + "* `ray.get()` 使用 `RefObjectID` 把数据从分布式对象存储中拉取回来,并进行反序列化。\n", "\n", "```{figure} ../img/ch-ray-core/put-get-object-store.svg\n", "---\n", @@ -288,7 +288,7 @@ "source": [ "## 案例:对数据进行转换\n", "\n", - "Remote Ojbect 中的数据是不可修改的(Immutable),即无法对变量原地更改。下面的代码中,在单机上,我们可以对变量 `a` 进行赋值,但这些原地更改 Remote Object 的值。" + "Remote Ojbect 中的数据是不可修改的(Immutable),即无法对变量原地更改。下面的代码中,在单机上,我们可以对变量 `a` 进行赋值;但在 Ray 中,我们无法原地更改 Remote Object 的值。" ] }, { @@ -392,7 +392,7 @@ "\n", "### 直接传递\n", "\n", - "直接在 Task 或者 Actor 的函数调用时将 `RefObjectID` 作为参数传递进去。在下面这个例子中,`x_obj_ref` 是一个 `RefObjectID` ,`echo()` 这个 Remote Function 将自动从 `x_obj_ref` 获取 `x` 的值。这个自动获取值的过程被称为自动反引用(De-referenced)。" + "直接传递指在 Task 或者 Actor 的函数调用时将 `RefObjectID` 作为参数传递进去。在下面这个例子中,`x_obj_ref` 是一个 `RefObjectID` ,`echo()` 这个 Remote Function 将自动从 `x_obj_ref` 获取 `x` 的值。这个自动获取值的过程被称为自动反引用(De-referenced)。" ] }, { diff --git a/ch-ray-data/data-load-inspect-save.ipynb b/ch-ray-data/data-load-inspect-save.ipynb index 3e2018d..774e3b2 100644 --- a/ch-ray-data/data-load-inspect-save.ipynb +++ b/ch-ray-data/data-load-inspect-save.ipynb @@ -101,7 +101,7 @@ "source": [ "## 加载数据\n", "\n", - "Ray Data 提供了很多预置的数据加载方法,包括读取文件、读取 pandas DataFrame 这种内存数据、读取数据库中的数据。这里我们以一个纽约出租车司机的例子来演示读取 Parquet 文件。\n", + "Ray Data 提供了很多预置的数据加载方法,包括读取文件、读取 pandas DataFrame 这种内存数据、读取数据库中的数据。这里我们以纽约出租车司机的案例来演示读取 Parquet 文件。\n", "\n", "首先下载该数据,并使用 `ray.data` 提供的 [`read_parquet()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_parquet.html) 方法读取数据,得到一个 `Dataset`。。" ] @@ -330,9 +330,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "加了 `columns` 限制后,只有我们关心的列被读取,其他列不会被读取,又被称为列裁剪(Column Pruning)。Column Pruning 会减少数据读取开销,是数据工程领域经常使用的优化技巧之一。\n", - "\n", - "除了列裁剪,Ray Data 也支持行裁剪(Row Pruning),即满足特定条件的行被读取,比如 `tip_amount` 大于 6.0 的行被过滤出来:" + "加了 `columns` 限制后,只有我们关心的列被读取,其他列不会被读取,即列裁剪。除了列裁剪,Ray Data 也支持行裁剪,即满足特定条件的行被读取,比如 `tip_amount` 大于 6.0 的行被过滤出来:" ] }, { diff --git a/ch-ray-data/ray-data-intro.md b/ch-ray-data/ray-data-intro.md index cc15342..1018c72 100644 --- a/ch-ray-data/ray-data-intro.md +++ b/ch-ray-data/ray-data-intro.md @@ -1,7 +1,7 @@ (sec-ray-data-intro)= # Ray Data 简介 -Ray Data 是基于 Ray Core 的数据处理框架,主要解决机器学习模型训练或推理相关的数据准备与处理问题,即数据的最后一公里问题(Last-mile Preprocessing)。与 Dask DataFrame、Modin、Xorbits 相比,Ray Data 更通用,既可以处理二维表,也可以处理图片、视频;Ray Data 的通用也意味着它在很多方面还不够专业,比如 `groupby` 等操作相对比较粗糙。 +Ray Data 是基于 Ray Core 的数据处理框架,主要解决机器学习模型训练或推理相关的数据准备与处理问题,即数据的最后一公里问题(Last-mile Preprocessing)。与 Dask DataFrame、Modin、Xorbits 相比,Ray Data 更通用,既可以处理二维表,也可以处理图片、视频;Ray Data 的通用也意味着它在很多方面还不够专业,比如 `groupby` 等操作相对比较粗糙。除了 Ray Data 外,本章还会介绍 Modin。 Ray Data 对数据提供了一个抽象的类,[`ray.data.Dataset`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html),在 `Dataset` 上提供了常见的大数据处理的原语,覆盖了数据处理的大部分阶段,例如: diff --git a/drawio/ch-dask-dataframe/dataframe-model.drawio b/drawio/ch-dask-dataframe/dataframe-model.drawio index 63fbeed..fb62a54 100644 --- a/drawio/ch-dask-dataframe/dataframe-model.drawio +++ b/drawio/ch-dask-dataframe/dataframe-model.drawio @@ -1,19 +1,19 @@ - + - + - + - + - + - + diff --git a/drawio/ch-dask-dataframe/shuffle-tasks-p2p.drawio b/drawio/ch-dask-dataframe/shuffle-tasks-p2p.drawio new file mode 100644 index 0000000..2ee4ec3 --- /dev/null +++ b/drawio/ch-dask-dataframe/shuffle-tasks-p2p.drawio @@ -0,0 +1,271 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/drawio/ch-ray-cluster/pg-pack.drawio b/drawio/ch-ray-cluster/pg-pack.drawio index 7512538..f017b8e 100644 --- a/drawio/ch-ray-cluster/pg-pack.drawio +++ b/drawio/ch-ray-cluster/pg-pack.drawio @@ -1,2287 +1,2287 @@ - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - + + - - + + - + - + - + diff --git a/drawio/ch-ray-cluster/pg-spread.drawio b/drawio/ch-ray-cluster/pg-spread.drawio index fea520d..d61fd5c 100644 --- a/drawio/ch-ray-cluster/pg-spread.drawio +++ b/drawio/ch-ray-cluster/pg-spread.drawio @@ -1,2271 +1,2271 @@ - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - + + - - + + - + - + - + - + diff --git a/img/ch-dask-dataframe/dataframe-model.svg b/img/ch-dask-dataframe/dataframe-model.svg index 6f737a9..6ccaec9 100644 --- a/img/ch-dask-dataframe/dataframe-model.svg +++ b/img/ch-dask-dataframe/dataframe-model.svg @@ -1,4 +1,4 @@ -
数据
数据
行标签
行标签
列标签
列标签
Text is not SVG - cannot display
\ No newline at end of file +
数据
数据
行标签
行标签
列标签
列标签
Text is not SVG - cannot display
\ No newline at end of file diff --git a/img/ch-dask-dataframe/divisions.png b/img/ch-dask-dataframe/divisions.png deleted file mode 100644 index ef58e59..0000000 Binary files a/img/ch-dask-dataframe/divisions.png and /dev/null differ diff --git a/img/ch-dask-dataframe/shuffle-tasks-p2p.png b/img/ch-dask-dataframe/shuffle-tasks-p2p.png deleted file mode 100644 index e7554f4..0000000 Binary files a/img/ch-dask-dataframe/shuffle-tasks-p2p.png and /dev/null differ diff --git a/img/ch-dask-dataframe/shuffle-tasks-p2p.svg b/img/ch-dask-dataframe/shuffle-tasks-p2p.svg new file mode 100644 index 0000000..07ad24a --- /dev/null +++ b/img/ch-dask-dataframe/shuffle-tasks-p2p.svg @@ -0,0 +1,4 @@ + + + +
输入
输入
中间层
中间层
输出
输出
tasks
tasks
p2p
p2p
输入
输入
障碍节点
障碍节点
输出
输出
Text is not SVG - cannot display
\ No newline at end of file diff --git a/img/ch-ray-cluster/pg-pack.svg b/img/ch-ray-cluster/pg-pack.svg index 2f8598c..7a7b266 100644 --- a/img/ch-ray-cluster/pg-pack.svg +++ b/img/ch-ray-cluster/pg-pack.svg @@ -1,4 +1,4 @@ -
CPU
CPU
CPU
CPU
CPU
CPU
Node{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
Node{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
Node{"CPU":6}
{"CPU":2} * 3, PACK
\ No newline at end of file +
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
{"CPU":6}
{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
{"CPU":6}
{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
{"CPU":6}
{"CPU":6}
{"CPU":2} * 3, PACK
{"CPU":2} * 3, PACK
Text is not SVG - cannot display
\ No newline at end of file diff --git a/img/ch-ray-cluster/pg-spread.svg b/img/ch-ray-cluster/pg-spread.svg index 1180345..a996b27 100644 --- a/img/ch-ray-cluster/pg-spread.svg +++ b/img/ch-ray-cluster/pg-spread.svg @@ -1,4 +1,4 @@ -
CPU
CPU
CPU
CPU
CPU
CPU
Node{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
Node{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
Node{"CPU":6}
{"CPU":2} * 3, SPREAD
\ No newline at end of file +
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
{"CPU":6}
{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
{"CPU":6}
{"CPU":6}
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
CPU
{"CPU":6}
{"CPU":6}
{"CPU":2} * 3, SPREAD
{"CPU":2} * 3, SPREAD
Text is not SVG - cannot display
\ No newline at end of file