Skip to content

Commit

Permalink
REFACTOR: Improve vulnerability handling in workflows (#5365)
Browse files Browse the repository at this point in the history
  • Loading branch information
SMoraisAnsys authored Nov 8, 2024
1 parent e580d7a commit 8e0bca8
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 30 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"pytomlpp; python_version < '3.12'",
"rpyc>=6.0.0,<6.1",
"pyyaml",
"defusedxml>=0.7,<8.0"
]

[project.optional-dependencies]
Expand Down
57 changes: 32 additions & 25 deletions src/ansys/aedt/core/workflows/customize_automation_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import shutil
import subprocess # nosec
import sys
from typing import List
import xml.etree.ElementTree as ET # nosec

from defusedxml.ElementTree import parse as defused_parse
import defusedxml.minidom

defusedxml.defuse_stdlib()
Expand Down Expand Up @@ -89,7 +91,7 @@ def add_automation_tab(
root = ET.Element("TabConfig")
else:
try:
tree = ET.parse(tab_config_file_path) # nosec
tree = defused_parse(tab_config_file_path)
except ParseError as e: # pragma: no cover
warnings.warn("Unable to parse %s\nError received = %s" % (tab_config_file_path, str(e)))
return
Expand Down Expand Up @@ -168,7 +170,7 @@ def remove_xml_tab(toolkit_dir, name, panel="Panel_PyAEDT_Extensions"):
if not os.path.isfile(tab_config_file_path): # pragma: no cover
return True
try:
tree = ET.parse(tab_config_file_path) # nosec
tree = defused_parse(tab_config_file_path)
except ParseError as e: # pragma: no cover
warnings.warn("Unable to parse %s\nError received = %s" % (tab_config_file_path, str(e)))
return
Expand Down Expand Up @@ -278,6 +280,7 @@ def add_script_to_menu(
d = list(_desktop_sessions.values())[0]
personal_lib = d.personallib
aedt_version = d.aedt_version_id
d.logger.error("WE ARE HERE 2.")
if script_file and not os.path.exists(script_file): # pragma: no cover
logger.error("Script does not exists.")
return False
Expand Down Expand Up @@ -352,6 +355,17 @@ def __tab_map(product): # pragma: no cover
return product


def run_command(command: List[str], desktop_object): # pragma: no cover
"""Run a command through subprocess."""
try:
subprocess.run(command, check=True, capture_output=True, text=True) # nosec
except subprocess.CalledProcessError as e:
desktop_object.logger.error("Error occurred:", e.stderr)
return e.returncode

return 0


def add_custom_toolkit(desktop_object, toolkit_name, wheel_toolkit=None, install=True): # pragma: no cover
"""Add toolkit to AEDT Automation Tab.
Expand All @@ -370,6 +384,8 @@ def add_custom_toolkit(desktop_object, toolkit_name, wheel_toolkit=None, install
-------
bool
"""
print("WE ARE HERE.")
desktop_object.logger.error("WE ARE HERE.")
toolkits = available_toolkits()
toolkit_info = None
product_name = None
Expand Down Expand Up @@ -412,21 +428,6 @@ def add_custom_toolkit(desktop_object, toolkit_name, wheel_toolkit=None, install
)
)

def run_command(command):
try:
if is_linux: # pragma: no cover
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # nosec
else:
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # nosec
_, stderr = process.communicate()
ret_code = process.returncode
if ret_code != 0:
print("Error occurred:", stderr.decode("utf-8"))
return ret_code
except Exception as e:
print("Exception occurred:", str(e))
return 1 # Return non-zero exit code for indicating an error

version = desktop_object.odesktop.GetVersion()[2:6].replace(".", "")

if not is_linux:
Expand Down Expand Up @@ -454,7 +455,8 @@ def run_command(command):

if not os.path.exists(venv_dir):
desktop_object.logger.info("Creating virtual environment")
run_command(f'"{base_venv}" -m venv "{venv_dir}" --system-site-packages')
command = [base_venv, "-m", "venv", venv_dir, "--system-site-packages"]
run_command(command, desktop_object)
desktop_object.logger.info("Virtual environment created.")

is_installed = False
Expand All @@ -470,11 +472,13 @@ def run_command(command):
is_installed = True
if wheel_toolkit:
wheel_toolkit = os.path.normpath(wheel_toolkit)
desktop_object.logger.info("Installing dependencies")

desktop_object.logger.info(f"Installing dependencies")
if install and wheel_toolkit and os.path.exists(wheel_toolkit):
desktop_object.logger.info("Starting offline installation")
if is_installed:
run_command(f'"{pip_exe}" uninstall --yes {toolkit_info["pip"]}')
command = [pip_exe, "uninstall", "--yes", toolkit_info["pip"]]
run_command(command, desktop_object)
import zipfile

unzipped_path = os.path.join(
Expand All @@ -486,16 +490,20 @@ def run_command(command):
zip_ref.extractall(unzipped_path)

package_name = toolkit_info["package"]
run_command(f'"{pip_exe}" install --no-cache-dir --no-index --find-links={unzipped_path} {package_name}')
command = [pip_exe, "install", "--no-cache-dir", "--no-index", "--find-links={unzipped_path}", package_name]
run_command(command, desktop_object)
elif install and not is_installed:
# Install the specified package
run_command(f'"{pip_exe}" --default-timeout=1000 install {toolkit_info["pip"]}')
command = [pip_exe, "--default-timeout=1000", "install", toolkit_info["pip"]]
run_command(command, desktop_object)
elif not install and is_installed:
# Uninstall toolkit
run_command(f'"{pip_exe}" --default-timeout=1000 uninstall -y {toolkit_info["package"]}')
command = [pip_exe, "--default-timeout=1000", "uninstall", "-y", toolkit_info["package"]]
run_command(command, desktop_object)
elif install and is_installed:
# Update toolkit
run_command(f'"{pip_exe}" --default-timeout=1000 install {toolkit_info["pip"]} -U')
command = [pip_exe, "--default-timeout=1000", "install", toolkit_info["pip"], "-U"]
run_command(command, desktop_object)
else:
desktop_object.logger.info("Incorrect input")
return
Expand Down Expand Up @@ -531,7 +539,6 @@ def run_command(command):
name=toolkit_info["name"],
product=product_name,
)
desktop_object.logger.info("Installing dependencies")
desktop_object.logger.info(f'{toolkit_info["name"]} uninstalled')


Expand Down
11 changes: 6 additions & 5 deletions src/ansys/aedt/core/workflows/project/create_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ def main(extension_args):
report.add_image(os.path.join(aedtapp.working_directory, plot.plot_name + ".jpg"), plot.plot_name)
report.add_page_break()
report.add_toc()
out = report.save_pdf(aedtapp.working_directory, "AEDT_Results.pdf")
aedtapp.logger.info(f"Report Generated. {out}")
pdf_path = report.save_pdf(aedtapp.working_directory, "AEDT_Results.pdf")
aedtapp.logger.info(f"Report Generated. {pdf_path}")
if is_windows and not extension_args["is_test"]: # pragma: no cover
try:
os.startfile(out) # nosec
except Exception: # pragma: no cover
aedtapp.logger.warning(f"Failed to open {out}")
if os.path.isfile(pdf_path) and pdf_path.endswith(".pdf"):
os.startfile(pdf_path) # nosec
except Exception:
aedtapp.logger.warning(f"Failed to open {pdf_path}")

if not extension_args["is_test"]: # pragma: no cover
app.release_desktop(False, False)
Expand Down

0 comments on commit 8e0bca8

Please sign in to comment.