Skip to content

Commit

Permalink
feat: add timeseries computation
Browse files Browse the repository at this point in the history
  • Loading branch information
ghivert committed Sep 2, 2024
1 parent ec5ce83 commit 28e3c75
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- migrate:up
-- Daily time-series snapshots of search analytics: one row per (query, date).
-- NOTE(review): "occurences" is a misspelling of "occurrences", but it matches
-- the column name used throughout the existing schema — kept for consistency.
create table analytics_timeseries (
query text not null,
occurences int not null,
date timestamptz not null,
primary key (query, date)
);

-- migrate:down
-- Inverse of migrate:up: remove the snapshot table entirely.
drop table analytics_timeseries;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- migrate:up
-- New inserts should start counted at 1 (a row only exists once a search
-- happened), and existing rows are shifted up to match the new convention.
alter table only search_analytics alter column occurences set default 1;
update search_analytics set occurences = occurences + 1;

-- migrate:down
-- Best-effort inverse: restores the old default and shifts all rows back
-- down by one. NOTE(review): rows inserted after migrate:up are decremented
-- too, so a round-trip is only approximate for data created in between.
alter table only search_analytics alter column occurences set default 0;
update search_analytics set occurences = occurences - 1;
25 changes: 23 additions & 2 deletions apps/backend/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ ALTER TABLE public.analytics ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY (
);


--
-- Name: analytics_timeseries; Type: TABLE; Schema: public; Owner: -
--

-- Generated dump of the table created by the 20240902224247 migration:
-- daily (query, date) snapshots of search occurrence counts.
CREATE TABLE public.analytics_timeseries (
query text NOT NULL,
occurences integer NOT NULL,
date timestamp with time zone NOT NULL
);


