Skip to content

Commit

Permalink
Working sync. Still slowed down by string conversion...
Browse files Browse the repository at this point in the history
  • Loading branch information
voodoos committed Dec 1, 2024
1 parent e350dd5 commit c9eabad
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 30 deletions.
12 changes: 4 additions & 8 deletions bin/servers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ let ui_status server =
Lwd.map (Lwd.get server.status) ~f:(fun { status; sync_progress } ->
match (status, sync_progress) with
| In_sync, None -> El.txt' "Synchronized"
| _, Some { Db.Sync.total; remaining } ->
| _, Some { Db.Sync.total; remaining; jobs } ->
El.txt'
@@ Printf.sprintf "Sync in progress: %i/%i" (total - remaining)
total
@@ Printf.sprintf "Sync in progress: %i/%i (%i jobs)"
(total - remaining) total jobs
| _ -> El.txt' "Desynchronized")
in
status
Expand Down Expand Up @@ -134,17 +134,13 @@ let servers_libraries =
in
Lwd_seq.map
(fun (server_id, { refresh; _ }) ->
Console.log [ "NEW REF" ];
let previous_value = ref None in
let v =
Lwd.bind (Lwd.get refresh) ~f:(fun () ->
(* TODO: we should not do that here but in the ui *)
Worker_client.query (Get_libraries ())
|> Fut.map (Result.get_or ~default:[||])
|> Fut.map (fun l ->
Console.log [ "GOT L="; l ];
Array.to_list l)
|> Fut.map Lwd_seq.of_list
|> Fut.map Array.to_list |> Fut.map Lwd_seq.of_list
(* FIXME: This is bad: we create a lwd var each time we refresh
and thiq var had an empty seq value. This caused flickering
when syncing. Having the correct initial value is not a much
Expand Down
56 changes: 35 additions & 21 deletions lib/db/sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type status =
}
| Partial_fetch of { first_unfetched_key : int; last_source_item_key : int }

type progress = { total : int; remaining : int }
type progress = { total : int; remaining : int; jobs : int }
type report = { status : status; sync_progress : progress option }

let initial_report = { status = Unknown; sync_progress = None }
Expand All @@ -90,8 +90,8 @@ let status_to_string = function
Format.sprintf "Partial: last: %i unfetched: %i" first_unfetched_key
first_unfetched_key

let pp_progress fmt { total; remaining } =
Format.fprintf fmt "(%i/%i)" remaining total
let pp_progress fmt { total; remaining; jobs } =
Format.fprintf fmt "(%i/%i) [%i jobs]" remaining total jobs

let pp_report fmt { status; sync_progress } =
let status = status_to_string status in
Expand Down Expand Up @@ -490,6 +490,12 @@ let sync_folder ~source ~collection_id ~(folder : Item.t) idb =
enable_images = true;
}
in
(* Throttle queries *)
let* () =
let timer, timeout = Fut.create () in
let _ = G.set_timeout ~ms:50 (fun () -> timeout (Ok ())) in
timer
in
let* { Api.Items.start_index = _; items; _ } =
query source (module Api.Items) req ()
in
Expand Down Expand Up @@ -552,7 +558,7 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
let* _ =
List.map views ~f:(fun (collection_id, view) ->
let* src_track_count = get_source_track_count source view in
let+ db_track_count = get_db_track_count idb ~collection_id in
let+ db_track_count = get_db_track_count idb ~collection_id in
let () =
Console.log
[
Expand All @@ -565,10 +571,8 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
" in db)";
]
in
if
String.equal view.Item.name "Musique"
&& src_track_count > db_track_count
then Queue.add (collection_id, view) queue)
if src_track_count > db_track_count then
Queue.add (collection_id, view) queue)
|> Fut.of_list |> Fut.map Result.flatten_l
in
let running_jobs = ref 0 in
Expand All @@ -589,24 +593,34 @@ let sync_v2 ~report ~(source : Source.connexion) idb =
let rec assign_work () =
max_queue_length := max !max_queue_length (Queue.length queue);
let () =
report @@ Some { total = !max_queue_length; remaining = renaming () }
report
@@ Some
{
total = !max_queue_length;
remaining = renaming ();
jobs = !running_jobs;
}
in
match Queue.take_opt queue with
(* Wait for a worker *)
let next_worker = Queue.take_opt workers in
match next_worker with
| None -> Fut.ok ()
| Some job -> (
(* Wait for a worker *)
let next_worker = Queue.take_opt workers in
match next_worker with
| None -> Fut.ok ()
| Some next_worker ->
Fut.bind next_worker @@ fun worker ->
let future_worker, worker_is_ready = Fut.create () in
let () = Queue.add future_worker workers in
let* () =
| Some next_worker -> (
let future_worker, worker_is_ready = Fut.create () in
let () = Queue.add future_worker workers in
Fut.bind next_worker @@ fun worker ->
match Queue.take_opt queue with
| None ->
worker_is_ready worker;
Fut.ok ()
| Some job ->
let worker =
let* () = run_job ~worker ~worker_is_ready job in
(assign_work [@tailcall]) ()
in
(assign_work [@tailcall]) ())
let* () = assign_work () in
let+ () = worker in
())
in
assign_work ()
in
Expand Down
2 changes: 1 addition & 1 deletion lib/db/sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type status =

val log_status : status -> unit

type progress = { total : int; remaining : int }
type progress = { total : int; remaining : int; jobs : int }
type report = { status : status; sync_progress : progress option }

val initial_report : report
Expand Down

0 comments on commit c9eabad

Please sign in to comment.