Merge dev into main
Signed-off-by: spark-rapids automation <[email protected]>
nvauto committed Apr 30, 2024
2 parents 53c3505 + 0870722 commit e093c7c
Showing 9 changed files with 165 additions and 92 deletions.
@@ -63,7 +63,8 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) {
/**
* Recommendations to be included in the final list of recommendations.
* These properties should be specific to the platform and not general Spark properties.
* For example: "spark.databricks.optimizer.dynamicFilePruning" for the Databricks platform.
* For example: we used to set "spark.databricks.optimizer.dynamicFilePruning" to false for the
* Databricks platform.
*
* Represented as a tuple of (propertyKey, propertyValue).
*/
@@ -135,9 +136,6 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice]) extends Platform
"spark.executor.memory",
"spark.executor.memoryOverhead"
)
override val recommendationsToInclude: Seq[(String, String)] = Seq(
("spark.databricks.optimizer.dynamicFilePruning", "false")
)

override def createClusterInfo(coresPerExecutor: Int, numExecutorNodes: Int,
sparkProperties: Map[String, String], systemProperties: Map[String, String]): ClusterInfo = {
@@ -1115,7 +1115,7 @@ object AutoTuner extends Logging {
"spark.executor.instances" ->
"'spark.executor.instances' should be set to (gpuCount * numWorkers).",
"spark.task.resource.gpu.amount" ->
"'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).",
"'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).",
"spark.rapids.sql.concurrentGpuTasks" ->
s"'spark.rapids.sql.concurrentGpuTasks' should be set to Min(4, (gpuMemory / 7.5G)).",
"spark.rapids.memory.pinnedPool.size" ->
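The corrected formulas are easier to check with concrete numbers. Note that the old text, Max(1, (numCores / gpuCount)), always yields a value of at least 1, which would limit each GPU to a single concurrent task. A minimal sketch with illustrative helper names (not the AutoTuner's actual code):

def task_gpu_amount(num_cores: int, gpu_count: int) -> float:
    # Min(1, gpuCount / numCores): the GPU fraction each task claims.
    # With 16 cores sharing 1 GPU, each task claims 1/16 of the GPU,
    # so all 16 cores can run GPU tasks concurrently.
    return min(1.0, gpu_count / num_cores)

def concurrent_gpu_tasks(gpu_memory_gib: float) -> int:
    # Min(4, gpuMemory / 7.5G): e.g. a 24 GiB GPU yields min(4, 3) = 3.
    return min(4, int(gpu_memory_gib / 7.5))

assert task_gpu_amount(16, 1) == 0.0625
assert concurrent_gpu_tasks(24.0) == 3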
@@ -235,7 +235,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
|- RAPIDS Accelerator for Apache Spark plugin jar is missing
| from the classpath entries.
| If the Spark RAPIDS jar is being bundled with your
@@ -273,7 +273,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
|- Incorrect values in worker system information: {numCores: 0, memory: 122880MiB, numWorkers: 4}.
|- RAPIDS Accelerator for Apache Spark plugin jar is missing
| from the classpath entries.
@@ -312,7 +312,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
|- Incorrect values in worker system information: {numCores: 32, memory: , numWorkers: 4}.
|- RAPIDS Accelerator for Apache Spark plugin jar is missing
| from the classpath entries.
@@ -351,7 +351,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
|- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
|- Incorrect values in worker system information: {numCores: 32, memory: 0m, numWorkers: 4}.
|- RAPIDS Accelerator for Apache Spark plugin jar is missing
| from the classpath entries.
23 changes: 12 additions & 11 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -14,7 +14,6 @@

"""Implementation class representing wrapper around the RAPIDS acceleration Qualification tool."""

import textwrap
from dataclasses import dataclass, field
from math import ceil
from typing import Any, List, Callable
@@ -32,6 +31,7 @@
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.tools.model_xgboost import predict
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
from spark_rapids_tools.utils.util import Utilities
@@ -694,8 +694,13 @@ def __build_global_report_summary(self,
# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))
# Calculate the speedup category column
apps_pruned_df = speedup_category_ob.build_category_column(apps_pruned_df)
apps_pruned_df.to_csv(output_files_info['full']['path'], float_format='%.2f')
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
# Recalculate the speedup category column after grouping
apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df)
recommended_apps = self.__get_recommended_apps(apps_grouped_df)
# if the gpu_reshape_type is set to JOB, then we should ignore recommended apps
speedups_irrelevant_flag = self.__recommendation_is_non_standard()
@@ -763,13 +768,17 @@ def process_df_for_stdout(raw_df):
filter_recommendation_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.SPEEDUPS
filter_pos_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.SAVINGS
filter_top_candidate_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.TOP_CANDIDATES
squeeze_header_enabled = self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'compactWidth')
header_width = self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'columnWidth')

if filter_top_candidate_enabled:
# TODO: Ideally we should create an instance of TopCandidates as a class variable using the filter apps flag.
# This should be refactored along with the entire filter apps logic to use a more object-oriented design.
top_candidates_obj = TopCandidates(self.ctxt.get_value('local', 'output', 'topCandidates'))
filtered_apps = top_candidates_obj.filter_apps(raw_df)
return top_candidates_obj.prepare_output(filtered_apps)
result_df = top_candidates_obj.prepare_output(filtered_apps)
# squeeze the header titles if enabled
return Utilities.squeeze_df_header(result_df, header_width) if squeeze_header_enabled else result_df

if self.__recommendation_is_non_standard():
# During processing of arguments phase, we verified that the filter does not conflict
@@ -809,15 +818,7 @@ def process_df_for_stdout(raw_df):
df_row.columns = df_row.columns.str.replace('Duration',
f'Duration{time_unit}', regex=False)
# squeeze the header titles if enabled
if self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'compactWidth'):
col_w_conf = self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'columnWidth')
for column in df_row.columns:
if len(column) > col_w_conf:
new_column_name = textwrap.fill(column, col_w_conf, break_long_words=False)
if new_column_name != column:
df_row.columns = df_row.columns.str.replace(column,
new_column_name, regex=False)
return df_row
return Utilities.squeeze_df_header(df_row, header_width) if squeeze_header_enabled else df_row

if not self._evaluate_rapids_jar_tool_output_exist():
return
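The inline header-wrapping logic removed in the hunk above now lives in Utilities.squeeze_df_header (see the calls it was replaced with). As a rough reconstruction from the deleted code — the helper's real signature may differ — the behavior is:

import textwrap

import pandas as pd

def squeeze_df_header(df: pd.DataFrame, header_width: int) -> pd.DataFrame:
    # Wrap any header title longer than header_width onto multiple lines,
    # keeping whole words intact, mirroring the removed inline logic.
    for column in df.columns:
        if len(column) > header_width:
            new_column_name = textwrap.fill(column, header_width, break_long_words=False)
            if new_column_name != column:
                df.columns = df.columns.str.replace(column, new_column_name, regex=False)
    return df

df = pd.DataFrame(columns=['App ID', 'Unsupported Operators Stage Duration Percent'])
print(squeeze_df_header(df, 24).columns.tolist())
# ['App ID', 'Unsupported Operators\nStage Duration Percent']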
@@ -12,11 +12,11 @@
},
{
"name": "Hadoop Azure",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.6/hadoop-azure-3.3.6.jar",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar",
"type": "jar",
"md5": "0fb8b8e565fd920fb809220cb2cc5ee7",
"sha1": "24425e7fad3a302715cefd570a0f4bdf3f50bc8e",
"size": 609646
"md5": "1ec4cbd59548412010fe1515070eef73",
"sha1": "a23f621bca9b2100554150f6b0b521f94b8b419e",
"size": 574116
}
],
"SPARK333-LOCAL": [
@@ -213,43 +213,46 @@ local:
resultColumnName: 'Unsupported Operators Stage Duration'
percentResultColumnName: 'Unsupported Operators Stage Duration Percent'
topCandidates:
categoryColumnName: 'Estimated GPU Speedup Category'
outputColumns:
- 'App ID'
- 'App Name'
- 'App Duration'
- 'Estimated GPU Speedup'
- 'Unsupported Operators Stage Duration Percent'
- 'Estimated GPU Speedup Category'
eligibleCategories:
- 'Small'
- 'Medium'
- 'Large'
sortingColumns:
- 'Estimated GPU Speedup'
- 'Unsupported Operators Stage Duration Percent'
joinColumns:
- 'App ID'
- 'App Name'
- 'App Duration'
ranges:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
output: # Configs related to output
columns:
- 'App ID'
- 'App Name'
- 'Estimated GPU Speedup'
remap:
- columnName: 'Estimated GPU Speedup'
recommendationRanges:
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
speedupCategories:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
defaultCategory: 'Not Recommended'
predictionModel:
outputDirectory: 'xgboost_predictions'
files:
93 changes: 93 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -0,0 +1,93 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation class for Speedup Category logic."""

from dataclasses import dataclass, field
from typing import Optional

import pandas as pd


@dataclass
class SpeedupCategory:
"""
Encapsulates the logic to categorize the speedup values based on the range values.
"""
props: dict = field(default=None, init=True)

def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Build the category column based on the range values of the speedup column.
Example:
props['categories'] = [
{'title': 'Not Recommended', 'lowerBound': -100000, 'upperBound': 1.3},
{'title': 'Small', 'lowerBound': 1.3, 'upperBound': 2},
{'title': 'Medium', 'lowerBound': 2, 'upperBound': 3},
{'title': 'Large', 'lowerBound': 3, 'upperBound': 100000}
]
1. input: row_1 = pd.Series({'speedup': 1.8})
output: row_1 = pd.Series({'speedup': 1.8, 'speedup category': 'Small'})
reason: Speedup Category will be 'Small' because the speedup is within the range (1.3-2).
2. input: row_2 = pd.Series({'speedup': 3.5})
output: row_2 = pd.Series({'speedup': 3.5, 'speedup category': 'Large'})
reason: Speedup Category will be 'Large' because the speedup is within the range (3-100000).
"""
categories = self.props.get('categories')
category_col_name = self.props.get('categoryColumnName')
speedup_col_name = self.props.get('speedupColumnName')

# Calculate the category based on the speedup value
def calculate_category(col_value) -> Optional[str]:
for category in categories:
if category.get('lowerBound') <= col_value < category.get('upperBound'):
return category.get('title')
return None
all_apps[category_col_name] = all_apps[speedup_col_name].apply(calculate_category)
return all_apps

def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Process the speedup category column based on the eligibility criteria. If the row does not match
the criteria, the category column will be set to the `Not Recommended` category.
Example:
self.props['eligibilityConditions'] = [
{'columnName': 'criteriaCol1', 'lowerBound': 18, 'upperBound': 30},
{'columnName': 'criteriaCol2', 'lowerBound': 70, 'upperBound': 100}
]
1. input: row_1 = pd.Series({'criteriaCol1': 25, 'criteriaCol2': 85, 'speedup category': 'Large'})
output: row_1 = pd.Series({'criteriaCol1': 25, 'criteriaCol2': 85, 'speedup category': 'Large'})
reason: Category will remain 'Large' because the criteriaCol1 is within the range (18-30) and
the criteriaCol2 (85) is within the range (70-100).
2. input: row_2 = pd.Series({'criteriaCol1': 15, 'criteriaCol2': 85, 'speedup category': 'Medium'})
output: row_2 = pd.Series({'criteriaCol1': 15, 'criteriaCol2': 85, 'speedup category': 'Not Recommended'})
reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30)
"""
category_col_name = self.props.get('categoryColumnName')

def process_row(single_row: pd.Series) -> str:
for entry in self.props.get('eligibilityConditions'):
col_value = single_row[entry.get('columnName')]
# If the value is not within the range, set the category to the default category (Not Recommended)
if not entry.get('lowerBound') <= col_value <= entry.get('upperBound'):
return self.props.get('defaultCategory')
return single_row.get(category_col_name)

all_apps[category_col_name] = all_apps.apply(process_row, axis=1)
return all_apps

def build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
apps_with_category = self.__build_category_column(all_apps)
processed_apps = self.__process_category(apps_with_category)
return processed_apps
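For illustration, a usage sketch of the class defined above, with props mirroring the speedupCategories section added to the YAML config; the DataFrame values are made-up sample data:

import pandas as pd

props = {
    'speedupColumnName': 'Estimated GPU Speedup',
    'categoryColumnName': 'Estimated GPU Speedup Category',
    'defaultCategory': 'Not Recommended',
    'categories': [
        {'title': 'Not Applicable', 'lowerBound': -1000000.0, 'upperBound': 1.3},
        {'title': 'Small', 'lowerBound': 1.3, 'upperBound': 2.0},
        {'title': 'Medium', 'lowerBound': 2.0, 'upperBound': 3.0},
        {'title': 'Large', 'lowerBound': 3.0, 'upperBound': 1000000.0},
    ],
    'eligibilityConditions': [
        {'columnName': 'Estimated GPU Speedup', 'lowerBound': 1.3, 'upperBound': 1000000.0},
        {'columnName': 'Unsupported Operators Stage Duration Percent',
         'lowerBound': 0.0, 'upperBound': 25.0},
    ],
}
apps = pd.DataFrame({
    'Estimated GPU Speedup': [1.1, 2.5, 3.4],
    'Unsupported Operators Stage Duration Percent': [10.0, 40.0, 5.0],
})
result = SpeedupCategory(props).build_category_column(apps)
print(result['Estimated GPU Speedup Category'].tolist())
# 1.1 falls below every eligible range, 2.5 fails the unsupported-ops bound,
# and 3.4 passes both eligibility conditions:
# ['Not Recommended', 'Not Recommended', 'Large']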
49 changes: 8 additions & 41 deletions user_tools/src/spark_rapids_tools/tools/top_candidates.py
@@ -15,8 +15,6 @@
"""Implementation class for Top Candidates logic."""

from dataclasses import dataclass, field
from typing import Optional
from functools import partial

import pandas as pd

@@ -32,6 +30,16 @@ def filter_apps(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Generic method to filter applications based on criteria
"""
filtered_apps = all_apps[all_apps.apply(self.__filter_single_row, axis=1)]
# Select output columns and sort
output_columns = self.props.get('outputColumns')
sorting_columns = self.props.get('sortingColumns')
return filtered_apps[output_columns].sort_values(by=sorting_columns, ascending=False)

def __filter_single_row(self, single_row: pd.Series) -> bool:
"""
Used to create a filter based on the specified ranges.
Example:
self.props['ranges'] = [
{'columnName': 'colA', 'lowerBound': 18, 'upperBound': 30},
{'columnName': 'colB', 'lowerBound': 70, 'upperBound': 100}
]
single_row = pd.Series({'colA': 25, 'colB': 85})
The function will return True because the colA (25) is within the range (18-30)
and the colB (85) is within the range (70-100).
"""
for criteria in self.props.get('ranges'):
col_value = single_row[criteria.get('columnName')]
if not criteria.get('lowerBound') <= col_value <= criteria.get('upperBound'):
return False
return True
category_col_name = self.props.get('categoryColumnName')
eligible_categories = self.props.get('eligibleCategories')
# Filter applications based on categories
return all_apps[all_apps[category_col_name].isin(eligible_categories)]

def prepare_output(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Generic method to transform applications for the output
"""
output_props = self.props.get('output')

# Function to remap column values based on recommended ranges
def remap_column(col_value, recommended_ranges: dict) -> Optional[str]:
for s_range in recommended_ranges:
if s_range['lowerBound'] <= col_value < s_range['upperBound']:
return s_range['title']
return None

# Iterate over each entry and apply remapping to respective columns
for remap_entry in output_props.get('remap', []):
column_name = remap_entry.get('columnName')
recommendation_ranges = remap_entry.get('recommendationRanges')
remap_func = partial(remap_column, recommended_ranges=recommendation_ranges)
all_apps[column_name] = all_apps[column_name].apply(remap_func)
return all_apps[output_props.get('columns', [])]
output_columns = self.props.get('outputColumns')
sorting_columns = self.props.get('sortingColumns')
# Sort rows by the sorting columns, then select the output columns
return all_apps.sort_values(by=sorting_columns, ascending=False)[output_columns]
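For illustration, a usage sketch of the refactored class above, with a trimmed-down version of the topCandidates config section and made-up sample data:

import pandas as pd

props = {
    'categoryColumnName': 'Estimated GPU Speedup Category',
    'eligibleCategories': ['Small', 'Medium', 'Large'],
    'outputColumns': ['App ID', 'Estimated GPU Speedup', 'Estimated GPU Speedup Category'],
    'sortingColumns': ['Estimated GPU Speedup'],
}
apps = pd.DataFrame({
    'App ID': ['app-001', 'app-002', 'app-003'],
    'Estimated GPU Speedup': [3.4, 1.1, 1.8],
    'Estimated GPU Speedup Category': ['Large', 'Not Recommended', 'Small'],
})
top = TopCandidates(props)
print(top.prepare_output(top.filter_apps(apps)))
# app-002 is dropped because its category is not eligible;
# app-001 sorts ahead of app-003 on 'Estimated GPU Speedup'.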