From faabc55b802996c4b7fe3e209c3369fe899f0781 Mon Sep 17 00:00:00 2001
From: Harshad Hegde
Date: Tue, 13 Aug 2024 13:19:25 -0500
Subject: [PATCH] replace db in memory with persistent db

---
 kg_microbe_merge/constants.py          |   1 +
 kg_microbe_merge/utils/duckdb_utils.py | 240 ++++++++++++-------------
 2 files changed, 121 insertions(+), 120 deletions(-)

diff --git a/kg_microbe_merge/constants.py b/kg_microbe_merge/constants.py
index ce6a771..0d3a48a 100644
--- a/kg_microbe_merge/constants.py
+++ b/kg_microbe_merge/constants.py
@@ -28,3 +28,4 @@
 RAW_DATA_DIR = DATA_DIR / "raw"
 MERGED_DATA_DIR = DATA_DIR / "merged"
 MERGED_GRAPH_STATS_FILE = MERGED_DATA_DIR / "merged_graph_stats.yaml"
+TMP_DIR = DATA_DIR / "duckdb_temp"
diff --git a/kg_microbe_merge/utils/duckdb_utils.py b/kg_microbe_merge/utils/duckdb_utils.py
index 378d126..6ddcfac 100644
--- a/kg_microbe_merge/utils/duckdb_utils.py
+++ b/kg_microbe_merge/utils/duckdb_utils.py
@@ -5,6 +5,8 @@
 
 import duckdb
 
+from kg_microbe_merge.constants import TMP_DIR
+
 
 def get_table_count(con, table):
     """Get the number of rows of a given duckdb table name."""
@@ -265,19 +267,6 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
     """
     Merge nodes files using DuckDB with batching for large datasets.
 
-    :param nodes_file_list: List of paths to nodes files.
-    :param output_file: Path to the output file.
-    :param priority_sources: List of source names to prioritize.
-    """
-    # Create a DuckDB connection
-    conn = duckdb.connect("nodes.db")
-
-    # Load the files into DuckDB
-    load_into_duckdb(conn, nodes_file_list, "combined_nodes")
-
-    priority_sources_str = ", ".join(f"''{source}''" for source in priority_sources)
-
-    """
     Construct the query to merge the nodes
 
     This query performs the following operations:
@@ -309,7 +298,19 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
     - Reducing memory usage by processing subsets of data at a time.
     - Maintaining the same aggregation logic as the original query.
     - Ensuring consistent output formatting across all batches.
+
+    :param nodes_file_list: List of paths to nodes files.
+    :param output_file: Path to the output file.
+    :param priority_sources: List of source names to prioritize.
     """
+    # Create a DuckDB connection
+    conn = duckdb.connect("nodes.db")
+
+    # Load the files into DuckDB
+    load_into_duckdb(conn, nodes_file_list, "combined_nodes")
+
+    priority_sources_str = ", ".join(f"''{source}''" for source in priority_sources)
+
     try:
         # Construct the query to get columns and their aggregation expressions
         query = f"""
@@ -381,132 +382,131 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
         os.remove("nodes.db")
 
 
-def duckdb_edges_merge(edges_file_list, output_file, batch_size=2000000):
+def duckdb_edges_merge(edges_file_list, output_file, batch_size=1000000):
     """
-    Merge edges files using DuckDB.
+    Merge edges files using DuckDB with a disk-based approach for improved memory efficiency.
 
     :param edges_file_list: List of paths to edges files.
     :param output_file: Path to the output file.
