Adding backfill scripts #33

Merged · 9 commits · Sep 21, 2023
scripts/backfill_classifications.py (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@
 print("CLASSIFICATIONS backfill BEFORE Time =", current_time)
 
 with psycopg.connect(f"host={PANOPTES_CONN} port={PANOPTES_PORT} dbname={PANOPTES_DB} user={PANOPTES_USER} password={PANOPTES_PW} sslmode=require") as panoptes_db_conn, psycopg.connect(f"host={TIMESCALE_CONNECTION} port={TIMESCALE_PORT} dbname={ERAS_DB} user={ERAS_USER} password={ERAS_PW} sslmode=require") as timescale_db_conn:
-    with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id as classification_id, created_at as event_time, updated_at as classification_updated_at, TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') as started_at, TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') as finished_at, project_id, workflow_id, user_id, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') - TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS')) as session_time, created_at, updated_at from classifications where id < %s) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy:
+    with panoptes_db_conn.cursor(name="panoptes_cursor").copy("COPY (select id::bigint as classification_id, created_at as event_time, updated_at as classification_updated_at, CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END started_at, CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END finished_at, project_id::bigint, workflow_id::bigint, user_id::bigint, string_to_array(replace(replace(replace(metadata ->> 'user_group_ids', '[', ''), ']', ''), ' ', '' ), ',')::int[] as user_group_ids, EXTRACT(EPOCH FROM (CASE WHEN metadata ->> 'finished_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'finished_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'finished_at', 'YYYY-MM-DD HH24:MI:SS') END) - (CASE WHEN metadata ->> 'started_at' ~'\d{1,2}\/\d{1,2}\/\d{2,4}' THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI') ELSE TO_TIMESTAMP(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS') END)) as session_time, created_at, updated_at from classifications where id < %s order by id) TO STDOUT (FORMAT BINARY)", (FIRST_INGESTED_CLASSIFICATION_ID,)) as panoptes_copy:
         with timescale_db_conn.cursor().copy("COPY classification_events FROM STDIN (FORMAT BINARY)") as timescale_copy:
             for data in panoptes_copy:
                 timescale_copy.write(data)
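
For context when reading the long query above: the replacement COPY statement casts id, project_id, workflow_id and user_id to bigint, adds an ORDER BY id, and wraps the started_at/finished_at metadata fields in a regex-guarded CASE so values stored as MM/DD/YYYY HH24:MI parse as well as those in YYYY-MM-DD HH24:MI:SS form. Below is a minimal standalone sketch of that parsing idea, not part of the PR; the connection string and LIMIT are placeholder values.

# Illustrative sketch only (not from this PR): demonstrates the regex-guarded
# CASE that the new COPY query uses to handle classification metadata
# timestamps written in either MM/DD/YYYY HH24:MI or YYYY-MM-DD HH24:MI:SS form.
import psycopg

STARTED_AT_SQL = """
SELECT id,
       CASE
         WHEN metadata ->> 'started_at' ~ '\\d{1,2}/\\d{1,2}/\\d{2,4}'
           THEN to_timestamp(metadata ->> 'started_at', 'MM/DD/YYYY HH24:MI')
         ELSE to_timestamp(metadata ->> 'started_at', 'YYYY-MM-DD HH24:MI:SS')
       END AS started_at
FROM classifications
LIMIT 5
"""

# Placeholder connection parameters; point these at a Panoptes database copy.
with psycopg.connect("host=localhost dbname=panoptes") as conn:
    for row in conn.execute(STARTED_AT_SQL):
        print(row)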