Add qualification support for Photon jobs in the Python Tool #1409

Merged
73 changes: 68 additions & 5 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -289,12 +289,12 @@ def __build_global_report_summary(self,
total_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_info: JSONPropertiesContainer) -> QualificationSummary:
# TODO: This method does a lot of critical but unrelated work. Refactor this into smaller steps/methods
# to improve readability and maintainability.
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(total_apps=total_apps, tools_processed_apps=all_apps)

unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Generate the statistics report
try:
stats_report = SparkQualificationStats(ctxt=self.ctxt)
@@ -303,37 +303,52 @@ def __build_global_report_summary(self,
self.logger.error('Failed to generate the statistics report: %s', e)

# Calculate unsupported operators stage duration before grouping
unsupported_ops_obj = UnsupportedOpsStageDuration(
self.ctxt.get_value('local', 'output', 'unsupportedOperators'))
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)

# Apply additional heuristics to skip apps not suitable for GPU acceleration
heuristics_ob = AdditionalHeuristics(
props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
tools_output_dir=self.ctxt.get_rapids_output_folder(),
output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))

# Group the applications and recalculate metrics
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
df_final_result = speedup_category_ob.build_category_column(apps_grouped_df)

# Assign the runtime type (Spark/Photon etc.) and speedup categories (Small/Medium/Large) to each application.
# Note: Strategy for speedup categorization will be based on the execution engine of the application.
apps_with_runtime_df = self._assign_spark_runtime_to_apps(apps_grouped_df)
speedup_category_confs = self.ctxt.get_value('local', 'output', 'speedupCategories')
speedup_category_ob = SpeedupCategory(speedup_category_confs)
df_final_result = speedup_category_ob.build_category_column(apps_with_runtime_df)

# Generate the cluster shape report
reshaped_notes = self.__generate_cluster_shape_report()
report_comments = [group_notes] if group_notes else []
if reshaped_notes:
report_comments.append(reshaped_notes)

# Write the final result to the output file
csv_out = output_files_info.get_value('summary', 'path')
if not df_final_result.empty:
self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out)
df_final_result.to_csv(csv_out, float_format='%.2f')

# Add columns for cluster configuration recommendations and tuning configurations to the processed_apps.
recommender = ClusterConfigRecommender(self.ctxt)
df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result)
# Merge the total_apps with the processed_apps to get the Event Log
df_final_result = pd.merge(df_final_result, total_apps[['Event Log', 'AppID']],
left_on='App ID', right_on='AppID')

# Write the app metadata
app_metadata_info = output_files_info.get_value('appMetadata')
config_recommendations_info = output_files_info.get_value('configRecommendations')
self._write_app_metadata(df_final_result, app_metadata_info, config_recommendations_info)

# Return the summary
return QualificationSummary(total_apps=total_apps,
tools_processed_apps=df_final_result,
comments=report_comments)
@@ -595,6 +610,54 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:
report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
return pd.read_csv(report_file_path)

def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
"""
Helper method to read metric files from the qualification tool's output metric folder.
Returns a dictionary of DataFrames, where each key is an application ID, and each
DataFrame contains the corresponding application's metrics data.
Example:
{
'appId1': pd.DataFrame(...),
'appId2': pd.DataFrame(...),
}
:param file_name: Name of the metric file to read from each application's folder
"""
metrics = {}
root_metric_dir = self.ctxt.get_metrics_output_folder()
apps_with_missing_metrics = []
for metric_dir in root_metric_dir.get_subdirectories():
app_id_str = metric_dir.base_name()
report_file_path = metric_dir.create_sub_path(file_name)
try:
metrics[app_id_str] = pd.read_csv(str(report_file_path))
except Exception: # pylint: disable=broad-except
# Some apps may not have the given metrics file, we should ensure
# that the dictionary contains entries for all apps to avoid KeyErrors
# and maintain consistency in processing.
metrics[app_id_str] = pd.DataFrame()
apps_with_missing_metrics.append(app_id_str)

# Log apps with missing metrics files
if apps_with_missing_metrics:
self.logger.warning('Unable to read metrics file \'%s\' for apps: %s', file_name,
', '.join(apps_with_missing_metrics))
return metrics
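
For illustration only (not part of this diff): the helper above expects one sub-folder per application under the raw metrics directory and returns one DataFrame per app. The layout and app IDs below are hypothetical, and the call is shown as it would be made from within the Qualification class:

# Hypothetical layout consumed by _read_qualification_metric_file('application_information.csv'):
#   <rapids_output>/raw_metrics/
#       app-20240101120000-0001/application_information.csv
#       app-20240101120000-0002/application_information.csv   (may be missing for some apps)
metrics = self._read_qualification_metric_file('application_information.csv')
for app_id, app_df in metrics.items():
    if app_df.empty:
        print(f'{app_id}: metrics file missing, placeholder DataFrame returned')
    else:
        print(f'{app_id}: sparkRuntime={app_df["sparkRuntime"].iloc[0]}')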

def _assign_spark_runtime_to_apps(self, tools_processed_apps: pd.DataFrame) -> pd.DataFrame:
"""
Assigns the Spark Runtime (Spark/Photon) to each application. This will be used to categorize
applications into speedup categories (Small/Medium/Large).
"""
app_info_dict = self._read_qualification_metric_file('application_information.csv')
# Rename columns from each DataFrame in the app_info_dict and merge them with the tools_processed_apps
merged_dfs = []
for df in app_info_dict.values():
merged_dfs.append(
df[['appId', 'sparkRuntime']].rename(columns={'appId': 'App ID', 'sparkRuntime': 'Spark Runtime'})
)
spark_runtime_df = pd.concat(merged_dfs, ignore_index=True)
return tools_processed_apps.merge(spark_runtime_df, on='App ID', how='left')
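
A minimal pandas sketch (illustrative, with made-up app IDs) of the rename/concat/left-merge performed above; apps that had no application_information.csv end up with a NaN 'Spark Runtime' because of the left join:

import pandas as pd

# Stand-ins for app_info_dict.values(): one frame per application.
info_a = pd.DataFrame({'appId': ['app-1'], 'sparkRuntime': ['SPARK']})
info_b = pd.DataFrame({'appId': ['app-2'], 'sparkRuntime': ['PHOTON']})
spark_runtime_df = pd.concat(
    [df[['appId', 'sparkRuntime']].rename(columns={'appId': 'App ID', 'sparkRuntime': 'Spark Runtime'})
     for df in (info_a, info_b)],
    ignore_index=True)
tools_processed_apps = pd.DataFrame({'App ID': ['app-1', 'app-2', 'app-3']})
merged = tools_processed_apps.merge(spark_runtime_df, on='App ID', how='left')
# 'app-3' has no metric entry, so its 'Spark Runtime' is NaN after the merge.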


@dataclass
class QualificationAsLocal(Qualification):
7 changes: 6 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
@@ -25,7 +25,7 @@
from spark_rapids_pytools.common.prop_manager import YAMLPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_tools import CspEnv
from spark_rapids_tools import CspEnv, CspPath
from spark_rapids_tools.utils import Utilities


@@ -215,6 +215,11 @@ def get_rapids_output_folder(self) -> str:
return root_dir
return FSUtil.build_path(root_dir, rapids_subfolder)

def get_metrics_output_folder(self) -> CspPath:
root_dir = CspPath(self.get_rapids_output_folder())
metrics_subfolder = self.get_value('toolOutput', 'metricsSubFolder')
return root_dir.create_sub_path(metrics_subfolder)
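
For illustration (assuming a local output folder; the concrete path is made up): with the 'metricsSubFolder: raw_metrics' entry added to the qualification config later in this diff, the new helper resolves to a 'raw_metrics' directory under the qualification output:

# ctxt.get_rapids_output_folder()  -> '/tmp/qual_out/rapids_4_spark_qualification_output'
# ctxt.get_metrics_output_folder() -> CspPath pointing at
#                                     '/tmp/qual_out/rapids_4_spark_qualification_output/raw_metrics'
metrics_dir = ctxt.get_metrics_output_folder()  # a CspPath, usable with get_subdirectories()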

def get_log4j_properties_file(self) -> str:
return self.get_value_silent('toolOutput', 'textFormat', 'log4jFileName')

@@ -1,6 +1,7 @@
toolOutput:
completeOutput: true
subFolder: rapids_4_spark_qualification_output
metricsSubFolder: raw_metrics
textFormat:
summaryLog:
fileName: rapids_4_spark_qualification_output.log
@@ -148,6 +149,7 @@ local:
- 'App Name'
- 'Event Log'
- 'Cluster Info'
- 'Spark Runtime'
- 'Estimated GPU Speedup Category'
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
@@ -254,27 +256,51 @@ local:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
heuristicsColumnName: 'Skip by Heuristics'
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
sparkRuntimeColumnName: 'Spark Runtime'
defaultCategory: 'Not Recommended'
strategies:
spark: # Spark specific speedup categories
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.3
- title: 'Small'
lowerBound: 1.3
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.3
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
photon: # Photon specific speedup categories
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
upperBound: 1.0
- title: 'Small'
lowerBound: 1.0
upperBound: 2.0
- title: 'Medium'
lowerBound: 2.0
upperBound: 3.0
- title: 'Large'
lowerBound: 3.0
upperBound: 1000000.0
eligibilityConditions:
- columnName: 'Estimated GPU Speedup'
lowerBound: 1.0
upperBound: 1000000.0
- columnName: 'Unsupported Operators Stage Duration Percent'
lowerBound: 0.0
upperBound: 25.0
Review comment thread on the strategies configuration:

Collaborator:
This needs some thinking about the impact on the design. This introduces platform configuration inside the tool's conf. On the other hand, we do have a configuration file per platform.

Collaborator (Author):
I was thinking that since all platforms would have the same values for the Spark case, we would be duplicating the configuration in each platform. In the future, if we have different values for different platforms, we could put these in separate platform config files.

Collaborator:
It is a valid point that there are some common settings between platforms. In the future, we can improve our config structure to have a common parent or something shared between all the platforms. Going the other way, specifying platform behavior inside the tool's config, will introduce a design inconsistency moving forward, especially given every contributor's preference on where a newly added config should go.

Collaborator:
I guess it is okay to keep this for now in order to unblock the Photon feature. We can revisit it later.

additionalHeuristics:
appInfo:
fileName: 'application_information.csv'
7 changes: 7 additions & 0 deletions user_tools/src/spark_rapids_tools/storagelib/csppath.py
@@ -366,3 +366,10 @@ def download_files(cls, src_url: str, dest_url: str):
@classmethod
def get_storage_name(cls) -> str:
return cls._path_meta.name

def get_subdirectories(self) -> List['CspPath']:
"""
Retrieve the subdirectories in the current path.
"""
dir_info_list = self.fs_obj.get_file_info(fs.FileSelector(self.no_scheme, recursive=False))
return [CspPath(f.path) for f in dir_info_list if f.type == FileType.Directory]
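
A hedged usage sketch of the new helper; the path is illustrative, and any CspPath-supported scheme (local, s3://, gs://, ...) is expected to behave the same way:

from spark_rapids_tools import CspPath

metrics_root = CspPath('/tmp/qual_out/rapids_4_spark_qualification_output/raw_metrics')
for app_dir in metrics_root.get_subdirectories():
    # Each sub-directory is named after an application ID, e.g. 'app-20240101120000-0001'.
    print(app_dir.base_name())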
51 changes: 42 additions & 9 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -15,19 +15,44 @@
"""Implementation class for Speedup Category logic."""

from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, Dict

import pandas as pd


class SpeedupStrategy:
"""
Wrapper class for speedup strategy properties.
"""
_categories: list
_eligibility_conditions: list

def __init__(self, props: dict):
self._categories = props.get('categories', [])
self._eligibility_conditions = props.get('eligibilityConditions', [])

def get_categories(self) -> list:
return self._categories

def get_eligibility_conditions(self) -> list:
return self._eligibility_conditions


@dataclass
class SpeedupCategory:
"""
Encapsulates the logic to categorize the speedup values based on the range values.
"""
props: dict = field(default=None, init=True)
speedup_strategies: Dict[str, SpeedupStrategy] = field(default_factory=dict, init=False)

def __post_init__(self):
strategy_properties = self.props.get('strategies', {})
# Create a SpeedupStrategy for each runtime type.
for spark_runtime, properties in strategy_properties.items(): # type: str, dict
self.speedup_strategies[spark_runtime] = SpeedupStrategy(properties)

def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
def _build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Build the category column based on the range values of the speedup column.
Example:
@@ -44,20 +69,24 @@ def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
output: row_2 = pd.Series({'speedup': 3.5, 'speedup category': 'Large'})
reason: Speedup Category will be 'Large' because the speedup is within the range (3-100000).
"""
categories = self.props.get('categories')
category_col_name = self.props.get('categoryColumnName')
speedup_col_name = self.props.get('speedupColumnName')
spark_runtime_col_name = self.props.get('sparkRuntimeColumnName')

# Calculate the category based on the speedup value
def calculate_category(col_value) -> Optional[str]:
def calculate_category(single_row: pd.Series) -> Optional[str]:
spark_runtime = single_row.get(spark_runtime_col_name).lower()
# Get the speedup strategy and its categories for the given runtime type.
categories = self.speedup_strategies.get(spark_runtime).get_categories()
col_value = single_row.get(speedup_col_name)
for category in categories:
if category.get('lowerBound') <= col_value < category.get('upperBound'):
return category.get('title')
return None
all_apps[category_col_name] = all_apps[speedup_col_name].apply(calculate_category)
all_apps[category_col_name] = all_apps.apply(calculate_category, axis=1)
return all_apps
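
A small illustrative snippet (not part of the diff) of how the per-runtime lower bounds configured in the yaml strategies section above change the outcome for the same speedup value:

# 'Small' starts at 1.3 for spark and at 1.0 for photon (see the strategies section above).
small_lower_bound = {'spark': 1.3, 'photon': 1.0}

def illustrate_category(runtime: str, speedup: float) -> str:
    # Simplified to two buckets for the illustration.
    return 'Small' if speedup >= small_lower_bound[runtime.lower()] else 'Not Applicable'

illustrate_category('SPARK', 1.1)    # -> 'Not Applicable'
illustrate_category('PHOTON', 1.1)   # -> 'Small'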

def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
def _process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Process the speedup category column based on the eligibility criteria. If the row does not match
the criteria, the category column will be set to the `Not Recommended` category.
@@ -76,9 +105,13 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
category_col_name = self.props.get('categoryColumnName')
heuristics_col_name = self.props.get('heuristicsColumnName')
spark_runtime_col_name = self.props.get('sparkRuntimeColumnName')

def process_row(single_row: pd.Series) -> str:
for entry in self.props.get('eligibilityConditions'):
spark_runtime = single_row.get(spark_runtime_col_name).lower()
# Get the speedup strategy and its eligibility conditions for the given runtime type.
eligibility_conditions = self.speedup_strategies.get(spark_runtime).get_eligibility_conditions()
for entry in eligibility_conditions:
col_value = single_row[entry.get('columnName')]
# If the row is marked to be skipped by heuristics or the value is not within the range,
# set the category to default category (Not Recommended)
@@ -91,6 +124,6 @@ def process_row(single_row: pd.Series) -> str:
return all_apps

def build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
apps_with_category = self.__build_category_column(all_apps)
processed_apps = self.__process_category(apps_with_category)
apps_with_category = self._build_category_column(all_apps)
processed_apps = self._process_category(apps_with_category)
return processed_apps
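
A minimal end-to-end sketch of the new per-runtime categorization. The props dict mirrors (and trims to two buckets) the strategies section of the qualification config above; the application rows are made up:

import pandas as pd
from spark_rapids_tools.tools.speedup_category import SpeedupCategory

# Props trimmed from the qualification config in this PR; bounds mirror the yaml above.
props = {
    'speedupColumnName': 'Estimated GPU Speedup',
    'categoryColumnName': 'Estimated GPU Speedup Category',
    'heuristicsColumnName': 'Skip by Heuristics',
    'sparkRuntimeColumnName': 'Spark Runtime',
    'defaultCategory': 'Not Recommended',
    'strategies': {
        'spark': {
            'categories': [
                {'title': 'Not Applicable', 'lowerBound': -1000000.0, 'upperBound': 1.3},
                {'title': 'Small', 'lowerBound': 1.3, 'upperBound': 1000000.0},
            ],
            'eligibilityConditions': [
                {'columnName': 'Estimated GPU Speedup', 'lowerBound': 1.3, 'upperBound': 1000000.0},
            ],
        },
        'photon': {
            'categories': [
                {'title': 'Not Applicable', 'lowerBound': -1000000.0, 'upperBound': 1.0},
                {'title': 'Small', 'lowerBound': 1.0, 'upperBound': 1000000.0},
            ],
            'eligibilityConditions': [
                {'columnName': 'Estimated GPU Speedup', 'lowerBound': 1.0, 'upperBound': 1000000.0},
            ],
        },
    },
}

apps = pd.DataFrame({
    'App ID': ['app-1', 'app-2'],
    'Spark Runtime': ['SPARK', 'PHOTON'],
    'Estimated GPU Speedup': [1.1, 1.1],
    'Skip by Heuristics': [False, False],
})

result = SpeedupCategory(props).build_category_column(apps)
# Expected under the thresholds above:
#   app-1 (SPARK, 1.1)  -> 'Not Recommended' (below the 1.3 eligibility bound)
#   app-2 (PHOTON, 1.1) -> 'Small'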