Skip to content

Commit

Permalink
Fix syncing refresh and reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
voodoos committed Dec 1, 2024
1 parent 8982772 commit 49956a3
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 36 deletions.
9 changes: 3 additions & 6 deletions bin/servers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@ let connect (server_id, { connexion; status; refresh }) =
let _ =
Worker_client.listen Servers_status_update ~f:(fun (id, report) ->
(* TODO: subscribe to a specific server's updates *)
let previous_status = Lwd.peek status in
if String.equal server_id id then (
let previous_status = Lwd.peek status in
Lwd.set status report;
match (previous_status.sync_progress, report.sync_progress) with
| Some { remaining; _ }, Some { remaining = remaining'; _ }
when remaining <> remaining' ->
Lwd.set refresh ()
| Some _, None -> Lwd.set refresh ()
match (previous_status.status, report.status) with
| Syncing, In_sync -> Lwd.set refresh ()
| _ -> ()))
in
ignore (Worker_client.query @@ Add_servers [ (server_id, connexion) ])
Expand Down
24 changes: 7 additions & 17 deletions lib/data_source/jellyfin_api.ml
Original file line number Diff line number Diff line change
Expand Up @@ -389,23 +389,13 @@ let request (type pp p r) ~base_url ?token ?headers
let* res = request @@ Request.v ~init url in
let+ json = Response.as_body res |> Body.text in

(* We split the answer that can be very large because Jstr.to_string might stack overflow in mlBytes.js at:
//Provides: jsoo_is_ascii
function jsoo_is_ascii (s) {
// The regular expression gets better at around this point for all browsers
if (s.length < 24) {
// Spidermonkey gets much slower when s.length >= 24 (on 64 bit archs)
for (var i = 0; i < s.length; i++) if (s.charCodeAt(i) > 127) return false;
return true;
} else
return !/[^\x00-\x7f]/.test(s);
} *)
(* TODO: one way to improve performance might be to use a direct Jstr.t ->
Jv.t -> Yojson.Safe.t flow so that only strings in field would be converted
to caml strings, not the whole request. *)
let cuts = Jstr.cuts ~sep:(Jstr.v "{") json in
let json = List.map cuts ~f:Jstr.to_string |> String.concat ~sep:"{" in
let yojson = J.from_string json in
(* Ezjson uses jsonm that is able to detect the encoding of the input string
(here an UTF_16 javascript string). This removes the need from converting
it to an ocaml string which could lead to "too much recursion" errors in
some cases. *)
(* TODO Ezjson.value_from_src_result is still the main cause of slowdown
during the sychronization process. *)
let yojson = J.from_string (Jstr.binary_to_octets json) in
try Q.response_of_yojson yojson
with e ->
Console.log [ "An error occured while decoding response: "; json ];
Expand Down
32 changes: 19 additions & 13 deletions lib/db/sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ open Source.Api

type status =
| Unknown
| Syncing
| In_sync
| Inconsistent
| New_items of {
Expand All @@ -53,6 +54,7 @@ let initial_report = { status = Unknown; sync_progress = None }

let status_to_string = function
| Unknown -> "Unknown"
| Syncing -> "Syncing"
| In_sync -> "Synchronized"
| Inconsistent -> "Inconsistent"
| New_items { first_missing_key; first_unfetched_key; last_source_item_key }
Expand All @@ -74,6 +76,7 @@ let pp_report fmt { status; sync_progress } =

let log_status = function
| Unknown -> Console.info [ "Database status is unknown" ]
| Syncing -> Console.info [ "Databae is being synchronized" ]
| In_sync -> Console.info [ "Database is synchronized" ]
| Inconsistent -> Console.warn [ "Database is out-of-sync" ]
| New_items { first_missing_key; first_unfetched_key; last_source_item_key }
Expand Down Expand Up @@ -440,7 +443,6 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
Console.info [ "Syncing database" ];
let* views = update_collections source idb in
let queue = Queue.create () in
let max_queue_length = ref 0 in
let workers : int Fut.t Queue.t = Queue.create () in
let* _ =
List.map views ~f:(fun (collection_id, view) ->
Expand All @@ -462,11 +464,16 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
Queue.add (collection_id, view) queue)
|> Fut.of_list |> Fut.map Result.flatten_l
in
let max_queue_length = ref (Queue.length queue) in
let add_to_queue v =
max_queue_length := !max_queue_length + 1;
Queue.add v queue
in
let running_jobs = ref 0 in
let run_job ~worker ~worker_is_ready (collection_id, folder) =
incr running_jobs;
let+ children = sync_folder ~source ~collection_id ~folder idb in
List.iter children ~f:(Fun.flip Queue.add queue);
List.iter children ~f:add_to_queue;
decr running_jobs;
worker_is_ready worker
in
Expand All @@ -484,16 +491,6 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
let _ = G.set_timeout ~ms:125 (fun () -> timeout (Ok ())) in
timer
in
max_queue_length := max !max_queue_length (Queue.length queue);
let () =
report
@@ Some
{
total = !max_queue_length;
remaining = renaming ();
jobs = !running_jobs;
}
in
(* Wait for a worker *)
let next_worker = Queue.take_opt workers in
match next_worker with
Expand All @@ -507,6 +504,15 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
worker_is_ready worker;
Fut.ok ()
| Some job ->
let () =
report
@@ Some
{
total = !max_queue_length;
remaining = renaming ();
jobs = !running_jobs;
}
in
let worker =
let* () = run_job ~worker ~worker_is_ready job in
(assign_work [@tailcall]) ()
Expand All @@ -524,6 +530,6 @@ let check_and_sync ?(report = fun _ -> ()) ~source idb =
let open Fut.Result_syntax in
let initial = initial_report in
let () = (* Send a first report *) report initial in
let report' sync_progress = report { initial with sync_progress } in
let report' sync_progress = report { status = Syncing; sync_progress } in
let+ () = sync_v2 ~report:report' ~source idb in
report { status = In_sync; sync_progress = None }
1 change: 1 addition & 0 deletions lib/db/sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

type status =
| Unknown
| Syncing
| In_sync
| Inconsistent
| New_items of {
Expand Down
8 changes: 8 additions & 0 deletions lib/std/brr_utils.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
open Brr

let with_timing name f =
let open Brr.Performance in
let before = now_ms G.performance in
let result = f () in
Console.log [ name; " took"; now_ms G.performance -. before; "ms" ];
result
1 change: 1 addition & 0 deletions lib/std/std.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include ContainersLabels
module Brr_utils = Brr_utils

module Int = struct
include Int
Expand Down

0 comments on commit 49956a3

Please sign in to comment.