From a83d0f09abc6da2bd0fcdb3976a325d107e72b53 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Mon, 16 Oct 2023 12:48:44 -0500 Subject: [PATCH 1/7] update classifications backfill script to chunk and save in file --- scripts/backfill_classifications.py | 18 +++--- ...backfill_classifications_chunk_in_files.py | 57 +++++++++++++++++++ 2 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 scripts/backfill_classifications_chunk_in_files.py diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py index e8a8342..a5b03fb 100644 --- a/scripts/backfill_classifications.py +++ b/scripts/backfill_classifications.py @@ -14,16 +14,18 @@ ERAS_PW = os.getenv('ERAS_PW') FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') -now = datetime.now() -current_time = now.strftime("%H:%M:%S") -print("CLASSIFICATIONS backfill BEFORE Time =", current_time) +start_time = datetime.now() +print("CLASSIFICATIONS backfill BEFORE Time =", start_time) + +classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc limit 5000000" + with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: - with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: - with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: + with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in panoptes_copy: timescale_copy.write(data) - finish = datetime.now() - finish_time = finish.strftime("%H:%M:%S") - print("CLASSIFICATIONS backfill AFTER Time =", finish_time) \ No newline at end of file + +finish_time = datetime.now() +print("CLASSIFICATIONS backfill AFTER Time =", finish_time) \ No newline at end of file diff --git a/scripts/backfill_classifications_chunk_in_files.py b/scripts/backfill_classifications_chunk_in_files.py new file mode 100644 index 0000000..2b05738 --- /dev/null +++ b/scripts/backfill_classifications_chunk_in_files.py @@ -0,0 +1,57 @@ +import os +import psycopg +from datetime import datetime +import math + +PANOPTES_CONN = os.getenv('PANOPTES_CONN') +PANOPTES_PORT = os.getenv('PANOPTES_PORT') +PANOPTES_DB = os.getenv('PANOPTES_DB') +PANOPTES_USER = os.getenv('PANOPTES_USER') +PANOPTES_PW = os.getenv('PANOPTES_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') +FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') + +start_time = datetime.now() +print("CLASSIFICATIONS backfill BEFORE Time =", start_time) + +offset = 0 +limit = 10000000 + +classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc limit %s" + +num_files = math.ceil(FIRST_INGESTED_CLASSIFICATION_ID/limit) + +while offset <= num_files: + query = '' + + if offset == 0: + query = f"{classifications_query} limit {limit}" + else: + query = f"{classifications_query} limit {limit} offset {limit * offset}" + + with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn: + with open(f"prod_classifications_{offset}.csv", "wb") as f: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT WITH CSV HEADER", (FIRST_INGESTED_CLASSIFICATION_ID, limit)) as panoptes_copy: + for data in panoptes_copy: + f.write(data) + offset+= 1 + +finish_copy_time = datetime.now() +print("PANOPTES Classification Copy over at ", finish_copy_time) +print("TIMESCALE START COPY FROM CSV") + +output_file_no = 0 +while output_file_no < num_files: + with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: + with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN (format csv, delimiter '|', quote '\"')") as timescale_copy: + timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) + output_file_no += 1 + +finish_time = datetime.now() +print("CLASSIFICATIONS TIMESCALE backfill AFTER Time =", finish_time) + + From 16df964d70fd20146b5f2fae463177b5de82120d Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Mon, 16 Oct 2023 16:50:04 -0500 Subject: [PATCH 2/7] update to <= remove unused limit in query --- scripts/backfill_classifications_chunk_in_files.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/backfill_classifications_chunk_in_files.py b/scripts/backfill_classifications_chunk_in_files.py index 2b05738..5abbfdf 100644 --- a/scripts/backfill_classifications_chunk_in_files.py +++ b/scripts/backfill_classifications_chunk_in_files.py @@ -18,11 +18,10 @@ start_time = datetime.now() print("CLASSIFICATIONS backfill BEFORE Time =", start_time) +classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc" + offset = 0 limit = 10000000 - -classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc limit %s" - num_files = math.ceil(FIRST_INGESTED_CLASSIFICATION_ID/limit) while offset <= num_files: @@ -35,7 +34,7 @@ with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn: with open(f"prod_classifications_{offset}.csv", "wb") as f: - with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT WITH CSV HEADER", (FIRST_INGESTED_CLASSIFICATION_ID, limit)) as panoptes_copy: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT WITH CSV HEADER", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: for data in panoptes_copy: f.write(data) offset+= 1 @@ -45,7 +44,7 @@ print("TIMESCALE START COPY FROM CSV") output_file_no = 0 -while output_file_no < num_files: +while output_file_no <= num_files: with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN (format csv, delimiter '|', quote '\"')") as timescale_copy: timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) From 73cfae10e7c13956a37d75c087fb21100589c212 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Tue, 17 Oct 2023 10:28:37 -0500 Subject: [PATCH 3/7] update copy into source --- scripts/backfill_classifications_chunk_in_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/backfill_classifications_chunk_in_files.py b/scripts/backfill_classifications_chunk_in_files.py index 5abbfdf..2458572 100644 --- a/scripts/backfill_classifications_chunk_in_files.py +++ b/scripts/backfill_classifications_chunk_in_files.py @@ -46,7 +46,7 @@ output_file_no = 0 while output_file_no <= num_files: with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: - with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN (format csv, delimiter '|', quote '\"')") as timescale_copy: + with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy: timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) output_file_no += 1 From f8fa8e312aea56368e6db8727dfb72ec81492623 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Tue, 17 Oct 2023 14:07:55 -0500 Subject: [PATCH 4/7] split backfill to file creation then copy from files --- scripts/copy_classifications_from_files.py | 27 +++++++++++++++++++ ...=> save_classifications_chunk_in_files.py} | 16 ----------- 2 files changed, 27 insertions(+), 16 deletions(-) create mode 100644 scripts/copy_classifications_from_files.py rename scripts/{backfill_classifications_chunk_in_files.py => save_classifications_chunk_in_files.py} (76%) diff --git a/scripts/copy_classifications_from_files.py b/scripts/copy_classifications_from_files.py new file mode 100644 index 0000000..46818bb --- /dev/null +++ b/scripts/copy_classifications_from_files.py @@ -0,0 +1,27 @@ +import os +import psycopg +from datetime import datetime +import math + +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') +FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') + +limit = 10000000 +num_files = math.ceil(FIRST_INGESTED_CLASSIFICATION_ID/limit) + +start_time = datetime.now() +print("TIMESCALE START COPY FROM CSV BEFORE TIME =", start_time) + +output_file_no = 0 +while output_file_no <= num_files: + with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: + with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy: + timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) + output_file_no += 1 + +finish_time = datetime.now() +print("CLASSIFICATIONS TIMESCALE backfill AFTER Time =", finish_time) \ No newline at end of file diff --git a/scripts/backfill_classifications_chunk_in_files.py b/scripts/save_classifications_chunk_in_files.py similarity index 76% rename from scripts/backfill_classifications_chunk_in_files.py rename to scripts/save_classifications_chunk_in_files.py index 2458572..575cd25 100644 --- a/scripts/backfill_classifications_chunk_in_files.py +++ b/scripts/save_classifications_chunk_in_files.py @@ -8,11 +8,6 @@ PANOPTES_DB = os.getenv('PANOPTES_DB') PANOPTES_USER = os.getenv('PANOPTES_USER') PANOPTES_PW = os.getenv('PANOPTES_PW') -TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') -TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') -ERAS_DB = os.getenv('ERAS_DB') -ERAS_USER = os.getenv('ERAS_USER') -ERAS_PW = os.getenv('ERAS_PW') FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') start_time = datetime.now() @@ -41,16 +36,5 @@ finish_copy_time = datetime.now() print("PANOPTES Classification Copy over at ", finish_copy_time) -print("TIMESCALE START COPY FROM CSV") - -output_file_no = 0 -while output_file_no <= num_files: - with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: - with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy: - timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) - output_file_no += 1 - -finish_time = datetime.now() -print("CLASSIFICATIONS TIMESCALE backfill AFTER Time =", finish_time) From 782d043f4dc0b6dc3ab2ca6b26098acc12859930 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Tue, 17 Oct 2023 19:41:57 -0500 Subject: [PATCH 5/7] cast to int --- scripts/copy_classifications_from_files.py | 2 +- scripts/save_classifications_chunk_in_files.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/copy_classifications_from_files.py b/scripts/copy_classifications_from_files.py index 46818bb..06e68b6 100644 --- a/scripts/copy_classifications_from_files.py +++ b/scripts/copy_classifications_from_files.py @@ -11,7 +11,7 @@ FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') limit = 10000000 -num_files = math.ceil(FIRST_INGESTED_CLASSIFICATION_ID/limit) +num_files = math.ceil(int(FIRST_INGESTED_CLASSIFICATION_ID)/limit) start_time = datetime.now() print("TIMESCALE START COPY FROM CSV BEFORE TIME =", start_time) diff --git a/scripts/save_classifications_chunk_in_files.py b/scripts/save_classifications_chunk_in_files.py index 575cd25..03ea6a3 100644 --- a/scripts/save_classifications_chunk_in_files.py +++ b/scripts/save_classifications_chunk_in_files.py @@ -16,8 +16,8 @@ classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc" offset = 0 -limit = 10000000 -num_files = math.ceil(FIRST_INGESTED_CLASSIFICATION_ID/limit) +limit = 10,000,000 +num_files = math.ceil(int(FIRST_INGESTED_CLASSIFICATION_ID)/limit) while offset <= num_files: query = '' From 7ac7786672dad5a20f39fc0ae35930f886e540b3 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Tue, 17 Oct 2023 19:44:35 -0500 Subject: [PATCH 6/7] revert accidental adding commas on limit --- scripts/save_classifications_chunk_in_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/save_classifications_chunk_in_files.py b/scripts/save_classifications_chunk_in_files.py index 03ea6a3..1ee7c81 100644 --- a/scripts/save_classifications_chunk_in_files.py +++ b/scripts/save_classifications_chunk_in_files.py @@ -16,7 +16,7 @@ classifications_query = "select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, array_remove(string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ','), 'null')::bigint[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id desc" offset = 0 -limit = 10,000,000 +limit = 10000000 num_files = math.ceil(int(FIRST_INGESTED_CLASSIFICATION_ID)/limit) while offset <= num_files: From 522a0c7e6525ae074796bd427beecc0513f45a61 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Wed, 18 Oct 2023 10:28:20 -0500 Subject: [PATCH 7/7] add keepalives to hopefully ensure connection does not get lost --- scripts/copy_classifications_from_files.py | 2 +- scripts/save_classifications_chunk_in_files.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/copy_classifications_from_files.py b/scripts/copy_classifications_from_files.py index 06e68b6..1a32079 100644 --- a/scripts/copy_classifications_from_files.py +++ b/scripts/copy_classifications_from_files.py @@ -18,7 +18,7 @@ output_file_no = 0 while output_file_no <= num_files: - with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: + with psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require keepalives=1 keepalives_idle=30 keepalives_interval=10 keepalives_count=20") as timescale_db_conn: with timescale_db_conn.cursor(name="timescale_cursor").copy("COPY classification_events FROM STDIN DELIMITER ',' CSV HEADER") as timescale_copy: timescale_copy.write(open(f"prod_classifications_{output_file_no}.csv").read()) output_file_no += 1 diff --git a/scripts/save_classifications_chunk_in_files.py b/scripts/save_classifications_chunk_in_files.py index 1ee7c81..72adc66 100644 --- a/scripts/save_classifications_chunk_in_files.py +++ b/scripts/save_classifications_chunk_in_files.py @@ -27,7 +27,7 @@ else: query = f"{classifications_query} limit {limit} offset {limit * offset}" - with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn: + with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require keepalives=1 keepalives_idle=30 keepalives_interval=10 keepalives_count=20") as panoptes_db_conn: with open(f"prod_classifications_{offset}.csv", "wb") as f: with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY ({classifications_query}) TO STDOUT WITH CSV HEADER", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: for data in panoptes_copy: