Skip to content

Commit

Permalink
CodeQL fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Victoria Hall committed Nov 19, 2024
1 parent 139af01 commit 9bea1e8
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 20 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dev = [
"flask",
"fastapi~=0.103.2",
"pydantic",
"pycryptodome==3.*",
"flake8==5.*; python_version == '3.7'",
"flake8==6.*; python_version >= '3.8'",
"mypy",
Expand All @@ -69,7 +68,8 @@ dev = [
"pandas",
"numpy",
"pre-commit",
"invoke"
"invoke",
"cryptography"
]
test-http-v2 = [
"azurefunctions-extensions-http-fastapi",
Expand Down
11 changes: 7 additions & 4 deletions tests/unittests/test_third_party_http_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pathlib
import re
import typing
import urllib.parse
import base64

from unittest.mock import patch

Expand Down Expand Up @@ -133,15 +133,18 @@ def test_raw_body_bytes(self):
image_file = parent_dir / 'unittests/resources/functions.png'
with open(image_file, 'rb') as image:
img = image.read()
sanitized_image = urllib.parse.quote(img)
sanitized_img_len = len(sanitized_image)
encoded_image = base64.b64encode(img).decode('utf-8')
html_img_tag = \
f'<img src="data:image/png;base64,{encoded_image}" alt="PNG Image"/>' # noqa
sanitized_img_len = len(html_img_tag)
r = self.webhost.request('POST', 'raw_body_bytes', data=img,
no_prefix=True)

received_body_len = int(r.headers['body-len'])
self.assertEqual(received_body_len, sanitized_img_len)

body = urllib.parse.unquote_to_bytes(r.content)
encoded_image_data = encoded_image.split(",")[0]
body = base64.b64decode(encoded_image_data)
try:
received_img_file = parent_dir / 'received_img.png'
with open(received_img_file, 'wb') as received_img:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ async def return_http(request: Request):

@fast_app.get("/return_http_redirect")
async def return_http_redirect(request: Request, code: str = ''):
allowed_url_pattern = r"^http://.+"
allowed_url_pattern = r"^http://127\.0\.0\.1:\d+/return_http_redirect\?code=*"

location = 'return_http?code={}'.format(code)
redirect_url = f"http://{request.url.components[1]}/{location}"
redirect_url = f"http://127.0.0.1/{location}"
if re.match(allowed_url_pattern, redirect_url):
# Redirect URL is in the expected format
return RedirectResponse(status_code=302,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
from urllib.request import urlopen
import urllib.parse
import base64

import azure.functions as func
from flask import Flask, Response, redirect, request, url_for
Expand Down Expand Up @@ -62,8 +62,11 @@ def print_logging():
def raw_body_bytes():
body = request.get_data()

sanitized_body = urllib.parse.quote(body)
return Response(sanitized_body, headers={'body-len': str(len(sanitized_body))})
base64_encoded = base64.b64encode(body).decode('utf-8')
html_img_tag = \
f'<img src="data:image/png;base64,{base64_encoded}" alt="PNG Image"/>'

return Response(html_img_tag, headers={'body-len': str(len(html_img_tag))})


@flask_app.get("/return_http_no_body")
Expand Down
36 changes: 27 additions & 9 deletions tests/utils/testutils_lc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
from zipfile import ZipFile

import requests
from Crypto.Cipher import AES
from Crypto.Hash.SHA256 import SHA256Hash
from Crypto.Util.Padding import pad
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import padding

from tests.utils.constants import PROJECT_ROOT

# Linux Consumption Testing Constants
Expand Down Expand Up @@ -287,19 +289,35 @@ def _encrypt_context(cls, encryption_key: str, plain_text: str) -> str:
"""Encrypt plain text context into a encrypted message which can
be accepted by the host
"""
# Decode the encryption key
encryption_key_bytes = base64.b64decode(encryption_key.encode())
plain_text_bytes = pad(plain_text.encode(), 16)

# Pad the plaintext to be a multiple of the AES block size
padder = padding.PKCS7(algorithms.AES.block_size).padder()
plain_text_bytes = padder.update(plain_text.encode()) + padder.finalize()

# Initialization vector (IV) (fixed value for simplicity)
iv_bytes = '0123456789abcedf'.encode()

# Start encryption
cipher = AES.new(encryption_key_bytes, AES.MODE_CBC, iv=iv_bytes)
encrypted_bytes = cipher.encrypt(plain_text_bytes)
# Create AES cipher with CBC mode
cipher = Cipher(algorithms.AES(encryption_key_bytes),
modes.CBC(iv_bytes), backend=default_backend())

# Prepare final result
# Perform encryption
encryptor = cipher.encryptor()
encrypted_bytes = encryptor.update(plain_text_bytes) + encryptor.finalize()

# Compute SHA256 hash of the encryption key
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(encryption_key_bytes)
key_sha256 = digest.finalize()

# Encode IV, encrypted message, and SHA256 hash in base64
iv_base64 = base64.b64encode(iv_bytes).decode()
encrypted_base64 = base64.b64encode(encrypted_bytes).decode()
key_sha256 = SHA256Hash(encryption_key_bytes).digest()
key_sha256_base64 = base64.b64encode(key_sha256).decode()

# Return the final result
return f'{iv_base64}.{encrypted_base64}.{key_sha256_base64}'

def __enter__(self):
Expand Down

0 comments on commit 9bea1e8

Please sign in to comment.