Skip to content

Commit

Permalink
Fixes ossf#519: code examples use Jekyll
Browse files Browse the repository at this point in the history
Signed-off-by: emcdtho <[email protected]>
  • Loading branch information
tommcd committed Jun 5, 2024
1 parent 355128a commit b5aad25
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 939 deletions.
77 changes: 4 additions & 73 deletions docs/Secure-Coding-Guide-for-Python/CWE-664/CWE-400/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,8 @@ Tasks can be submitted to the ThreadPoolExecutor by calling `submit()`. Submitte

[*noncompliant01.py:*](noncompliant01.py)

```py
""" Non-compliant Code Example """
import time
from concurrent.futures import ThreadPoolExecutor


def take_time(x):
print(f"Started Task: {x}")
# Simulate work
for i in range(10):
time.sleep(1)
print(f"Completed Task: {x}")


def run_thread(_executor, var):
future = _executor.submit(take_time, var)
return future


def interrupt(future):
print(future.cancel())
print(f"Interrupted: {future}")


#####################
# Exploiting above code example
#####################


with ThreadPoolExecutor() as executor:
task = run_thread(executor, "A")
interrupt(task)

```python
{% include_relative noncompliant01.py %}
```

## Compliant Solution
Expand All @@ -49,46 +18,8 @@ Tasks submitted to the ThreadPoolExecutor can be interrupted by setting a thread

[*compliant01.py:*](compliant01.py)

```py
""" Compliant Code Example """
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Event


def take_time(x, _event):
print(f"Started Task: {x}")
# Simulate work
for _ in range(10):
if _event.is_set():
print(f"Interrupted Task: {x}")
# Save partial results
return
time.sleep(1)
print(f"Completed Task: {x}")


def run_thread(_executor, var):
e = Event()
future = _executor.submit(take_time, var, e)
return future, e


def interrupt(future, e):
"""Cancel the task, just in case it is not yet running, and set the Event flag"""
future.cancel()
e.set()


#####################
# Exploiting above code example
#####################


with ThreadPoolExecutor() as executor:
task, event = run_thread(executor, "A")
interrupt(task, event)

```python
{% include_relative compliant01.py %}
```

## Related Guidelines
Expand Down
243 changes: 8 additions & 235 deletions docs/Secure-Coding-Guide-for-Python/CWE-664/CWE-410/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,8 @@ The `noncompliant01.py` code example demonstrates the Thread-Per-Message design

*[noncompliant01.py](noncompliant01.py):*

```py
""" Non-compliant Code Example """
import logging
import threading
import time

logging.basicConfig(level=logging.INFO)


def process_message(message: str, processed_messages: list):
""" Method simulating mediation layer i/o heavy work"""
logging.debug("process_message: started message %s working %is", message, int(message) / 10)
for _ in range(int(message)):
time.sleep(0.01)
logging.debug("process_message: completed message %s", message)
processed_messages.append(f"processed {message}")


class MessageAPI(object):
"""Class simulating the front end facing API"""

def add_messages(self, messages: list) -> list:
""" Receives a list of messages to work on """
logging.info("add_messages: got %i messages to process", len(messages))
processed_messages = []
threads = []
for message in messages:
threads.append(
threading.Thread(target=process_message, args=[message, processed_messages]))
threads[-1].start()
logging.debug("add_messages: submitted %i messages", len(messages))
for thread in threads:
thread.join()
logging.info("add_messages: messages_done=%i", len(processed_messages))
return processed_messages


#####################
# exploiting above code example
#####################
mapi = MessageAPI()
attacker_messages = [str(msg) for msg in range(1000)]
print("ATTACKER: start sending messages")
result_list = mapi.add_messages(attacker_messages)
print(
f"ATTACKER: done sending {len(attacker_messages)} messages, got {len(result_list)} messages "
f"back")
print(f"ATTACKER: result_list = {result_list}")

```python
{% include_relative noncompliant01.py %}
```

The `noncompliant01.py` code creates a risk of having a single component exhaust all available hardware resources even when used by a single user for a single use-case. When running the code, the Unix `top` command can show a significant increase in CPU usage (%CPU exceeds 100% because multiple cores are being used):
Expand All @@ -80,64 +33,8 @@ The attacker could still flood the server by creating multiple MessageAPI, each

*[compliant01.py](compliant01.py)*

```py
""" Compliant Code Example """
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

logging.basicConfig(level=logging.INFO)


def process_message(message: str):
""" Method simulating mediation layer i/o heavy work"""
logging.debug("process_message: started message %s working %is", message, int(message) / 10)
for _ in range(int(message)):
time.sleep(0.01)
logging.debug("process_message: completed message %s", message)
return f"processed {message}"


class MessageAPI(object):
"""Class simulating the front end facing API"""
# TODO: Prevent the attacker from creating multiple MessageAPI objects

def __init__(self):
# TODO: set or handle timeout as it is provided by the mediation layer
self.timeout = 1
self.executor = ThreadPoolExecutor()

def add_messages(self, messages: list) -> list:
""" Receives a list of messages to work on """
# TODO: limit on max messages from the mediation layer.
# TODO: input sanitation.
futures = []
# with self.executor:
for message in messages:
futures.append(self.executor.submit(process_message, message))
logging.debug("add_messages: submitted %i messages, waiting for %is to complete.", len(messages), self.timeout)
messages_done, messages_not_done = wait(futures, timeout=self.timeout)
for future in messages_not_done:
future.cancel()

