diff --git a/pyproject.toml b/pyproject.toml
index dfde42547..353d19dff 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,6 @@ dev = [
"flask",
"fastapi~=0.103.2",
"pydantic",
- "pycryptodome==3.*",
"flake8==5.*; python_version == '3.7'",
"flake8==6.*; python_version >= '3.8'",
"mypy",
@@ -69,7 +68,8 @@ dev = [
"pandas",
"numpy",
"pre-commit",
- "invoke"
+ "invoke",
+ "cryptography"
]
test-http-v2 = [
"azurefunctions-extensions-http-fastapi",
diff --git a/tests/unittests/test_third_party_http_functions.py b/tests/unittests/test_third_party_http_functions.py
index 8d97af676..a84467fb2 100644
--- a/tests/unittests/test_third_party_http_functions.py
+++ b/tests/unittests/test_third_party_http_functions.py
@@ -5,7 +5,7 @@
import pathlib
import re
import typing
-import urllib.parse
+import base64
from unittest.mock import patch
@@ -133,15 +133,18 @@ def test_raw_body_bytes(self):
image_file = parent_dir / 'unittests/resources/functions.png'
with open(image_file, 'rb') as image:
img = image.read()
- sanitized_image = urllib.parse.quote(img)
- sanitized_img_len = len(sanitized_image)
+ encoded_image = base64.b64encode(img).decode('utf-8')
+ html_img_tag = \
+ f'' # noqa
+ sanitized_img_len = len(html_img_tag)
r = self.webhost.request('POST', 'raw_body_bytes', data=img,
no_prefix=True)
received_body_len = int(r.headers['body-len'])
self.assertEqual(received_body_len, sanitized_img_len)
- body = urllib.parse.unquote_to_bytes(r.content)
+ encoded_image_data = encoded_image.split(",")[0]
+ body = base64.b64decode(encoded_image_data)
try:
received_img_file = parent_dir / 'received_img.png'
with open(received_img_file, 'wb') as received_img:
diff --git a/tests/unittests/third_party_http_functions/stein/asgi_function/function_app.py b/tests/unittests/third_party_http_functions/stein/asgi_function/function_app.py
index ab7e060dc..4b9b10689 100644
--- a/tests/unittests/third_party_http_functions/stein/asgi_function/function_app.py
+++ b/tests/unittests/third_party_http_functions/stein/asgi_function/function_app.py
@@ -150,10 +150,10 @@ async def return_http(request: Request):
@fast_app.get("/return_http_redirect")
async def return_http_redirect(request: Request, code: str = ''):
- allowed_url_pattern = r"^http://.+"
+ allowed_url_pattern = r"^http://127\.0\.0\.1:\d+/return_http_redirect\?code=*"
location = 'return_http?code={}'.format(code)
- redirect_url = f"http://{request.url.components[1]}/{location}"
+ redirect_url = f"http://127.0.0.1/{location}"
if re.match(allowed_url_pattern, redirect_url):
# Redirect URL is in the expected format
return RedirectResponse(status_code=302,
diff --git a/tests/unittests/third_party_http_functions/stein/wsgi_function/function_app.py b/tests/unittests/third_party_http_functions/stein/wsgi_function/function_app.py
index 5f1ec8e0f..e717f395d 100644
--- a/tests/unittests/third_party_http_functions/stein/wsgi_function/function_app.py
+++ b/tests/unittests/third_party_http_functions/stein/wsgi_function/function_app.py
@@ -1,7 +1,7 @@
import logging
import sys
from urllib.request import urlopen
-import urllib.parse
+import base64
import azure.functions as func
from flask import Flask, Response, redirect, request, url_for
@@ -62,8 +62,11 @@ def print_logging():
def raw_body_bytes():
body = request.get_data()
- sanitized_body = urllib.parse.quote(body)
- return Response(sanitized_body, headers={'body-len': str(len(sanitized_body))})
+ base64_encoded = base64.b64encode(body).decode('utf-8')
+ html_img_tag = \
+ f''
+
+ return Response(html_img_tag, headers={'body-len': str(len(html_img_tag))})
@flask_app.get("/return_http_no_body")
diff --git a/tests/utils/testutils_lc.py b/tests/utils/testutils_lc.py
index d0065d97f..43665a65a 100644
--- a/tests/utils/testutils_lc.py
+++ b/tests/utils/testutils_lc.py
@@ -16,9 +16,11 @@
from zipfile import ZipFile
import requests
-from Crypto.Cipher import AES
-from Crypto.Hash.SHA256 import SHA256Hash
-from Crypto.Util.Padding import pad
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives import padding
+
from tests.utils.constants import PROJECT_ROOT
# Linux Consumption Testing Constants
@@ -287,19 +289,35 @@ def _encrypt_context(cls, encryption_key: str, plain_text: str) -> str:
"""Encrypt plain text context into a encrypted message which can
be accepted by the host
"""
+ # Decode the encryption key
encryption_key_bytes = base64.b64decode(encryption_key.encode())
- plain_text_bytes = pad(plain_text.encode(), 16)
+
+ # Pad the plaintext to be a multiple of the AES block size
+ padder = padding.PKCS7(algorithms.AES.block_size).padder()
+ plain_text_bytes = padder.update(plain_text.encode()) + padder.finalize()
+
+ # Initialization vector (IV) (fixed value for simplicity)
iv_bytes = '0123456789abcedf'.encode()
- # Start encryption
- cipher = AES.new(encryption_key_bytes, AES.MODE_CBC, iv=iv_bytes)
- encrypted_bytes = cipher.encrypt(plain_text_bytes)
+ # Create AES cipher with CBC mode
+ cipher = Cipher(algorithms.AES(encryption_key_bytes),
+ modes.CBC(iv_bytes), backend=default_backend())
- # Prepare final result
+ # Perform encryption
+ encryptor = cipher.encryptor()
+ encrypted_bytes = encryptor.update(plain_text_bytes) + encryptor.finalize()
+
+ # Compute SHA256 hash of the encryption key
+ digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
+ digest.update(encryption_key_bytes)
+ key_sha256 = digest.finalize()
+
+ # Encode IV, encrypted message, and SHA256 hash in base64
iv_base64 = base64.b64encode(iv_bytes).decode()
encrypted_base64 = base64.b64encode(encrypted_bytes).decode()
- key_sha256 = SHA256Hash(encryption_key_bytes).digest()
key_sha256_base64 = base64.b64encode(key_sha256).decode()
+
+ # Return the final result
return f'{iv_base64}.{encrypted_base64}.{key_sha256_base64}'
def __enter__(self):