-    """
-    # Create a DuckDB connection
-    conn = duckdb.connect("edges.db")
+    :param batch_size: Number of edges to process in each batch.
 
-    # Load the files into DuckDB, excluding the 'id' column
-    load_into_duckdb(conn, edges_file_list, "combined_edges", exclude_columns=["id"])
+    Detailed Explanation:
 
-    """
-    Detailed Explanation:
-
-    1. Column Information Retrieval:
-       - The function retrieves column names from the combined_edges table using SQL.
-       - This information is used to dynamically construct the aggregation query.
-
-    2. Query Construction:
-       - Aggregation expressions are built for each column:
-         - For 'subject', 'predicate', and 'object', they are kept as is with a 'ce.' prefix.
-         - For other columns, a string_agg expression is created to concatenate distinct values.
-       - These expressions are joined into a single string for use in the SQL query.
-
-    3. Batch Processing:
-       - The total number of edges is determined.
-       - Edges are processed in batches to handle large datasets efficiently.
-       - For each batch:
-         - A batch-specific query is constructed using a CTE (Common Table Expression).
-         - The batch query selects distinct edges and joins them with the full dataset.
-         - Results are grouped and ordered by subject, predicate, and object.
-
-    4. Data Writing:
-       - For the first batch, the query is printed for debugging and results are written to the output file.
-       - For subsequent batches, results are appended to the output file.
-       - Progress is printed after each batch.
-
-    5. Error Handling:
-       - Any DuckDB errors are caught and reported, along with the generated query for debugging.
-
-    6. Resource Management:
-       - The database connection is properly closed in the 'finally' block.
-       - The temporary database file is removed after processing.
-
-    This approach allows for efficient processing of large edge datasets by using batch processing
-    and constructing the query dynamically based on the table structure. It handles potential memory
-    issues by processing data in manageable chunks.
-    """
+    1. Initial Setup:
+       - The function connects to a persistent DuckDB database on disk.
+       - It loads the edge files into a table inside that on-disk database.
+
-    try:
-        # Get column names
-        columns = conn.execute(
-            "SELECT column_name FROM information_schema.columns WHERE table_name = 'combined_edges'"
-        ).fetchall()
+    2. Temporary Table Creation:
+       - A temporary table is created with the distinct subject, predicate, and object combinations.
+       - This table is stored in the on-disk database, so it does not have to fit in memory.
+
-        # Construct aggregation expressions
-        # Construct aggregation expressions
-        agg_expressions = []
-        for column in columns:
-            column_name = column[0]
-            if column_name in ("subject", "predicate", "object"):
-                agg_expressions.append(f"ce.{column_name}")
-            else:
-                agg_expressions.append(
-                    f"string_agg(DISTINCT ce.{column_name}, '|' ORDER BY ce.{column_name}) AS {column_name}"
-                )
+    3. Batched Column-wise Processing:
+       - For each non-key column (not subject, predicate, or object):
+         - The column is added to the temporary table.
+         - An UPDATE statement aggregates the distinct values for each edge in batches.
+
-        # Join expressions into a single string
-        agg_expressions_str = ", ".join(agg_expressions)
+    4. Result Writing:
+       - The final results are written to the output file directly from the temporary table.
+
-        # Construct the final query
-        query = f"""
-            SELECT {agg_expressions_str}
-            FROM combined_edges
-            GROUP BY subject, predicate, object
-            ORDER BY subject, predicate, object
-        """
+    5. Error Handling and Cleanup:
+       - Any DuckDB errors are caught and reported.
+       - The database connection is closed and temporary files are removed.
+
-        # Get total number of unique edges
-        total_edges = conn.execute("SELECT COUNT(*) FROM combined_edges").fetchone()[0]
+    This approach processes data in batches and uses disk storage, which significantly reduces
+    memory usage and allows for processing of very large datasets that exceed available RAM.
+ """ + os.makedirs(TMP_DIR, exist_ok=True) + db_file = "edges_persistent.db" + conn = duckdb.connect(db_file) - # Process in batches - for offset in range(0, total_edges, batch_size): - batch_query = f""" - WITH batch_edges AS ( - SELECT DISTINCT subject, predicate, object - FROM combined_edges - ORDER BY subject, predicate, object - LIMIT {batch_size} OFFSET {offset} - ) - SELECT {agg_expressions_str} - FROM combined_edges ce - INNER JOIN batch_edges be - ON ce.subject = be.subject - AND ce.predicate = be.predicate - AND ce.object = be.object - GROUP BY ce.subject, ce.predicate, ce.object - ORDER BY ce.subject, ce.predicate, ce.object - """ + try: + # Enable memory-mapped storage for temporary tables + conn.execute(f"PRAGMA temp_directory='{TMP_DIR}'") # Store temp files in the same directory + conn.execute("PRAGMA memory_limit='4GB'") # Adjust based on available system memory - # Print the generated SQL for debugging - if offset == 0: - print("Generated SQL query (for first batch):") - print(batch_query) - conn.execute(f"COPY ({batch_query}) TO '{output_file}' (HEADER, DELIMITER '\t')") - else: - batch_data = conn.execute(batch_query).fetch_df() - batch_data.to_csv(output_file, mode="a", sep="\t", header=False, index=False) + # Load the files into DuckDB, excluding the 'id' column + load_into_duckdb(conn, edges_file_list, "combined_edges", exclude_columns=["id"]) - print(f"Written {min(offset + batch_size, total_edges)} / {total_edges} edges") + # Get column names + columns = conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'combined_edges'" + ).fetchall() + column_names = [col[0] for col in columns] - # Print the generated SQL for debugging - print("Generated SQL query:") - print(batch_query) + # Create a temporary table for storing intermediate results + conn.execute( + """ + CREATE TABLE temp_edges AS + SELECT DISTINCT subject, predicate, object + FROM combined_edges + """ + ) - # Execute the final query and save the result - conn.execute(f"COPY ({batch_query}) TO '{output_file}' (HEADER, DELIMITER '\t')") + # Process non-key columns in batches + for column in column_names: + if column not in ("subject", "predicate", "object"): + conn.execute(f"ALTER TABLE temp_edges ADD COLUMN {column} STRING") + + # Process in batches + offset = 0 + while True: + batch_update = conn.execute( + f""" + UPDATE temp_edges + SET {column} = ( + SELECT string_agg(DISTINCT ce.{column}, '|' ORDER BY ce.{column}) + FROM ( + SELECT subject, predicate, object, {column} + FROM combined_edges + LIMIT {batch_size} OFFSET {offset} + ) ce + WHERE ce.subject = temp_edges.subject + AND ce.predicate = temp_edges.predicate + AND ce.object = temp_edges.object + ) + WHERE temp_edges.rowid IN ( + SELECT rowid + FROM temp_edges + LIMIT {batch_size} OFFSET {offset} + ) + """ + ) + + if batch_update.fetchone()[0] == 0: + break + + offset += batch_size + print(f"Processed {offset} rows for column {column}") + + # Write results to file in batches + with open(output_file, "w") as f: + # Write header + header = conn.execute("SELECT * FROM temp_edges LIMIT 0").fetchdf().columns.tolist() + f.write("\t".join(header) + "\n") + + # Write data in batches + offset = 0 + while True: + batch = conn.execute( + f""" + SELECT * + FROM temp_edges + ORDER BY subject, predicate, object + LIMIT {batch_size} OFFSET {offset} + """ + ).fetchdf() + + if batch.empty: + break + + batch.to_csv(f, sep="\t", header=False, index=False, mode="a") + offset += batch_size + print(f"Written {offset} rows to output file") 
print(f"Merged file has been created as '{output_file}'") + except duckdb.Error as e: print(f"An error occurred: {e}") - print("Generated query was:") - print(query) finally: - # Close the connection conn.close() - os.remove("edges.db") + os.remove(db_file)