From 3af1d8667ab14f802d640f47fc2a135a6ee727cd Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Fri, 15 Sep 2023 12:18:58 -0500 Subject: [PATCH 1/7] Create backfill_talk_comments.py --- scripts/backfill_talk_comments.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 scripts/backfill_talk_comments.py diff --git a/scripts/backfill_talk_comments.py b/scripts/backfill_talk_comments.py new file mode 100644 index 0000000..6f8a97d --- /dev/null +++ b/scripts/backfill_talk_comments.py @@ -0,0 +1,22 @@ +import os +import psycopg + +TALK_CONN = os.getenv('TALK_CONNECTION') +TALK_PORT = os.getenv('TALK_PORT') +TALK_DB = os.getenv('TALK_DB') +TALK_USER = os.getenv('TALK_USER') +TALK_PW = os.getenv('TALK_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') + +FIRST_INGESTED_COMMENT_ID = os.getenv('FIRST_COMMENT_ID') + + +with psycopg.connect(f'host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW}') as talk_db_conn, psycopg.connect(f'host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}') as timescale_db_conn: + with talk_db_conn.cursor(name="talk").copy(f'COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < {FIRST_INGESTED_COMMENT_ID}) TO STDOUT (FORMAT BINARY)') as talk_copy: + with timescale_db_conn.cursor().copy("COPY comment_events FROM STDIN (FORMAT BINARY)") as timescale_copy: + for data in talk_copy: + timescale_copy.write(data) \ No newline at end of file From 3f4b759a851caae6af5af60873764d9dcede35bb Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Fri, 15 Sep 2023 15:15:33 -0500 Subject: [PATCH 2/7] backfill classifications --- scripts/backfill_classifications.py | 29 
+++++++++++++++++++++++++++++ scripts/backfill_talk_comments.py | 5 ++--- 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 scripts/backfill_classifications.py diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py new file mode 100644 index 0000000..da4ef89 --- /dev/null +++ b/scripts/backfill_classifications.py @@ -0,0 +1,29 @@ +import os +import psycopg +import datetime + +PANOPTES_CONN = os.getenv('PANOPTES_CONN') +PANOPTES_PORT = os.getenv('PANOPTES_PORT') +PANOPTES_DB = os.getenv('PANOPTES_DB') +PANOPTES_USER = os.getenv('PANOPTES_USER') +PANOPTES_PW = os.getenv('PANOPTES_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') +FIRST_INGESTED_CLASSIFICATION_ID = os.getenv('FIRST_INGESTED_CLASSIFICATION_ID') + +now = datetime.now() +current_time = now.strftime("%H:%M:%S") +print("CLASSIFICATIONS backfill BEFORE Time =", current_time) + +with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD 
HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < {FIRST_INGESTED_CLASSIFICATION_ID}) TO STDOUT (FORMAT BINARY)") as panoptes_copy: + with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: + for data in panoptes_copy: + timescale_copy.write(data) + + finish = datetime.now() + finish_time = finish.strftime("%H:%M:%S") + print("CLASSIFICATIONS backfill AFTER Time =", finish_time) \ No newline at end of file diff --git a/scripts/backfill_talk_comments.py b/scripts/backfill_talk_comments.py index 6f8a97d..c6dc918 100644 --- a/scripts/backfill_talk_comments.py +++ b/scripts/backfill_talk_comments.py @@ -11,12 +11,11 @@ ERAS_DB = os.getenv('ERAS_DB') ERAS_USER = os.getenv('ERAS_USER') ERAS_PW = os.getenv('ERAS_PW') - FIRST_INGESTED_COMMENT_ID = os.getenv('FIRST_COMMENT_ID') -with psycopg.connect(f'host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW}') as talk_db_conn, psycopg.connect(f'host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}') as timescale_db_conn: - with talk_db_conn.cursor(name="talk").copy(f'COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < {FIRST_INGESTED_COMMENT_ID}) TO STDOUT (FORMAT BINARY)') as talk_copy: +with psycopg.connect(f"host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW}") as talk_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: + with talk_db_conn.cursor(name="talk").copy(f"COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < {FIRST_INGESTED_COMMENT_ID}) TO STDOUT (FORMAT BINARY)") as talk_copy: with 
timescale_db_conn.cursor().copy("COPY comment_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in talk_copy: timescale_copy.write(data) \ No newline at end of file From 2df91392e71367f0868d731628323da473fca55a Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Fri, 15 Sep 2023 15:44:50 -0500 Subject: [PATCH 3/7] Update backfill_classifications.py --- scripts/backfill_classifications.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py index da4ef89..bfed30f 100644 --- a/scripts/backfill_classifications.py +++ b/scripts/backfill_classifications.py @@ -1,6 +1,6 @@ import os import psycopg -import datetime +from datetime import datetime PANOPTES_CONN = os.getenv('PANOPTES_CONN') PANOPTES_PORT = os.getenv('PANOPTES_PORT') From 0a5824f6d5e82ebf5985b3ca735a3b58c93132a6 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Wed, 20 Sep 2023 11:56:36 -0500 Subject: [PATCH 4/7] add user_group_membership_backfill and update select query of backfilling talk comments and classifications --- scripts/backfill_classifications.py | 2 +- scripts/backfill_talk_comments.py | 2 +- ...roup_membership_classification_backfill.py | 68 +++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 scripts/user_group_membership_classification_backfill.py diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py index bfed30f..53a2ca7 100644 --- a/scripts/backfill_classifications.py +++ b/scripts/backfill_classifications.py @@ -19,7 +19,7 @@ print("CLASSIFICATIONS backfill BEFORE Time =", current_time) with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: - with 
panoptes_db_conn.cursor(name="panoptes_cursor").copy(f"COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < {FIRST_INGESTED_CLASSIFICATION_ID}) TO STDOUT (FORMAT BINARY)") as panoptes_copy: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in panoptes_copy: timescale_copy.write(data) diff --git a/scripts/backfill_talk_comments.py b/scripts/backfill_talk_comments.py index c6dc918..2cfb791 100644 --- a/scripts/backfill_talk_comments.py +++ b/scripts/backfill_talk_comments.py @@ -15,7 +15,7 @@ with 
psycopg.connect(f"host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW}") as talk_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: - with talk_db_conn.cursor(name="talk").copy(f"COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < {FIRST_INGESTED_COMMENT_ID}) TO STDOUT (FORMAT BINARY)") as talk_copy: + with talk_db_conn.cursor(name="talk").copy("COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_COMMENT_ID,)) as talk_copy: with timescale_db_conn.cursor().copy("COPY comment_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in talk_copy: timescale_copy.write(data) \ No newline at end of file diff --git a/scripts/user_group_membership_classification_backfill.py b/scripts/user_group_membership_classification_backfill.py new file mode 100644 index 0000000..10cd783 --- /dev/null +++ b/scripts/user_group_membership_classification_backfill.py @@ -0,0 +1,68 @@ +import os +import argparse +import psycopg +from datetime import datetime + +PANOPTES_CONN = os.getenv('PANOPTES_CONN') +PANOPTES_PORT = os.getenv('PANOPTES_PORT') +PANOPTES_DB = os.getenv('PANOPTES_DB') +PANOPTES_USER = os.getenv('PANOPTES_USER') +PANOPTES_PW = os.getenv('PANOPTES_PW') +TIMESCALE_CONNECTION = os.getenv('TIMESCALE_CONNECTION') +TIMESCALE_PORT = os.getenv('TIMESCALE_PORT') +ERAS_DB = os.getenv('ERAS_DB') +ERAS_USER = os.getenv('ERAS_USER') +ERAS_PW = os.getenv('ERAS_PW') + +now = datetime.now() + +current_time = now.strftime("%H:%M:%S") +print("BEFORE Time =", current_time) + +parser = argparse.ArgumentParser() +parser.add_argument("-ug", "--user_group_id", type=int) 
+parser.add_argument('email_domain_formats') + +args = parser.parse_args() +user_group_id = args.user_group_id +# email formats in form of comma separated string with no spaces (eg. "%a.com,%b.org%") +email_formats = args.email_domain_formats + +panoptes_db_conn = psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") +panoptes_cursor = panoptes_db_conn.cursor() + +eras_conn = psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") +eras_cursor = eras_conn.cursor() + +# get ids of users that are not in group yet +panoptes_cursor.execute("SELECT id from users where email ILIKE ANY(STRING_TO_ARRAY(%s, ',')) AND id NOT IN (SELECT user_id from memberships where user_group_id=%s)", (email_formats, user_group_id)) + +not_in_group_yet_user_ids = [result[0] for result in panoptes_cursor.fetchall()] + +if len(not_in_group_yet_user_ids) > 0: + # create memberships to user group + memberships_to_create = list(map(lambda user_id: (user_group_id, user_id, 'active', '{"group_member"}'),not_in_group_yet_user_ids)) + panoptes_cursor.executemany("INSERT INTO memberships (user_group_id, user_id, state, roles) VALUES (%s,%s,%s,%s)", memberships_to_create) + + panoptes_db_conn.commit() + + # eras get classification_events of not_in_group_yet_user_ids that does not have user_group_id within their user_group_ids classification_event + eras_cursor.execute("SELECT classification_id, event_time, session_time, project_id, user_id, workflow_id, created_at, updated_at, user_group_ids from classification_events WHERE user_id = ANY(%s) AND %s!=ANY(user_group_ids)", (not_in_group_yet_user_ids, user_group_id)) + classification_events_to_backfill = eras_cursor.fetchall() + + # create classification_user_group + classification_user_groups = list(map(lambda classification: (classification[0:8] + (user_group_id,)), classification_events_to_backfill)) + 
eras_cursor.executemany("INSERT INTO classification_user_groups (classification_id, event_time, session_time, project_id, user_id, workflow_id, created_at, updated_at, user_group_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)", classification_user_groups) + + eras_conn.commit() + +panoptes_cursor.close() +panoptes_db_conn.close() +eras_cursor.close() +eras_conn.close() + + +finish = datetime.now() +finish_time = finish.strftime("%H:%M:%S") +print("AFTER Time =", finish_time) + From 64be6a4ab28827edca147fb3b7ada2922e68dc14 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Wed, 20 Sep 2023 17:39:05 -0500 Subject: [PATCH 5/7] add update classification events --- scripts/user_group_membership_classification_backfill.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/user_group_membership_classification_backfill.py b/scripts/user_group_membership_classification_backfill.py index 10cd783..c168474 100644 --- a/scripts/user_group_membership_classification_backfill.py +++ b/scripts/user_group_membership_classification_backfill.py @@ -54,6 +54,10 @@ classification_user_groups = list(map(lambda classification: (classification[0:8] + (user_group_id,)), classification_events_to_backfill)) eras_cursor.executemany("INSERT INTO classification_user_groups (classification_id, event_time, session_time, project_id, user_id, workflow_id, created_at, updated_at, user_group_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)", classification_user_groups) + # update classification_events' user_group_ids so that it includes the new user_group_id + classification_events_to_update = list(map(lambda classification_event: {'classification_id': classification_event[0], 'user_group_ids': ([user_group_id] if classification_event[8] is None else classification_event[8] +[user_group_id])} ,classification_events_to_backfill)) + eras_cursor.executemany("UPDATE classification_events SET user_group_ids = %(user_group_ids)s WHERE classification_id = %(classification_id)s", classification_events_to_update) + 
eras_conn.commit() panoptes_cursor.close() From f186f60aa84952963bae337b422cb2114bff1659 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Thu, 21 Sep 2023 11:30:29 -0500 Subject: [PATCH 6/7] add sslmode require to connection strings --- scripts/backfill_classifications.py | 2 +- scripts/backfill_talk_comments.py | 2 +- scripts/user_group_membership_classification_backfill.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py index 53a2ca7..98d7b8d 100644 --- a/scripts/backfill_classifications.py +++ b/scripts/backfill_classifications.py @@ -18,7 +18,7 @@ current_time = now.strftime("%H:%M:%S") print("CLASSIFICATIONS backfill BEFORE Time =", current_time) -with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: +with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 
'started_at', 'YYYY-MM-DD HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in panoptes_copy: diff --git a/scripts/backfill_talk_comments.py b/scripts/backfill_talk_comments.py index 2cfb791..8b48b9b 100644 --- a/scripts/backfill_talk_comments.py +++ b/scripts/backfill_talk_comments.py @@ -14,7 +14,7 @@ FIRST_INGESTED_COMMENT_ID = os.getenv('FIRST_COMMENT_ID') -with psycopg.connect(f"host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW}") as talk_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") as timescale_db_conn: +with psycopg.connect(f"host={TALK_CONN} port={TALK_PORT} dbname={TALK_DB} user={TALK_USER} password={TALK_PW} sslmode=require") as talk_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: with talk_db_conn.cursor(name="talk").copy("COPY (SELECT id as comment_id, created_at as event_time, updated_at as comment_updated_at, project_id, user_id, created_at, updated_at from comments where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_COMMENT_ID,)) as talk_copy: with timescale_db_conn.cursor().copy("COPY comment_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in talk_copy: diff --git a/scripts/user_group_membership_classification_backfill.py b/scripts/user_group_membership_classification_backfill.py index c168474..9b26a50 100644 --- a/scripts/user_group_membership_classification_backfill.py +++ b/scripts/user_group_membership_classification_backfill.py @@ -28,10 +28,10 @@ # email formats in form of comma separated string with no spaces (eg. 
"%a.com,%b.org%") email_formats = args.email_domain_formats -panoptes_db_conn = psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW}") +panoptes_db_conn = psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") panoptes_cursor = panoptes_db_conn.cursor() -eras_conn = psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW}") +eras_conn = psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") eras_cursor = eras_conn.cursor() # get ids of users that are not in group yet From 8c147b8709e27b5a2dd20384b0bea6d4afb5fa37 Mon Sep 17 00:00:00 2001 From: yuenmichelle1 Date: Thu, 21 Sep 2023 14:19:35 -0500 Subject: [PATCH 7/7] casting ids to big ints and taking care of case when date comes in format MM/DD/YYYY which comes from Galaxy Zoo Touch Table --- scripts/backfill_classifications.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/backfill_classifications.py b/scripts/backfill_classifications.py index 98d7b8d..ccb210a 100644 --- a/scripts/backfill_classifications.py +++ b/scripts/backfill_classifications.py @@ -19,7 +19,7 @@ print("CLASSIFICATIONS backfill BEFORE Time =", current_time) with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn: - with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as 
started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: + with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy: with 
timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy: for data in panoptes_copy: timescale_copy.write(data)