From 28e3c7579733387f98b3984b67db20d580f9c6bd Mon Sep 17 00:00:00 2001 From: Guillaume Hivert Date: Tue, 3 Sep 2024 01:14:29 +0200 Subject: [PATCH] feat: add timeseries computation --- ...0902224247_create_analytics_timeseries.sql | 10 +++++ ...lter_default_value_in_search_analytics.sql | 7 +++ apps/backend/db/schema.sql | 25 ++++++++++- apps/backend/src/api/github.gleam | 2 +- .../src/api/github/stargazer_count.gleam | 6 +-- apps/backend/src/api/hex.gleam | 38 ++++++++-------- apps/backend/src/backend.gleam | 45 ++++++++++--------- .../src/backend/postgres/queries.gleam | 30 +++++++++++++ apps/backend/src/tasks/hex.gleam | 10 ++--- apps/backend/src/tasks/timeseries.gleam | 21 +++++++++ 10 files changed, 142 insertions(+), 52 deletions(-) create mode 100644 apps/backend/db/migrations/20240902224247_create_analytics_timeseries.sql create mode 100644 apps/backend/db/migrations/20240902225236_alter_default_value_in_search_analytics.sql create mode 100644 apps/backend/src/tasks/timeseries.gleam diff --git a/apps/backend/db/migrations/20240902224247_create_analytics_timeseries.sql b/apps/backend/db/migrations/20240902224247_create_analytics_timeseries.sql new file mode 100644 index 0000000..97de598 --- /dev/null +++ b/apps/backend/db/migrations/20240902224247_create_analytics_timeseries.sql @@ -0,0 +1,10 @@ +-- migrate:up +create table analytics_timeseries ( + query text not null, + occurences int not null, + date timestamptz not null, + primary key (query, date) +); + +-- migrate:down +drop table analytics_timeseries; diff --git a/apps/backend/db/migrations/20240902225236_alter_default_value_in_search_analytics.sql b/apps/backend/db/migrations/20240902225236_alter_default_value_in_search_analytics.sql new file mode 100644 index 0000000..49216fc --- /dev/null +++ b/apps/backend/db/migrations/20240902225236_alter_default_value_in_search_analytics.sql @@ -0,0 +1,7 @@ +-- migrate:up +alter table only search_analytics alter column occurences set default 1; +update search_analytics set occurences = occurences + 1; + +-- migrate:down +alter table only search_analytics alter column occurences set default 0; +update search_analytics set occurences = occurences - 1; diff --git a/apps/backend/db/schema.sql b/apps/backend/db/schema.sql index 73ef95e..f767afe 100644 --- a/apps/backend/db/schema.sql +++ b/apps/backend/db/schema.sql @@ -96,6 +96,17 @@ ALTER TABLE public.analytics ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY ( ); +-- +-- Name: analytics_timeseries; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.analytics_timeseries ( + query text NOT NULL, + occurences integer NOT NULL, + date timestamp with time zone NOT NULL +); + + -- -- Name: hex_read; Type: TABLE; Schema: public; Owner: - -- @@ -304,7 +315,7 @@ CREATE TABLE public.schema_migrations ( CREATE TABLE public.search_analytics ( query text NOT NULL, - occurences integer DEFAULT 0 NOT NULL, + occurences integer DEFAULT 1 NOT NULL, created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL ); @@ -326,6 +337,14 @@ ALTER TABLE ONLY public.analytics ADD CONSTRAINT analytics_pkey PRIMARY KEY (id); +-- +-- Name: analytics_timeseries analytics_timeseries_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.analytics_timeseries + ADD CONSTRAINT analytics_timeseries_pkey PRIMARY KEY (query, date); + + -- -- Name: hex_read hex_read_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -590,4 +609,6 @@ INSERT INTO public.schema_migrations (version) VALUES ('20240521204341'), ('20240801164720'), ('20240801211520'), - ('20240801220817'); + ('20240801220817'), + ('20240902224247'), + ('20240902225236'); diff --git a/apps/backend/src/api/github.gleam b/apps/backend/src/api/github.gleam index becbc3c..a00f0d8 100644 --- a/apps/backend/src/api/github.gleam +++ b/apps/backend/src/api/github.gleam @@ -25,7 +25,7 @@ fn query( use response <- result.try( request.new() |> request.set_header("authorization", "Bearer " <> token) - |> request.set_header("user-agent", "gloogle / 0.0.0") + |> request.set_header("user-agent", "gloogle / 1.0.0") |> request.set_method(http.Post) |> request.set_scheme(http.Https) |> request.set_host("api.github.com") diff --git a/apps/backend/src/api/github/stargazer_count.gleam b/apps/backend/src/api/github/stargazer_count.gleam index 9cfc58d..dd2abb0 100644 --- a/apps/backend/src/api/github/stargazer_count.gleam +++ b/apps/backend/src/api/github/stargazer_count.gleam @@ -15,7 +15,7 @@ pub fn decoder(dyn) { } pub fn variables(name: String, owner: String) { - Some( - json.object([#("name", json.string(name)), #("owner", json.string(owner))]), - ) + let name = json.string(name) + let owner = json.string(owner) + Some(json.object([#("name", name), #("owner", owner)])) } diff --git a/apps/backend/src/api/hex.gleam b/apps/backend/src/api/hex.gleam index 75766ce..937c84e 100644 --- a/apps/backend/src/api/hex.gleam +++ b/apps/backend/src/api/hex.gleam @@ -9,15 +9,15 @@ import gleam/result import gleam/uri pub fn get_package_owners(package_name: String, secret hex_api_key: String) { - use response <- result.try( + use response <- result.try({ request.new() |> request.set_host("hex.pm") |> request.set_path("/api/packages/" <> package_name <> "/owners") |> request.prepend_header("authorization", hex_api_key) - |> request.prepend_header("user-agent", "gloogle / 0.0.0") + |> request.prepend_header("user-agent", "gloogle / 1.0.0") |> httpc.send() - |> result.map_error(error.FetchError), - ) + |> result.map_error(error.FetchError) + }) response.body |> json.decode(using: dynamic.list(decode_hex_owner)) @@ -25,15 +25,15 @@ pub fn get_package_owners(package_name: String, secret hex_api_key: String) { } pub fn get_package(package_name: String, secret hex_api_key: String) { - use response <- result.try( + use response <- result.try({ request.new() |> request.set_host("hex.pm") |> request.set_path("/api/packages/" <> package_name) |> request.prepend_header("authorization", hex_api_key) - |> request.prepend_header("user-agent", "gloogle / 0.0.0") + |> request.prepend_header("user-agent", "gloogle / 1.0.0") |> httpc.send() - |> result.map_error(error.FetchError), - ) + |> result.map_error(error.FetchError) + }) response.body |> json.decode(using: hexpm.decode_package) @@ -52,15 +52,15 @@ fn decode_hex_owner(data) { pub fn lookup_release(release: hexpm.PackageRelease, secret hex_api_key: String) { let assert Ok(url) = uri.parse(release.url) - use response <- result.try( + use response <- result.try({ request.new() |> request.set_host("hex.pm") |> request.set_path(url.path) |> request.prepend_header("authorization", hex_api_key) - |> request.prepend_header("user-agent", "gloogle / 0.0.0") + |> request.prepend_header("user-agent", "gloogle / 1.0.0") |> httpc.send() - |> result.map_error(error.FetchError), - ) + |> result.map_error(error.FetchError) + }) response.body |> json.decode(using: hexpm.decode_release) @@ -68,19 +68,17 @@ pub fn lookup_release(release: hexpm.PackageRelease, secret hex_api_key: String) } pub fn get_api_packages_page(page: Int, hex_api_key: String) { - use response <- result.try( + let page = int.to_string(page) + use response <- result.try({ request.new() |> request.set_host("hex.pm") |> request.set_path("/api/packages") - |> request.set_query([ - #("sort", "updated_at"), - #("page", int.to_string(page)), - ]) + |> request.set_query([#("sort", "updated_at"), #("page", page)]) |> request.prepend_header("authorization", hex_api_key) - |> request.prepend_header("user-agent", "gloogle / 0.0.0") + |> request.prepend_header("user-agent", "gloogle / 1.0.0") |> httpc.send() - |> result.map_error(error.FetchError), - ) + |> result.map_error(error.FetchError) + }) response.body |> json.decode(using: dynamic.list(of: hexpm.decode_package)) diff --git a/apps/backend/src/backend.gleam b/apps/backend/src/backend.gleam index 288fdf0..7c52109 100644 --- a/apps/backend/src/backend.gleam +++ b/apps/backend/src/backend.gleam @@ -12,6 +12,7 @@ import setup import tasks/hex import tasks/popularity import tasks/ranking +import tasks/timeseries import wisp import wisp/logger @@ -27,11 +28,6 @@ pub fn main() { setup.radiate() let assert Ok(subject) = type_search.init(ctx.db) - // let assert Ok(_) = - // supervisor.start(fn(children) { - // use _ <- function.tap(children) - // supervisor.add(children, { supervisor.worker(fn(_) { Ok(subject) }) }) - // }) let ctx = ctx |> config.add_type_search_subject(subject) @@ -42,22 +38,29 @@ pub fn main() { |> mist.port(cnf.port) |> mist.start_http() - let assert Ok(_) = - supervisor.start(fn(periodic_children) { - use _ <- function.tap(periodic_children) - let assert Ok(_) = - supervisor.start(fn(children) { - add_periodic_worker(periodic_children, waiting: 6 * 1000, do: fn() { - hex.sync_new_gleam_releases(ctx, children) - }) - add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() { - ranking.compute_ranking(ctx) - }) - add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() { - popularity.compute_popularity(ctx) - }) - }) - }) + let assert Ok(_) = { + use periodic_children <- supervisor.start() + use periodic_children <- function.tap(periodic_children) + let assert Ok(_) = { + use children <- supervisor.start() + // Every 10 seconds + add_periodic_worker(periodic_children, waiting: 10 * 1000, do: fn() { + hex.sync_new_gleam_releases(ctx, children) + }) + // Every day + add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() { + ranking.compute_ranking(ctx) + }) + // Every day + add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() { + popularity.compute_popularity(ctx) + }) + // Every hour + add_periodic_worker(periodic_children, waiting: 3600 * 1000, do: fn() { + timeseries.store_timeseries(ctx) + }) + } + } process.sleep_forever() } diff --git a/apps/backend/src/backend/postgres/queries.gleam b/apps/backend/src/backend/postgres/queries.gleam index 6b0257e..79f1a52 100644 --- a/apps/backend/src/backend/postgres/queries.gleam +++ b/apps/backend/src/backend/postgres/queries.gleam @@ -71,6 +71,36 @@ pub fn upsert_search_analytics(db: pgo.Connection, query: String) { }) } +pub fn select_last_day_search_analytics(db: pgo.Connection) { + "SELECT query, occurences + FROM search_analytics + WHERE updated_at <= now() + AND updated_at >= now() - INTERVAL '1 hour'" + |> pgo.execute(db, [], dynamic.tuple2(dynamic.string, dynamic.int)) + |> result.map(fn(r) { r.rows }) + |> result.map_error(error.DatabaseError) +} + +pub fn upsert_search_analytics_timeseries( + db: pgo.Connection, + analytic: #(String, Int), +) { + let #(date, _) = birl.to_erlang_universal_datetime(birl.now()) + let now = birl.from_erlang_universal_datetime(#(date, #(0, 0, 0))) + let timestamp = helpers.convert_time(now) + let #(query, occurences) = analytic + "INSERT INTO analytics_timeseries (query, occurences, date) + VALUES ($1, $2, $3) + ON CONFLICT (query, date) DO UPDATE + SET occurences = $2" + |> pgo.execute( + db, + [pgo.text(query), pgo.int(occurences), timestamp], + dynamic.dynamic, + ) + |> result.map_error(error.DatabaseError) +} + pub fn upsert_hex_user(db: pgo.Connection, owner: hexpm.PackageOwner) { let username = pgo.text(owner.username) let email = pgo.nullable(pgo.text, owner.email) diff --git a/apps/backend/src/tasks/hex.gleam b/apps/backend/src/tasks/hex.gleam index 8241a88..ec6ff24 100644 --- a/apps/backend/src/tasks/hex.gleam +++ b/apps/backend/src/tasks/hex.gleam @@ -82,11 +82,11 @@ fn sync_packages( use all_packages <- result.try(api.get_api_packages_page(page, api_key)) let state = State(..state, newest: first_timestamp(all_packages, state)) let new_packages = take_fresh_packages(all_packages, state.limit) - use state <- result.try(list.try_fold( - new_packages, - state, - do_sync_package(Some(children), force: False), - )) + use state <- result.try({ + list.try_fold(new_packages, state, { + do_sync_package(Some(children), force: False) + }) + }) case list.length(all_packages) == list.length(new_packages) { _ if all_packages == [] -> Ok(state.newest) False -> Ok(state.newest) diff --git a/apps/backend/src/tasks/timeseries.gleam b/apps/backend/src/tasks/timeseries.gleam new file mode 100644 index 0000000..252bed4 --- /dev/null +++ b/apps/backend/src/tasks/timeseries.gleam @@ -0,0 +1,21 @@ +import backend/config.{type Context} +import backend/error +import backend/postgres/queries +import gleam/function +import gleam/list +import gleam/result +import wisp + +pub fn store_timeseries(ctx: Context) { + wisp.log_info("Storing analytics timeseries") + let query = queries.select_last_day_search_analytics(ctx.db) + use analytics <- result.try(query) + use _ <- function.tap({ + result.all({ + use analytic <- list.map(analytics) + queries.upsert_search_analytics_timeseries(ctx.db, analytic) + }) + |> result.map_error(error.debug_log) + }) + wisp.log_info("Storing analytics finished!") +}