logging.info("add_messages: messages_done=%i messages_not_done=%i", len(messages_done), len(messages_not_done))
process_messages = []
for future in messages_done:
process_messages.append(future.result())
return process_messages


#####################
# exploiting above code example
#####################
mapi = MessageAPI()
result_list = []
attacker_messages = [str(msg) for msg in range(100)]
print("ATTACKER: start sending messages")
result_list = mapi.add_messages(attacker_messages)
print(f"ATTACKER: done sending {len(attacker_messages)} messages, got {len(result_list)} messages back")
print(f"ATTACKER: result_list = {result_list}")
```python
{% include_relative compliant01.py %}
```

Now, after the timeout is reached, `MessageAPI` drops unprocessed messages and returns partial results:
Expand All @@ -155,63 +52,8 @@ The `executor.shutdown()` method has a `cancel_futures` parameter, which by def

*[noncompliant02.py](noncompliant02.py):*

```py
""" Non-compliant Code Example """
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

logging.basicConfig(level=logging.INFO)


def process_message(message: str):
""" Method simulating mediation layer i/o heavy work"""
logging.debug("process_message: started message %s working %is", message, int(message) / 10)
for _ in range(int(message)):
time.sleep(0.01)
logging.debug("process_message: completed message %s", message)
return f"processed {message}"


class MessageAPI(object):
"""Class simulating the front end facing API"""

def __init__(self):
self.executor = ThreadPoolExecutor()
self.timeout = 1

def add_messages(self, messages: list) -> list:
""" Receives a list of messages to work on """
futures = []
for message in messages:
futures.append(self.executor.submit(process_message, message))

logging.debug("add_messages: submitted %i messages, waiting for %is to complete.",
len(messages), self.timeout)
messages_done, messages_not_done = wait(futures, timeout=self.timeout)

logging.info("add_messages: messages_done=%i messages_not_done=%i", len(messages_done),
len(messages_not_done))

process_messages = []
for future in messages_done:
process_messages.append(future.result())
return process_messages


#####################
# exploiting above code example
#####################
mapi = MessageAPI()
result_list = []
attacker_messages = [str(msg) for msg in range(1000)]
print("ATTACKER: start sending messages")
result_list = mapi.add_messages(attacker_messages)
print(
f"ATTACKER: done sending {len(attacker_messages)} messages, got {len(result_list)} messages "
f"back")
print(f"ATTACKER: result_list = {result_list}")
```python
{% include_relative noncompliant02.py %}
```

## Compliant Solution (Thread Pool with grace period)
Expand All @@ -220,77 +62,8 @@ The `compliant01.py` can be expanded by adding a grace period. Before dropping t

*[compliant02.py](compliant02.py):*

```py
""" Compliant Code Example """
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

logging.basicConfig(level=logging.INFO)


def process_message(message: str):
""" Method simulating mediation layer i/o heavy work"""
logging.debug("process_message: started message %s working %is", message, int(message) / 10)
for _ in range(int(message)):
time.sleep(0.01)
logging.debug("process_message: completed message %s", message)
return f"processed {message}"


class MessageAPI(object):
"""Class simulating the front end facing API"""
# TODO: Prevent the attacker from creating multiple MessageAPI objects

def __init__(self):
# TODO: set or handle timeout as it is provided by the mediation layer
self.timeout = 1
self.executor = ThreadPoolExecutor()

def add_messages(self, messages: list) -> list:
""" Receives a list of messages to work on """
# TODO: limit on max messages from the mediation layer.
# TODO: input sanitation.
futures = []
# with self.executor:
for message in messages:
futures.append(self.executor.submit(process_message, message))
logging.debug("add_messages: submitted %i messages, waiting for %is to complete.",
len(messages), self.timeout)
messages_done, messages_not_done = wait(futures, timeout=self.timeout)
logging.info("add_messages: messages_done=%i messages_not_done=%i", len(messages_done),
len(messages_not_done))
if len(messages_not_done) > 0:
# TODO: be graceful, warn a trusted client
logging.warning("add_messages: %i messages taking longer than %is, %i more to process",
len(messages), self.timeout, len(messages_not_done))
messages_done, messages_not_done = wait(futures, timeout=self.timeout)
logging.info("add_messages: messages_done=%i messages_not_done=%i", len(messages_done),
len(messages_not_done))
for future in messages_not_done:
future.cancel()

logging.info("add_messages: messages_done=%i messages_not_done=%i", len(messages_done),
len(messages_not_done))
process_messages = []
for future in messages_done:
process_messages.append(future.result())
return process_messages


#####################
# exploiting above code example
#####################
mapi = MessageAPI()
result_list = []
attacker_messages = [str(msg) for msg in range(100)]
print("ATTACKER: start sending messages")
result_list = mapi.add_messages(attacker_messages)
print(
f"ATTACKER: done sending {len(attacker_messages)} messages, got {len(result_list)} messages "
f"back")
print(f"ATTACKER: result_list = {result_list}")
```python
{% include_relative compliant02.py %}
```

## Automated Detection
Expand Down
Loading

0 comments on commit b5aad25

Please sign in to comment.