
Fast Linelist Reporting?


Objective

For some users and use cases, we need to be able to report every single new infection event in a simulation. Each new infection record will likely consist of 4 elements:

  • Simulation Time
  • Agent ID
  • Agent Age
  • Agent Node

We'll want this written out to a file at the end of the sim.

Problem

The challenge is that this kind of task, adding rows continuously to a table/dataframe of unknown initial size, runs counter to the LASER approach, which seeks to make things fast first and foremost by being very clever and careful with memory management. Appending items to a list, or really a set of lists, is very easy from a programming point of view, but it has a memory cost. As GPT puts it:

Continuously appending to a list that grows to 50 million entries is inefficient in memory management because Python lists are dynamically resized arrays. Each time the list exceeds its current capacity, the list needs to allocate a new, larger block of memory and copy the existing elements over. While Python tries to minimize reallocations by over-allocating space (usually growing by a factor of around 1.125x), these reallocations still result in memory fragmentation and copying overhead. Moreover, lists store references to objects, so the memory cost scales with both the list size and the objects themselves, leading to further inefficiencies when managing very large lists over time.

Another key factor is that LASER's core design consists of highly parallelizable functional code blocks that operate simultaneously on the same indices of multiple arrays. Whenever these functional units need to write to a shared memory block, such as a common report, it introduces a blocking operation that slows down execution. While we can mitigate this somewhat by using thread-local arrays—writing their contents to the 'global' array at the end of the function—this approach adds complexity and remains suboptimal. Allocating a unique section of the report array to each thread isn't feasible, unless we first calculate the number of new infections per node and then partition the report and threads by node.

Scoping the Problem

A test simulation of 100 million agents over 5 years in northern Nigeria showed we might get almost 60 million new infections. Obviously this number could vary widely, but it's the figure used here to put a rational bound on the extent of the problem.

Approach(es)

Properties as Reports

For this reason, it might be preferable to add some dynamic properties to our agents for the purposes of reporting. For example, we could add properties age_at_infection, time_at_infection, and node_at_infection and update these with the appropriate values when an agent is infected.

Then we write the entire population dataframe to disk at the end as an HDF5 file, and the user post-processes the line-list columns as needed.
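As a minimal sketch of this idea (assuming the population properties are plain NumPy arrays and using h5py for the output; the property names and file layout here are illustrative, not LASER's actual API):

import numpy as np
import h5py

# Hypothetical population with per-agent reporting properties
count = 1_000_000
age_at_infection = np.full(count, -1.0, dtype=np.float32)   # -1 means "never infected"
time_at_infection = np.full(count, -1, dtype=np.int32)
node_at_infection = np.full(count, -1, dtype=np.int32)

def on_infection(idx, tick, ages, nodes):
    # Record the infection event directly on the agent's properties
    age_at_infection[idx] = ages[idx]
    time_at_infection[idx] = tick
    node_at_infection[idx] = nodes[idx]

# At the end of the simulation, dump the columns; post-processing filters to
# time_at_infection >= 0 to recover the line list
with h5py.File("population_report.h5", "w") as f:
    f.create_dataset("age_at_infection", data=age_at_infection)
    f.create_dataset("time_at_infection", data=time_at_infection)
    f.create_dataset("node_at_infection", data=node_at_infection)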

Results

This approach works and is very fast, adding at most 10 seconds to a 7-minute simulation. The problem is that with cholera we have temporary immunity and individuals can get re-infected, so each agent could have, say, 5 (or more) infections. Do we have to have 3 columns per infection? The vast majority of those extra columns would be wasted memory. We would probably also need a number-of-infections column to help us quickly pick the right set of columns, which adds further runtime cost.

Clever Ways to Write Many Rows Fast

We tested 3 different approaches to writing 57 million 4-column records to memory and, eventually, to disk. We ran these tests completely outside of LASER in order to understand the best-case scenario for the data writing itself.

(All implementations provided by GPT4)

Memory-Mapped Files

The first approach was to use memory-mapped files. A memory-mapped file allows for faster continuous appending by avoiding costly memory reallocations, leveraging direct disk access to reduce I/O overhead, and scaling efficiently through the operating system's paging mechanism without pre-allocating all memory upfront.

import mmap
import random
import struct

# Define the record format
record_format = 'I I f I'  # Timestep (int), Individual ID (int), Age (float), Node ID (int)
record_size = struct.calcsize(record_format)

# Number of records for the demo
num_records = 57_000_000

# Create or open a memory-mapped file
file_name = 'simulation_data_large.dat'
with open(file_name, 'wb') as f:
    # Pre-allocate space for all 57 million records
    f.truncate(record_size * num_records)

with open(file_name, 'r+b') as f:
    # Memory-map the file
    mmapped_file = mmap.mmap(f.fileno(), 0)

    # Function to add a new record
    def add_record(timestep, individual_id, age, node_id, position):
        record = struct.pack(record_format, timestep, individual_id, age, node_id)
        mmapped_file[position:position + record_size] = record

    # Example: write random records
    for timestep in range(num_records):
        individual_id = random.randint(0, int(1e8))
        age = random.uniform(0, 100)
        node_id = random.randint(0, 99)
        position = timestep * record_size  # Calculate position in file
        add_record(timestep, individual_id, age, node_id, position)

    # Seek back to the start of the file to read and print the first 10 records
    mmapped_file.seek(0)
    for _ in range(10):
        record = mmapped_file.read(record_size)
        unpacked = struct.unpack(record_format, record)
        print(unpacked)

    # Close the memory-mapped file
    mmapped_file.close()

This took several minutes, but got down to ~2.5 minutes when using batched writing (not shown). It's not clear yet whether batched writing fits our actual use case without adding too much complexity.
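For reference, a sketch of what batched writing might look like, reusing record_format, record_size, mmapped_file, and num_records from the block above (the batch size is arbitrary): pack many records into one buffer and copy them into the map with a single slice assignment.

batch_size = 100_000
batch = bytearray(batch_size * record_size)

for start in range(0, num_records, batch_size):
    count = min(batch_size, num_records - start)
    # Pack this batch of records into a contiguous byte buffer
    for j in range(count):
        timestep = start + j
        struct.pack_into(record_format, batch, j * record_size,
                         timestep,
                         random.randint(0, int(1e8)),   # individual_id
                         random.uniform(0, 100),        # age
                         random.randint(0, 99))         # node_id
    # One slice assignment per batch instead of one per record
    position = start * record_size
    mmapped_file[position:position + count * record_size] = bytes(batch[:count * record_size])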

SQLite

The second approach was to rely on SQLite since this is a system inherently designed around solving this type of problem.

import sqlite3
import numpy as np
import time

# Number of records for the demo
num_records = 57_000_000

# Batch size for writing records
batch_size = 1_000_000

# Start timing
start_time = time.time()

# Connect to an in-memory SQLite database
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()

# Create a table to store the data
cursor.execute('''
    CREATE TABLE simulation_data (
        timestep INTEGER,
        individual_id INTEGER,
        age REAL,
        node_id INTEGER
    )
''')

# Function to insert a batch of records
def insert_batch_records(timesteps, individual_ids, ages, node_ids):
    records = list(zip(timesteps, individual_ids, ages, node_ids))
    cursor.executemany('INSERT INTO simulation_data VALUES (?, ?, ?, ?)', records)

# Loop to generate and insert records in batches
for batch_start in range(0, num_records, batch_size):
    batch_end = min(batch_start + batch_size, num_records)
    current_batch_size = batch_end - batch_start

    # Generate random data for the batch using numpy for efficiency
    timesteps = np.arange(batch_start, batch_end, dtype=np.uint32)
    individual_ids = np.random.randint(0, int(1e8), current_batch_size, dtype=np.uint32)
    ages = np.random.uniform(0, 100, current_batch_size).astype(np.float32)
    node_ids = np.random.randint(0, 100, current_batch_size, dtype=np.uint32)

    # Insert the batch into the database
    insert_batch_records(timesteps, individual_ids, ages, node_ids)

# Commit the transaction (though it's not necessary in an in-memory database)
conn.commit()

# End timing
end_time = time.time()
print(f"Time taken for 57 million records using SQLite: {end_time - start_time:.2f} seconds")

# Optional: Query the data to verify
cursor.execute('SELECT COUNT(*) FROM simulation_data')
print(f"Number of records in the table: {cursor.fetchone()[0]}")

# Close the connection
conn.close()

This ran in about 2.5 minutes also.

Virtual Memory

This approach is based on an insight from Christopher Lorton. It's actually fairly convergent with the first approach.

The idea is to reserve a large amount of memory through the virtual memory system and then only realize the memory that we actually need. For example, reserve space for 1 billion reports virtually, but only actualize pages in memory as needed. We can create a "virtual" NumPy array for this: NumPy supports wrapping its metadata around a user-allocated chunk of memory, so we can do the "allocation" ourselves and NumPy will surface the memory as an ndarray.

import numpy as np
import mmap
import os
import random

# Define the number of records
num_records = 57_000_000  # 57 million records
max_records = int(1e9)    # Allocate space for up to 1 billion records

# Define the data structure: (timestep, individual id, age, node id)
dtype = np.dtype([('timestep', 'u4'), ('individual_id', 'u4'), ('age', 'f4'), ('node_id', 'u4')])

# Calculate the total size for 1 billion records (virtual allocation)
total_size = max_records * dtype.itemsize

# Create a memory-mapped file or use an anonymous memory region
with open('/dev/zero', 'r+b') as f:
    # Create the memory-mapped region
    mem_map = mmap.mmap(f.fileno(), total_size, access=mmap.ACCESS_WRITE)

    # Create a NumPy array backed by this memory map
    arr = np.ndarray(shape=(max_records,), dtype=dtype, buffer=mem_map)

    # Fill in the first 57 million records with random data
    for i in range(num_records):
        timestep = random.randint(0, 10000)  # Random timestep
        individual_id = random.randint(0, int(1e8))  # Random individual ID
        age = random.uniform(0, 100)  # Random age between 0 and 100
        node_id = random.randint(0, 99)  # Random node ID between 0 and 99

        # Write the record to the memory-mapped array
        arr[i] = (timestep, individual_id, age, node_id)

    # You can now access `arr[:num_records]` as the first 57 million records.
    # For example, print the first 10 records as a sanity check:
    print(arr[:10])  # Display first 10 records as a test

This also took about 2.5 minutes to complete.

Initial Observations & Preliminary Conclusions

None of the non-property approaches seems fast enough for LASER given the quantity of recording we'll need to do. The property approach seems preferable if we have a single infection per agent. Perhaps the best solution is a hybrid approach that records the first infection in the properties and reinfections in a report, which would probably be a few million records instead of tens of millions. A rough sketch of that idea follows.
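Here is a rough sketch of that hybrid idea, assuming the property arrays from the earlier example (the reinfection report shown here is a plain Python list purely for illustration; in practice it would need the same care as any other append-only report):

reinfection_report = []  # (time, agent_id, age, node) tuples for 2nd+ infections only

def record_infection(idx, tick, ages, nodes):
    if time_at_infection[idx] < 0:
        # First infection: store it on the agent's properties (no growth cost)
        age_at_infection[idx] = ages[idx]
        time_at_infection[idx] = tick
        node_at_infection[idx] = nodes[idx]
    else:
        # Reinfection: append to a much smaller line-list report
        reinfection_report.append((tick, idx, ages[idx], nodes[idx]))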

We're also going to explore a Python/C hybrid solution. Some prototypes in pure C are very fast (mere seconds), suggesting that if we can move as much functionality as possible into a ctypes module, we may get what we need.

Hybrid Python/C Fixed-Frame Solution

Here we use a fixed-size ring buffer which we flush to disk each time it gets full. Each flush is an append. We also try to do as much as possible in C.

The following all-C code creates and writes 57 million records using this approach in a few seconds.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

#define BUFFER_SIZE 524288 // 524288 records * 16 bytes per record = 8 MB buffer
#define TOTAL_RECORDS 57000000

typedef struct {
    uint32_t agent_id;
    uint32_t age_at_infection;
    uint32_t time_at_infection;
    uint32_t node_at_infection;
} Record;

size_t buffer_index = 0;
FILE *file = NULL;

void add_record( Record * buffer, int id, int age, int time, int node ) {
    // Store the passed-in values in the next buffer slot
    buffer[buffer_index].agent_id = id;
    buffer[buffer_index].age_at_infection = age;
    buffer[buffer_index].time_at_infection = time;
    buffer[buffer_index].node_at_infection = node;
    buffer_index++;

    // When buffer is full, write to file
    if (buffer_index >= BUFFER_SIZE) {
        fwrite(buffer, sizeof(Record), BUFFER_SIZE, file);
        buffer_index = 0;  // Reset buffer index
    }
}

int main() {
    // Open the output file here; a non-constant initializer is not valid at file scope
    file = fopen("output_records.bin", "wb");
    if (!file) {
        perror("Failed to open output file");
        return 1;
    }

    Record *buffer = (Record *)malloc(BUFFER_SIZE * sizeof(Record));
    if (!buffer) {
        perror("Failed to allocate buffer");
        fclose(file);
        return 1;
    }

    for (size_t i = 0; i < TOTAL_RECORDS; i++) {
        add_record( buffer, rand() % 100000000, rand() % 100, rand() % 365, rand() % 100 );
    }

    // Write any remaining records in the buffer
    if (buffer_index > 0) {
        fwrite(buffer, sizeof(Record), buffer_index, file);
    }

    free(buffer);
    fclose(file);
    return 0;
}

So that's great and would seem to point to a way forward. We now want to create the values in Python and pass them to a ctypes module which does the rest.

Python

import ctypes
import numpy as np

# Load the shared library
lib = ctypes.CDLL('./ll_record_writer.so')

# Define the argument types for the C functions
lib.init_writer.argtypes = [ctypes.c_char_p]
lib.write_record.argtypes = [ctypes.c_uint32, ctypes.c_uint32, ctypes.c_uint32, ctypes.c_uint32]
lib.close_writer.argtypes = []

# Generate records in NumPy and write them one at a time through the C writer
def generate_records_preallocated(num_records, filename):
    # Generate all random numbers ahead of time using NumPy
    agent_ids = np.random.randint(0, int(1e8), size=num_records, dtype=np.uint32)
    ages_at_infection = np.random.randint(0, 101, size=num_records, dtype=np.uint32)  # ages between 0 and 100
    times_at_infection = np.random.randint(0, 366, size=num_records, dtype=np.uint32)  # time between 0 and 365
    nodes_at_infection = np.random.randint(0, 100, size=num_records, dtype=np.uint32)  # node IDs between 0 and 99

    # Initialize the writer (open file and setup buffer)
    lib.init_writer(filename.encode('utf-8'))  

    print( f"Numbers created. Looping now." )
    # Loop over the preallocated arrays and write to the file using the C function
    for i in range(num_records):
        lib.write_record(
            agent_ids[i],
            ages_at_infection[i],
            times_at_infection[i],
            nodes_at_infection[i]
        )

    # Flush buffer and close the file
    lib.close_writer()

# Define filename and number of records
filename = 'output_records_preallocated.bin'
num_records = 57000000  # 57 million records

# Call the generator function
generate_records_preallocated(num_records, filename)

print(f"Written {num_records} records to {filename}")

C

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define BUFFER_SIZE (1024 * 1024) // Size of buffer in bytes (1 MB)

static uint32_t* buffer = NULL;
static size_t buffer_count = 0;
static size_t max_buffer_count;
static FILE* file = NULL;

// Initialize the buffer and open the file
void init_writer(const char* filename) {
    file = fopen(filename, "wb");  // Open file in write-binary mode
    if (file == NULL) {
        perror("Error opening file");
        return;
    }
    
    // Allocate memory for the buffer (BUFFER_SIZE bytes)
    max_buffer_count = BUFFER_SIZE / (4 * sizeof(uint32_t));  // 4 integers per record
    buffer = (uint32_t*) malloc(BUFFER_SIZE);
    buffer_count = 0;  // Start with an empty buffer
}

// Flush the buffer to the file
void flush_buffer() {
    if (buffer_count > 0 && file != NULL) {
        fwrite(buffer, sizeof(uint32_t), buffer_count * 4, file);  // Write all buffered records
        buffer_count = 0;  // Reset the buffer
    }
}

// Write a record to the buffer
void write_record(uint32_t agent_id, uint32_t age_at_infection, uint32_t time_at_infection, uint32_t node_at_infection) {
    if (buffer_count >= max_buffer_count) {
        flush_buffer();  // If the buffer is full, write to disk
    }
    
    // Store the record in the buffer
    size_t index = buffer_count * 4;  // 4 integers per record
    buffer[index] = agent_id;
    buffer[index + 1] = age_at_infection;
    buffer[index + 2] = time_at_infection;
    buffer[index + 3] = node_at_infection;
    buffer_count++;
}

// Close the writer and flush remaining data
void close_writer() {
    if (file != NULL) {
        flush_buffer();  // Write any remaining buffered data
        fclose(file);
        file = NULL;
    }
    
    if (buffer != NULL) {
        free(buffer);  // Free the allocated buffer memory
        buffer = NULL;
    }
}

Results

This takes just over 1 minute to complete, which is a lot better than our other solutions but still surprisingly slower than the all-C solution. It would seem that just the act of calling the ctypes function 57 million times adds about a minute. If we can batch these calls up and make one call per timestep, we might be close to a solution.
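A sketch of what that batching might look like on the Python side, assuming the C module gains a write_records_batch(agent_ids, ages, times, nodes, count) entry point that loops over the arrays internally (this matches the call used in the final design below, but the argtypes declaration here is our own illustration):

import ctypes
import numpy as np
from numpy.ctypeslib import ndpointer

lib = ctypes.CDLL('./ll_record_writer.so')

# Assumed batched entry point: four uint32 arrays plus a record count
u32_array = ndpointer(dtype=np.uint32, flags='C_CONTIGUOUS')
lib.write_records_batch.argtypes = [u32_array, u32_array, u32_array, u32_array, ctypes.c_size_t]

def write_timestep(agent_ids, ages, times, nodes):
    # One ctypes call per timestep instead of one per record
    lib.write_records_batch(agent_ids, ages, times, nodes, len(agent_ids))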

Final Design & Implementation

Infected IDs Static Buffer

The infected_ids_buffer in transmission.py plays a critical role in holding infected agent IDs. It is allocated once in the Python layer as a flat C-style array with a fixed size, chosen to be no smaller than the maximum number of infections expected during any timestep. It is populated (written to) in the compiled C module for the transmission kernel. It is read back in the Python layer at the end of the transmission step, and the contents are passed to the compiled C module for the linelist reporter. The design combines Python's data handling capabilities (NumPy) with high-performance operations in a compiled C module accessed through ctypes: the infection updates are calculated in a parallelized C function, and the results are logged through a batch reporting mechanism for later analysis.
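A minimal sketch of how such a buffer can be shared between Python and a C kernel without copying, assuming a ctypes allocation (the capacity and variable names here are illustrative):

import ctypes
import numpy as np

capacity = 1_000_000  # no smaller than the maximum expected infections in any timestep
infected_ids_buffer = (ctypes.c_uint32 * capacity)()  # flat C-style array, allocated once

# Wrap the same memory as a NumPy array: no copy, just a view
infected_ids_view = np.frombuffer(infected_ids_buffer, dtype=np.uint32)

# After the C transmission kernel has filled the first total_infections slots,
# slice the view (still no copy) and hand it to the reporter
total_infections = 42
infected_ids_arr = infected_ids_view[:total_infections]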

It was a top priority to avoid runtime reallocations.

Note that we now call a function to calculate the number of new infections in each node prior to this step, and the results are stored in the new_infections array. If new_infections[0] is 10, that means the first 10 elements of new_infection_ids are set aside for node 0 to populate, and the 11th element is set aside for the first infectee ID in node 1 (assuming node 1 has non-zero new infections). A small sketch of this partitioning follows.
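A minimal sketch of that partitioning logic, assuming new_infections holds per-node counts (the prefix-sum computation here is illustrative, not the actual LASER code):

import numpy as np

new_infections = np.array([10, 3, 0, 7], dtype=np.uint32)  # new infections per node
total_infections = int(new_infections.sum())

# Exclusive prefix sum: node n writes its infectee IDs into
# new_infection_ids[offsets[n] : offsets[n] + new_infections[n]]
offsets = np.zeros_like(new_infections)
offsets[1:] = np.cumsum(new_infections)[:-1]

new_infection_ids = np.zeros(total_infections, dtype=np.uint32)
print(offsets)  # [ 0 10 13 13] -> node 0 owns slots 0-9, node 1 owns 10-12, node 3 owns 13-19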

Non-Blocking New Infectee Notifications

We didn't want to kill the performance of our core transmission loop by having all the parallel threads block every time an infectee was selected just because the memory collecting this data (or even the index counter) was global. By "sharding" the core transmission loop by node, parallelization can operate nodewise, and as long as the section of the accumulator array belonging to each node is known ahead of time, each node can be processed autonomously and in parallel.

Flushing Ring-Buffer

In the reporter we also wanted to avoid reallocating memory and expensive repeated writes to disk. A fixed-size buffer holds multiple records before flushing them to disk. Each record contains information like agent IDs, infection ages, times, and nodes. The buffer uses a ring (circular) structure, where once the buffer is full, it is flushed to disk, and new data overwrites old data starting from the beginning of the buffer. The buffer is flushed (written to disk) when it becomes full (or when the simulation ends -- this might not be true yet!). We add records in a per-timestep batch of arrays of matching data (IDs, ages, times, and nodes).

The report is implemented in a compiled C ctypes extension module which has a function:

write_records_batch(
    infected_ids_arr,
    ages_at_infection,
    np.ones(total_infections).astype(np.uint32) * tick,  # current timestep
    np.repeat(np.arange(num_nodes), new_infections).astype(np.uint32),
    total_infections
)
  • We populate each of those arrays from the ids returned in the new_infection_id array. This function is called once per timestep.
  • The records (infected IDs, ages, current time, and nodes) are passed in batches to a C library function (write_records_batch) for writing to the linelist (a log for tracking infections over time).
  • The batch-writing mechanism is efficient for both memory management and performance, avoiding frequent I/O operations by accumulating records in a buffer.

Design Considerations:

  1. Memory Efficiency:

    • The pre-allocated infected_ids_buffer ensures that no memory is reallocated in each timestep. This is crucial for performance when working with potentially large populations and high infection counts.
    • Converting this buffer to a NumPy array at the reporting stage avoids unnecessary data copying, allowing efficient manipulation in Python.
  2. Parallelization and Speed:

    • The core infection transmission logic is offloaded to a parallelized C function (tx_inner_nodes), which leverages OpenMP for concurrency. This reduces the computational load on the Python side and ensures fast processing even for large populations.
  3. Batch Reporting:

    • The batch reporting (write_records_batch) ensures that data is written to disk efficiently, only flushing when necessary, which reduces I/O overhead and ensures smooth simulation runs.
  4. Reusability:

    • The entire setup (especially the buffer design) is built for reusability across timesteps. The infected_ids_buffer is reused, and its contents are dynamically updated based on the current timestep's infections.

This design ensures that the infection transmission and reporting processes are both computationally efficient and memory-conscious, key factors in large-scale simulations.
