Skip to content

Commit

Permalink
Use cache_order!
Browse files Browse the repository at this point in the history
  • Loading branch information
tpendragon committed Sep 18, 2024
1 parent f5e6b44 commit d948690
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 10 deletions.
10 changes: 5 additions & 5 deletions lib/dpul_collections/indexing_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ defmodule DpulCollections.IndexingPipeline do
def write_hydration_cache_entry(attrs \\ %{}) do
conflict_query =
Figgy.HydrationCacheEntry
|> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order])
|> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order, cache_order: ^DateTime.utc_now()])
|> where([c], c.source_cache_order <= ^attrs.source_cache_order)

try do
Expand All @@ -79,16 +79,16 @@ defmodule DpulCollections.IndexingPipeline do
count :: integer
) :: list(Figgy.HydrationCacheEntry)
def get_hydration_cache_entries_since!(
%Figgy.HydrationCacheEntryMarker{timestamp: source_cache_order, id: id},
%Figgy.HydrationCacheEntryMarker{timestamp: cache_order, id: id},
count
) do
query =
from r in Figgy.HydrationCacheEntry,
where:
(r.source_cache_order == ^source_cache_order and r.record_id > ^id) or
r.source_cache_order > ^source_cache_order,
(r.cache_order == ^cache_order and r.record_id > ^id) or
r.cache_order > ^cache_order,
limit: ^count,
order_by: [asc: r.source_cache_order, asc: r.record_id]
order_by: [asc: r.cache_order, asc: r.record_id]

Repo.all(query)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntryMarker do
def from(nil), do: nil

@spec from(%HydrationCacheEntry{}) :: t()
def from(%HydrationCacheEntry{source_cache_order: timestamp, record_id: id}) do
def from(%HydrationCacheEntry{cache_order: timestamp, record_id: id}) do
%__MODULE__{timestamp: timestamp, id: id}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do
source_cache_order: message.data.updated_at,
data: message.data |> Map.from_struct() |> Map.delete(:__meta__)
})
IO.inspect {internal_resource, response.record_id}
{:ok, response}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do
|> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))
|> Map.put(:debug_counter, state.debug_counter + length(records))

IO.inspect(new_state.debug_counter)
# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
Process.send_after(self(), :check_for_updates, 50)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do
Task.await(task, 15000)

task =
Task.async(fn -> wait_for_transformed_id(FiggyTestSupport.last_ephemera_folder_id()) end)
Task.async(fn -> wait_for_transformed_id(FiggyTestSupport.last_hydration_cache_entry_marker().id) end)

Task.await(task, 15000)
Solr.commit()
Expand Down
11 changes: 11 additions & 0 deletions test/dpul_collections/indexing_pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ defmodule DpulCollections.IndexingPipelineTest do
IndexingPipeline.get_hydration_cache_entry!(hydration_cache_entry.id)
end
end

test "write_hydration_cache_entry/1 upserts a cache entry" do
{:ok, record} = IndexingPipeline.write_hydration_cache_entry(%{data: %{}, source_cache_order: ~U[2024-07-23 20:05:00Z], cache_version: 0, record_id: "some record_id"})
{:ok, record_2} = IndexingPipeline.write_hydration_cache_entry(%{data: %{}, source_cache_order: ~U[2024-07-24 20:05:00Z], cache_version: 0, record_id: "some record_id"})
{:ok, record_3} = IndexingPipeline.write_hydration_cache_entry(%{data: %{}, source_cache_order: ~U[2024-07-22 20:05:00Z], cache_version: 0, record_id: "some record_id"})
reloaded = IndexingPipeline.get_hydration_cache_entry!(record_2.id)
assert record.cache_order != reloaded.cache_order
record_3 = IndexingPipeline.get_hydration_cache_entry!(record_2.id)
assert record_3.cache_order == reloaded.cache_order
assert IndexingPipeline.list_hydration_cache_entries() |> length == 1
end
end

describe "processor_markers" do
Expand Down
2 changes: 1 addition & 1 deletion test/support/figgy_test_support.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule FiggyTestSupport do
query =
from r in Figgy.HydrationCacheEntry,
limit: 1,
order_by: [desc: r.source_cache_order, desc: r.id]
order_by: [desc: r.cache_order, desc: r.id]

Repo.all(query) |> hd |> Figgy.HydrationCacheEntryMarker.from()
end
Expand Down

0 comments on commit d948690

Please sign in to comment.