Skip to content

Commit

Permalink
Adds dispersed File I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
tainagdcoleman committed Jan 10, 2025
1 parent b21a058 commit a8804eb
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 61 deletions.
57 changes: 26 additions & 31 deletions bin/cpu-benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <random>
#include <cmath>
#include <functional>
#include <iomanip>

#define PRECISION 100000L

Expand All @@ -21,16 +22,20 @@ void compute_good_pi(long work) {
rng.seed((long)&work);

double good_pi = 0.0;
double x,y;
for (long sample=0; sample < work; sample++) {
double x, y;
for (long sample = 0; sample < work; sample++) {
x = random_dist(rng);
y = random_dist(rng);
good_pi += (double)(std::sqrt(x*x + y*y) < 0.5);
}
// std::cout << "good pi = " << (good_pi/(double)work)/(0.5*0.5) << "\n";

}
good_pi += (double)(std::sqrt(x * x + y * y) < 0.5);

// Print progress every 1% of completion
if (sample % (work / 100) == 0) {
double progress = (double)sample / work * 100;
std::cout << "\rProgress: " << std::fixed << std::setprecision(2) << progress << "%" << std::flush;
}
}
std::cout << "\rProgress: 100.00%\n"; // Ensure full progress is displayed
}

/**
* This function computes pi using work trials for a Monte Carlo method. It's
Expand All @@ -44,51 +49,41 @@ void compute_terrible_pi(long work) {
long rng = (long)&work;
double terrible_pi = 0.0;
double x_value, y_value;
for (long sample=0; sample < work; sample++) {
for (long sample = 0; sample < work; sample++) {
rng = (((rng * 214013L + 2531011L) >> 16) & 32767);
x_value = -0.5 + (rng % PRECISION) / (double)PRECISION;
rng = (((rng * 214013L + 2531011L) >> 16) & 32767);
y_value = -0.5 + (rng % PRECISION) / (double)PRECISION;
terrible_pi += (double)(std::sqrt(x_value*x_value+ y_value*y_value) < 0.5);
terrible_pi += (double)(std::sqrt(x_value * x_value + y_value * y_value) < 0.5);

// Print progress every 1% of completion
if (sample % (work / 100) == 0) {
double progress = (double)sample / work * 100;
std::cout << "\rProgress: " << std::fixed << std::setprecision(2) << progress << "%" << std::flush;
}
}
// std::cout << "terrible pi = " << (terrible_pi/(double)work)/ (0.5*0.5) << "\n";
std::cout << "\rProgress: 100.00%\n"; // Ensure full progress is displayed
}

int main(int argc, char **argv) {


// Process command-line args
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <work (# 1M samples)>\n";
exit(1);
}

long work;
int num_threads;
try {
work = std::stol(argv[1]);
// num_threads = std::stoi(argv[2]);
work = std::stol(argv[1]);
} catch (std::invalid_argument &e) {
std::cerr << "Invalid argument: " << e.what() << "\n";
exit(1);
}

// Create all threads
// std::vector<std::thread> workers;
// for (unsigned int i = 0; i < num_threads; i++) {
// auto t = std::thread([work]() {
compute_terrible_pi(1000000*work);
std::cout<<"Pi computed!"<<std::endl;
//compute_good_pi(1000000*work);
// });
// workers.push_back(std::move(t));
// }

// Wait for all threads
// for (unsigned int i = 0; i < num_threads; i++) {
// workers.at(i).join();
// }
// Compute Pi using terrible method
compute_terrible_pi(1000000 * work);
std::cout << "Pi computation completed!\n";

exit(0);

}
146 changes: 116 additions & 30 deletions bin/wfbench
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import time
import sys
import signal
import multiprocessing
import queue
import argparse
import re
import json
Expand Down Expand Up @@ -48,7 +49,8 @@ def lock_core(path_locked: pathlib.Path,
try:
lock.acquire()
taken_cores = {
int(line) for line in path_cores.read_text().splitlines() if line.strip()}
int(line) for line in path_cores.read_text().splitlines() if line.strip()
}
available = all_cores - taken_cores
if available:
core = available.pop()
Expand Down Expand Up @@ -87,63 +89,87 @@ def unlock_core(path_locked: pathlib.Path,
lock.release()


def cpu_mem_benchmark(cpu_threads: Optional[int] = 5,
import os
import subprocess
import multiprocessing
from typing import Optional, List

def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
cpu_threads: Optional[int] = 5,
mem_threads: Optional[int] = 5,
cpu_work: Optional[int] = 100,
core: Optional[int] = None,
total_mem: Optional[float] = None) -> List:
"""
Run cpu and memory benchmark.
Run CPU and memory benchmark.
:param cpu_threads:
:param cpu_queue: Queue to push CPU benchmark progress as a float.
:type cpu_queue: multiprocessing.Queue
:param cpu_threads: Number of threads for CPU benchmark.
:type cpu_threads: Optional[int]
:param mem_threads:
:param mem_threads: Number of threads for memory benchmark.
:type mem_threads: Optional[int]
:param cpu_work:
:param cpu_work: Total work units for CPU benchmark.
:type cpu_work: Optional[int]
:param core:
:param core: Core to pin the benchmark processes to.
:type core: Optional[int]
:param total_mem:
:param total_mem: Total memory to use for memory benchmark.
:type total_mem: Optional[float]
:return:
:return: Lists of CPU and memory subprocesses.
:rtype: List
"""
total_mem = f"{total_mem}M" if total_mem else f"{100.0 / os.cpu_count()}%"
cpu_work_per_thread = int(cpu_work / cpu_threads)

cpu_procs = []
mem_procs = []
cpu_prog = [
f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"]
cpu_prog = [f"{os.path.join(os.getcwd(), 'cpu-benchmark')}", f"{cpu_work_per_thread}"]
mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
"--vm-bytes", f"{total_mem}", "--vm-keep"]

def monitor_progress(proc):
"""Monitor progress from the CPU benchmark process."""
for line in iter(proc.stdout.readline, ""): # No decode needed
line = line.strip()
if line.startswith("Progress:"):
try:
progress = float(line.split()[1].strip('%'))
cpu_queue.put(progress)
except (ValueError, IndexError):
pass

for i in range(cpu_threads):
cpu_proc = subprocess.Popen(cpu_prog)
cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if core:
os.sched_setaffinity(cpu_proc.pid, {core})
cpu_procs.append(cpu_proc)

# Start a thread to monitor the progress of each CPU benchmark process
monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc,))
monitor_thread.start()

if mem_threads > 0:
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
if core:
os.sched_setaffinity(mem_proc.pid, {core})
mem_procs.append(mem_proc)

return cpu_procs, mem_procs



def io_read_benchmark_user_input_data_size(inputs,
rundir=None,
memory_limit=None):
if memory_limit is None:
memory_limit = -1
memory_limit = int(memory_limit)
print("[WfBench] Starting IO Read Benchmark...")
for file in inputs:
for file, size in inputs.items():
with open(rundir.joinpath(file), "rb") as fp:
print(f"[WfBench] Reading '{file}'")
while fp.read(memory_limit):
while fp.read(size):
pass
print("[WfBench] Completed IO Read Benchmark!\n")

Expand All @@ -162,35 +188,91 @@ def io_write_benchmark_user_input_data_size(outputs,
chunk_size = min(file_size_todo, memory_limit)
file_size_todo -= fp.write(os.urandom(int(chunk_size)))

def io_alternate(inputs, outputs, memory_limit=None, rundir=None, event=None):

def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None):
"""Alternate between reading and writing to a file, ensuring read only happens after write."""

if memory_limit is None:
memory_limit = 10 * 1024 * 1024 # sys.maxsize
memory_limit = int(memory_limit)

while True:
# Wait for the write to complete before reading
event.wait()

# Read from the file
io_read_benchmark_user_input_data_size(inputs, rundir, memory_limit=memory_limit)
# while True:
# # Wait for the write to complete before reading
# event.wait()

# # Read from the file
# io_read_benchmark_user_input_data_size(inputs, rundir, memory_limit=memory_limit)

# # Clear the event to block the next read until write is done
# event.clear()

# # Write to the file and then set the event to signal completion
# io_write_benchmark_user_input_data_size(outputs, rundir, memory_limit=memory_limit)
# event.set()

# queue will have messages in the form (cpu_percent_completed)
# Get the last message and trash the rest
io_completed = 0
bytes_read = {
name: 0
for name in inputs
}
bytes_written = {
name: 0
for name in outputs
}

# get size of inputs
inputs = {
name: os.path.getsize(rundir.joinpath(name))
for name in inputs
}

while io_completed < 100:
cpu_percent = max(io_completed, cpu_queue.get())
while True: # Get the last message
try:
cpu_percent = max(io_completed, cpu_queue.get_nowait())
except queue.Empty:
break

print(f"CPU Percent: {cpu_percent}")
if cpu_percent:
bytes_to_read = {
name: int(size * (cpu_percent / 100) - bytes_read[name])
for name, size in inputs.items()
}
bytes_to_write = {
name: int(size * (cpu_percent / 100) - bytes_written[name])
for name, size in outputs.items()
}
io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit)
io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit)

# Clear the event to block the next read until write is done
event.clear()
bytes_read = {
name: bytes_read[name] + bytes_to_read[name]
for name in bytes_to_read
}
bytes_written = {
name: bytes_written[name] + bytes_to_write[name]
for name in bytes_to_write
}

# Write to the file and then set the event to signal completion
io_write_benchmark_user_input_data_size(outputs, rundir, memory_limit=memory_limit)
event.set()
print(f"Bytes Read: {bytes_read}")
print(f"Bytes Written: {bytes_written}")

io_completed = cpu_percent

if io_completed >= 100:
break

def get_available_gpus():
proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = proc.communicate()
df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ")
return df[df["utilization.gpu"] <= 5].index.to_list()


def gpu_benchmark(time: int = 100,
work: int = 100,
device: int = 0): #work, device
Expand All @@ -199,6 +281,7 @@ def gpu_benchmark(time: int = 100,
print(f"Running GPU Benchmark: {gpu_prog}")
subprocess.Popen(gpu_prog, shell=True)


def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("name", help="Task Name")
Expand Down Expand Up @@ -241,12 +324,15 @@ def main():
io_proc = None
outputs_dict = {}

cpu_queue = multiprocessing.Queue()

print("Directory:", os.getcwd())
if args.out:
print("[WfBench] Starting IO benchmark...")

# Remove all escape characters before attempting to parse the JSON string
cleaned_output = re.sub(r'\\+', '', args.out)
print("CLEANED", cleaned_output)

# Attempt to parse the cleaned string
try:
Expand All @@ -267,11 +353,11 @@ def main():

io_proc = multiprocessing.Process(
target=io_alternate,
args=(other, outputs_dict, mem_bytes, rundir, write_done_event))
args=(other, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event)
)
io_proc.start()
procs.append(io_proc)


if args.gpu_work:
print("[WfBench] Starting GPU Benchmark...")
available_gpus = get_available_gpus() #checking for available GPUs
Expand All @@ -295,7 +381,7 @@ def main():
print(f"[WfBench] {args.name} acquired core {core}")

mem_threads=int(10 - 10 * args.percent_cpu)
cpu_procs, mem_procs = cpu_mem_benchmark(cpu_threads=int(10 * args.percent_cpu),
cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, cpu_threads=int(10 * args.percent_cpu),
mem_threads=mem_threads,
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
core=core,
Expand Down

0 comments on commit a8804eb

Please sign in to comment.