--
-- Name: hex_read; Type: TABLE; Schema: public; Owner: -
--
Expand Down Expand Up @@ -304,7 +315,7 @@ CREATE TABLE public.schema_migrations (

-- Per-query search counters. occurences defaults to 1 (a row exists only
-- once the query has been searched at least once); see the 20240902225236
-- migration, which changed the default from 0.
CREATE TABLE public.search_analytics (
    query text NOT NULL,
    occurences integer DEFAULT 1 NOT NULL,
    created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
    updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
);
Expand All @@ -326,6 +337,14 @@ ALTER TABLE ONLY public.analytics
ADD CONSTRAINT analytics_pkey PRIMARY KEY (id);


--
-- Name: analytics_timeseries analytics_timeseries_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

-- Composite key: at most one snapshot row per query per day.
ALTER TABLE ONLY public.analytics_timeseries
ADD CONSTRAINT analytics_timeseries_pkey PRIMARY KEY (query, date);


--
-- Name: hex_read hex_read_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -590,4 +609,6 @@ INSERT INTO public.schema_migrations (version) VALUES
('20240521204341'),
('20240801164720'),
('20240801211520'),
('20240801220817');
('20240801220817'),
('20240902224247'),
('20240902225236');
2 changes: 1 addition & 1 deletion apps/backend/src/api/github.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn query(
use response <- result.try(
request.new()
|> request.set_header("authorization", "Bearer " <> token)
|> request.set_header("user-agent", "gloogle / 0.0.0")
|> request.set_header("user-agent", "gloogle / 1.0.0")
|> request.set_method(http.Post)
|> request.set_scheme(http.Https)
|> request.set_host("api.github.com")
Expand Down
6 changes: 3 additions & 3 deletions apps/backend/src/api/github/stargazer_count.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn decoder(dyn) {
}

/// Build the GraphQL variables payload for the stargazer-count query:
/// `{"name": name, "owner": owner}`, wrapped in `Some` as the request
/// body expects an optional variables object.
pub fn variables(name: String, owner: String) {
  let name = json.string(name)
  let owner = json.string(owner)
  Some(json.object([#("name", name), #("owner", owner)]))
}
38 changes: 18 additions & 20 deletions apps/backend/src/api/hex.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,31 @@ import gleam/result
import gleam/uri

/// Fetch the owners of a package from the hex.pm API.
/// Returns `error.FetchError` if the HTTP call fails and `error.JsonError`
/// if the response body cannot be decoded as a list of owners.
pub fn get_package_owners(package_name: String, secret hex_api_key: String) {
  use response <- result.try({
    request.new()
    |> request.set_host("hex.pm")
    |> request.set_path("/api/packages/" <> package_name <> "/owners")
    |> request.prepend_header("authorization", hex_api_key)
    |> request.prepend_header("user-agent", "gloogle / 1.0.0")
    |> httpc.send()
    |> result.map_error(error.FetchError)
  })

  response.body
  |> json.decode(using: dynamic.list(decode_hex_owner))
  |> result.map_error(error.JsonError)
}

pub fn get_package(package_name: String, secret hex_api_key: String) {
use response <- result.try(
use response <- result.try({
request.new()
|> request.set_host("hex.pm")
|> request.set_path("/api/packages/" <> package_name)
|> request.prepend_header("authorization", hex_api_key)
|> request.prepend_header("user-agent", "gloogle / 0.0.0")
|> request.prepend_header("user-agent", "gloogle / 1.0.0")
|> httpc.send()
|> result.map_error(error.FetchError),
)
|> result.map_error(error.FetchError)
})

response.body
|> json.decode(using: hexpm.decode_package)
Expand All @@ -52,35 +52,33 @@ fn decode_hex_owner(data) {
/// Fetch the full details of a release from hex.pm, using the URL carried
/// on the release record itself.
/// Crashes (`let assert`) if `release.url` is not a parseable URI;
/// returns `error.FetchError` / `error.JsonError` for HTTP / decode failures.
pub fn lookup_release(release: hexpm.PackageRelease, secret hex_api_key: String) {
  let assert Ok(url) = uri.parse(release.url)

  use response <- result.try({
    request.new()
    |> request.set_host("hex.pm")
    |> request.set_path(url.path)
    |> request.prepend_header("authorization", hex_api_key)
    |> request.prepend_header("user-agent", "gloogle / 1.0.0")
    |> httpc.send()
    |> result.map_error(error.FetchError)
  })

  response.body
  |> json.decode(using: hexpm.decode_release)
  |> result.map_error(error.JsonError)
}

pub fn get_api_packages_page(page: Int, hex_api_key: String) {
use response <- result.try(
let page = int.to_string(page)
use response <- result.try({
request.new()
|> request.set_host("hex.pm")
|> request.set_path("/api/packages")
|> request.set_query([
#("sort", "updated_at"),
#("page", int.to_string(page)),
])
|> request.set_query([#("sort", "updated_at"), #("page", page)])
|> request.prepend_header("authorization", hex_api_key)
|> request.prepend_header("user-agent", "gloogle / 0.0.0")
|> request.prepend_header("user-agent", "gloogle / 1.0.0")
|> httpc.send()
|> result.map_error(error.FetchError),
)
|> result.map_error(error.FetchError)
})

response.body
|> json.decode(using: dynamic.list(of: hexpm.decode_package))
Expand Down
45 changes: 24 additions & 21 deletions apps/backend/src/backend.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import setup
import tasks/hex
import tasks/popularity
import tasks/ranking
import tasks/timeseries
import wisp
import wisp/logger

Expand All @@ -27,11 +28,6 @@ pub fn main() {
setup.radiate()

let assert Ok(subject) = type_search.init(ctx.db)
// let assert Ok(_) =
// supervisor.start(fn(children) {
// use _ <- function.tap(children)
// supervisor.add(children, { supervisor.worker(fn(_) { Ok(subject) }) })
// })

let ctx = ctx |> config.add_type_search_subject(subject)

Expand All @@ -42,22 +38,29 @@ pub fn main() {
|> mist.port(cnf.port)
|> mist.start_http()

let assert Ok(_) =
supervisor.start(fn(periodic_children) {
use _ <- function.tap(periodic_children)
let assert Ok(_) =
supervisor.start(fn(children) {
add_periodic_worker(periodic_children, waiting: 6 * 1000, do: fn() {
hex.sync_new_gleam_releases(ctx, children)
})
add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() {
ranking.compute_ranking(ctx)
})
add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() {
popularity.compute_popularity(ctx)
})
})
})
let assert Ok(_) = {
use periodic_children <- supervisor.start()
use periodic_children <- function.tap(periodic_children)
let assert Ok(_) = {
use children <- supervisor.start()
// Every 10 seconds
add_periodic_worker(periodic_children, waiting: 10 * 1000, do: fn() {
hex.sync_new_gleam_releases(ctx, children)
})
// Every day
add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() {
ranking.compute_ranking(ctx)
})
// Every day
add_periodic_worker(periodic_children, waiting: 86_400_000, do: fn() {
popularity.compute_popularity(ctx)
})
// Every hour
add_periodic_worker(periodic_children, waiting: 3600 * 1000, do: fn() {
timeseries.store_timeseries(ctx)
})
}
}

process.sleep_forever()
}
Expand Down
30 changes: 30 additions & 0 deletions apps/backend/src/backend/postgres/queries.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,36 @@ pub fn upsert_search_analytics(db: pgo.Connection, query: String) {
})
}

/// Fetch `#(query, occurences)` pairs for every search_analytics row
/// updated within the last hour.
/// NOTE(review): the function name says "last day" but the SQL window is
/// one hour (it feeds the hourly timeseries task) — confirm which is
/// intended before relying on the name.
pub fn select_last_day_search_analytics(db: pgo.Connection) {
"SELECT query, occurences
FROM search_analytics
WHERE updated_at <= now()
AND updated_at >= now() - INTERVAL '1 hour'"
|> pgo.execute(db, [], dynamic.tuple2(dynamic.string, dynamic.int))
// Keep only the decoded rows; drop the returned row count.
|> result.map(fn(r) { r.rows })
|> result.map_error(error.DatabaseError)
}

/// Upsert one `#(query, occurences)` data point into analytics_timeseries,
/// keyed on (query, date) where date is today at 00:00:00 UTC.
/// On conflict the stored count is OVERWRITTEN with `$2`, not accumulated —
/// presumably `occurences` is already a cumulative total coming from
/// search_analytics, so the last write per day wins; confirm with callers.
pub fn upsert_search_analytics_timeseries(
db: pgo.Connection,
analytic: #(String, Int),
) {
// Truncate "now" to UTC midnight so every run within a day hits the
// same (query, date) row.
let #(date, _) = birl.to_erlang_universal_datetime(birl.now())
let now = birl.from_erlang_universal_datetime(#(date, #(0, 0, 0)))
let timestamp = helpers.convert_time(now)
let #(query, occurences) = analytic
"INSERT INTO analytics_timeseries (query, occurences, date)
VALUES ($1, $2, $3)
ON CONFLICT (query, date) DO UPDATE
SET occurences = $2"
|> pgo.execute(
db,
[pgo.text(query), pgo.int(occurences), timestamp],
dynamic.dynamic,
)
|> result.map_error(error.DatabaseError)
}

pub fn upsert_hex_user(db: pgo.Connection, owner: hexpm.PackageOwner) {
let username = pgo.text(owner.username)
let email = pgo.nullable(pgo.text, owner.email)
Expand Down
10 changes: 5 additions & 5 deletions apps/backend/src/tasks/hex.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ fn sync_packages(
use all_packages <- result.try(api.get_api_packages_page(page, api_key))
let state = State(..state, newest: first_timestamp(all_packages, state))
let new_packages = take_fresh_packages(all_packages, state.limit)
use state <- result.try(list.try_fold(
new_packages,
state,
do_sync_package(Some(children), force: False),
))
use state <- result.try({
list.try_fold(new_packages, state, {
do_sync_package(Some(children), force: False)
})
})
case list.length(all_packages) == list.length(new_packages) {
_ if all_packages == [] -> Ok(state.newest)
False -> Ok(state.newest)
Expand Down
21 changes: 21 additions & 0 deletions apps/backend/src/tasks/timeseries.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import backend/config.{type Context}
import backend/error
import backend/postgres/queries
import gleam/function
import gleam/list
import gleam/result
import wisp

/// Periodic task: snapshot the last hour's search analytics into the
/// analytics_timeseries table. Returns an error only if the initial
/// SELECT fails; individual upsert failures are logged, not propagated.
pub fn store_timeseries(ctx: Context) {
wisp.log_info("Storing analytics timeseries")
let query = queries.select_last_day_search_analytics(ctx.db)
use analytics <- result.try(query)
// function.tap runs the upserts for their side effects and discards the
// combined result; errors are surfaced via debug_log only (best effort).
use _ <- function.tap({
result.all({
use analytic <- list.map(analytics)
queries.upsert_search_analytics_timeseries(ctx.db, analytic)
})
|> result.map_error(error.debug_log)
})
wisp.log_info("Storing analytics finished!")
}

0 comments on commit 28e3c75

Please sign in to comment.