Poll for updates #90

Merged
25 commits, merged Sep 24, 2024

Commits
0eacea5  Add polling to Transformation. (tpendragon, Sep 17, 2024)
83e661c  Demonstrate race condition. (tpendragon, Sep 17, 2024)
043cf8c  Moved writing to hydration cache into the batch function (sdellis, Sep 18, 2024)
4b9d8b1  it's broken though (hackartisan, Sep 18, 2024)
3c19ff8  Debugging (eliotjordan, Sep 18, 2024)
4d3e054  Use cache_order! (tpendragon, Sep 18, 2024)
4b34973  Fix Hydrator/Transformer Test (tpendragon, Sep 19, 2024)
d0208be  Fix other tests. (tpendragon, Sep 19, 2024)
e7bb413  Get everything passing. (tpendragon, Sep 19, 2024)
46d38e7  Don't need Solr.commit (tpendragon, Sep 19, 2024)
751cc60  Format. (tpendragon, Sep 19, 2024)
c2862fa  Fix unit tests. (tpendragon, Sep 19, 2024)
3b48612  Improve coverage some. (tpendragon, Sep 19, 2024)
239423c  Fixed bug with handle_info (sdellis, Sep 19, 2024)
df86501  Make transformation producer stop more gracefully (hackartisan, Sep 19, 2024)
849f5e3  Add trap_exit to indexing producer. (tpendragon, Sep 19, 2024)
1ddef92  Start work on indexing integration tests (eliotjordan, Sep 19, 2024)
ff44788  Added find_by_id Solr helper and updated test for indexing integration (sdellis, Sep 19, 2024)
93c9682  Finish indexing integration test (hackartisan, Sep 19, 2024)
0daa364  Remove unneeded exit handling (hackartisan, Sep 19, 2024)
dcb3457  Finish coverage. (tpendragon, Sep 19, 2024)
926ae1f  Move hydration cache writes out of batcher back to consumers (hackartisan, Sep 20, 2024)
66c82ab  Merge pull request #92 from pulibrary/dont-batch (tpendragon, Sep 20, 2024)
3c01c9f  Deal with some compiler complaints (hackartisan, Sep 20, 2024)
be896c3  Clarified hydration upsert test (sdellis, Sep 24, 2024)
34 changes: 23 additions & 11 deletions lib/dpul_collections/indexing_pipeline.ex
@@ -59,7 +59,13 @@ defmodule DpulCollections.IndexingPipeline do
   def write_hydration_cache_entry(attrs \\ %{}) do
     conflict_query =
       Figgy.HydrationCacheEntry
-      |> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order])
+      |> update(
+        set: [
+          data: ^attrs.data,
+          source_cache_order: ^attrs.source_cache_order,
+          cache_order: ^DateTime.utc_now()
+        ]
+      )
       |> where([c], c.source_cache_order <= ^attrs.source_cache_order)

     try do
@@ -79,16 +85,16 @@
           count :: integer
         ) :: list(Figgy.HydrationCacheEntry)
   def get_hydration_cache_entries_since!(
-        %Figgy.HydrationCacheEntryMarker{timestamp: source_cache_order, id: id},
+        %Figgy.HydrationCacheEntryMarker{timestamp: cache_order, id: id},
         count
       ) do
     query =
       from r in Figgy.HydrationCacheEntry,
         where:
-          (r.source_cache_order == ^source_cache_order and r.record_id > ^id) or
-            r.source_cache_order > ^source_cache_order,
+          (r.cache_order == ^cache_order and r.record_id > ^id) or
+            r.cache_order > ^cache_order,
         limit: ^count,
-        order_by: [asc: r.source_cache_order, asc: r.record_id]
+        order_by: [asc: r.cache_order, asc: r.record_id]

     Repo.all(query)
   end
@@ -302,16 +308,16 @@
           count :: integer
         ) :: list(Figgy.TransformationCacheEntry)
   def get_transformation_cache_entries_since!(
-        %Figgy.TransformationCacheEntryMarker{timestamp: source_cache_order, id: id},
+        %Figgy.TransformationCacheEntryMarker{timestamp: cache_order, id: id},
         count
       ) do
     query =
       from r in Figgy.TransformationCacheEntry,
         where:
-          (r.source_cache_order == ^source_cache_order and r.record_id > ^id) or
-            r.source_cache_order > ^source_cache_order,
+          (r.cache_order == ^cache_order and r.record_id > ^id) or
+            r.cache_order > ^cache_order,
         limit: ^count,
-        order_by: [asc: r.source_cache_order, asc: r.record_id]
+        order_by: [asc: r.cache_order, asc: r.record_id]

     Repo.all(query)
   end
@@ -324,7 +330,7 @@
     query =
       from r in Figgy.TransformationCacheEntry,
         limit: ^count,
-        order_by: [asc: r.source_cache_order, asc: r.record_id]
+        order_by: [asc: r.cache_order, asc: r.record_id]

     Repo.all(query)
   end
@@ -353,7 +359,13 @@
   def write_transformation_cache_entry(attrs \\ %{}) do
     conflict_query =
       Figgy.TransformationCacheEntry
-      |> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order])
+      |> update(
+        set: [
+          data: ^attrs.data,
+          source_cache_order: ^attrs.source_cache_order,
+          cache_order: ^DateTime.utc_now()
+        ]
+      )
       |> where([c], c.source_cache_order <= ^attrs.source_cache_order)

     try do
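The upsert above is the heart of the fix: on conflict, the row is only overwritten when the incoming source_cache_order is at least as new, and cache_order is bumped to the write time so pollers that order by cache_order will pick the row up again. A minimal sketch of how such a conflict query plugs into an insert; the Repo.insert wiring, changeset construction, and conflict target columns are assumptions for illustration, not shown in this diff:

    # Sketch only, assuming `import Ecto.Query`; conflict_target columns are assumed.
    conflict_query =
      Figgy.HydrationCacheEntry
      |> update(
        set: [
          data: ^attrs.data,
          source_cache_order: ^attrs.source_cache_order,
          # bump cache_order on every overwrite so downstream pollers re-see the row
          cache_order: ^DateTime.utc_now()
        ]
      )
      |> where([c], c.source_cache_order <= ^attrs.source_cache_order)

    %Figgy.HydrationCacheEntry{}
    |> Ecto.Changeset.change(attrs)
    |> Repo.insert(
      on_conflict: conflict_query,
      conflict_target: [:record_id, :cache_version]
    )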
@@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntryMarker do
   def from(nil), do: nil

   @spec from(%HydrationCacheEntry{}) :: t()
-  def from(%HydrationCacheEntry{source_cache_order: timestamp, record_id: id}) do
+  def from(%HydrationCacheEntry{cache_order: timestamp, record_id: id}) do
     %__MODULE__{timestamp: timestamp, id: id}
   end
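With this change a marker's timestamp tracks when the cache entry was (re)written rather than when its source changed. A quick sketch of the mapping; the field values are made up for illustration:

    # Hypothetical entry; from/1 maps cache_order -> timestamp and record_id -> id.
    entry = %HydrationCacheEntry{
      cache_order: ~U[2024-09-19 12:00:00.000000Z],
      record_id: "example-record-id"
    }

    HydrationCacheEntryMarker.from(entry)
    # => %HydrationCacheEntryMarker{timestamp: ~U[2024-09-19 12:00:00.000000Z], id: "example-record-id"}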
44 changes: 38 additions & 6 deletions lib/dpul_collections/indexing_pipeline/figgy/indexing_producer.ex
@@ -17,18 +17,24 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do
           last_queried_marker: Figgy.TransformationCacheEntryMarker.t(),
           pulled_records: [Figgy.TransformationCacheEntryMarker.t()],
           acked_records: [Figgy.TransformationCacheEntryMarker.t()],
-          cache_version: Integer
+          cache_version: Integer,
+          stored_demand: Integer
         }
   @spec init(integer()) :: {:producer, state()}
   def init(cache_version) do
+    # trap the exit so we can stop gracefully
+    # see https://www.erlang.org/doc/apps/erts/erlang.html#process_flag/2
+    Process.flag(:trap_exit, true)
+
     last_queried_marker =
       IndexingPipeline.get_processor_marker!("figgy_indexer", cache_version)

     initial_state = %{
       last_queried_marker: last_queried_marker |> Figgy.TransformationCacheEntryMarker.from(),
       pulled_records: [],
       acked_records: [],
-      cache_version: cache_version
+      cache_version: cache_version,
+      stored_demand: 0
     }

     {:producer, initial_state}
@@ -41,12 +47,14 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do
         state = %{
           last_queried_marker: last_queried_marker,
           pulled_records: pulled_records,
-          acked_records: acked_records
+          acked_records: acked_records,
+          stored_demand: stored_demand
         }
-      )
-      when demand > 0 do
+      ) do
+    total_demand = stored_demand + demand
+
     records =
-      IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, demand)
+      IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, total_demand)

     new_state =
       state
@@ -62,10 +70,34 @@
         )
       )
       |> Map.put(:acked_records, acked_records)
+      |> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

+    # Set a timer to try fulfilling demand again later
+    if new_state.stored_demand > 0 do
+      Process.send_after(self(), :check_for_updates, 50)
+    end
+
     {:noreply, Enum.map(records, &wrap_record/1), new_state}
   end

+  defp calculate_stored_demand(total_demand, fulfilled_demand)
+       when total_demand == fulfilled_demand do
+    0
+  end
+
+  defp calculate_stored_demand(total_demand, fulfilled_demand)
+       when total_demand > fulfilled_demand do
+    total_demand - fulfilled_demand
+  end
+
+  def handle_info(:check_for_updates, state = %{stored_demand: demand}) when demand > 0 do
+    handle_demand(0, state)
+  end
+
+  def handle_info(:check_for_updates, state) do
+    {:noreply, [], state}
+  end
+
   @impl GenStage
   @spec handle_info({atom(), atom(), list(%Figgy.TransformationCacheEntryMarker{})}, state()) ::
           {:noreply, list(Broadway.Message.t()), state()}
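Taken together, the stored_demand changes turn this producer into a poller: the `when demand > 0` guard is gone, unmet demand is remembered, and a `:check_for_updates` timer re-enters handle_demand/2 every 50 ms until the cache catches up. The same pattern is applied to TransformationProducer below. A stripped-down sketch of the pattern; the module name and fetch_records/1 are hypothetical stand-ins for the cache query:

    defmodule PollingProducerSketch do
      use GenStage

      def init(_opts), do: {:producer, %{stored_demand: 0}}

      def handle_demand(demand, state = %{stored_demand: stored_demand}) do
        total_demand = stored_demand + demand
        records = fetch_records(total_demand)
        new_state = %{state | stored_demand: total_demand - length(records)}

        # Demand we could not fulfill is retried on a timer.
        if new_state.stored_demand > 0 do
          Process.send_after(self(), :check_for_updates, 50)
        end

        {:noreply, records, new_state}
      end

      def handle_info(:check_for_updates, state = %{stored_demand: demand}) when demand > 0 do
        handle_demand(0, state)
      end

      def handle_info(:check_for_updates, state), do: {:noreply, [], state}

      # Hypothetical stand-in for the cache query; returns at most `count` records.
      defp fetch_records(_count), do: []
    end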
@@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationCacheEntryMarker
   def from(nil), do: nil

   @spec from(%TransformationCacheEntry{}) :: t()
-  def from(%TransformationCacheEntry{source_cache_order: timestamp, record_id: id}) do
+  def from(%TransformationCacheEntry{cache_order: timestamp, record_id: id}) do
     %__MODULE__{timestamp: timestamp, id: id}
   end
@@ -84,7 +84,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do
       IndexingPipeline.write_transformation_cache_entry(%{
         cache_version: cache_version,
         record_id: hydration_cache_entry |> Map.get(:record_id),
-        source_cache_order: hydration_cache_entry |> Map.get(:source_cache_order),
+        source_cache_order: hydration_cache_entry |> Map.get(:cache_order),
         data: solr_doc
       })
     end
@@ -18,18 +18,24 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do
           last_queried_marker: Figgy.HydrationCacheEntryMarker.t(),
           pulled_records: [Figgy.HydrationCacheEntryMarker.t()],
           acked_records: [Figgy.HydrationCacheEntryMarker.t()],
-          cache_version: Integer
+          cache_version: Integer,
+          stored_demand: Integer
         }
   @spec init(integer()) :: {:producer, state()}
   def init(cache_version) do
+    # trap the exit so we can stop gracefully
+    # see https://www.erlang.org/doc/apps/erts/erlang.html#process_flag/2
+    Process.flag(:trap_exit, true)
+
     last_queried_marker =
       IndexingPipeline.get_processor_marker!("figgy_transformer", cache_version)

     initial_state = %{
       last_queried_marker: last_queried_marker |> Figgy.HydrationCacheEntryMarker.from(),
       pulled_records: [],
       acked_records: [],
-      cache_version: cache_version
+      cache_version: cache_version,
+      stored_demand: 0
     }

     {:producer, initial_state}
@@ -42,11 +48,14 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do
         state = %{
           last_queried_marker: last_queried_marker,
           pulled_records: pulled_records,
-          acked_records: acked_records
+          acked_records: acked_records,
+          stored_demand: stored_demand
         }
-      )
-      when demand > 0 do
-    records = IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, demand)
+      ) do
+    total_demand = stored_demand + demand
+
+    records =
+      IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, total_demand)

     new_state =
       state
@@ -59,10 +68,36 @@
         Enum.concat(pulled_records, Enum.map(records, &Figgy.HydrationCacheEntryMarker.from/1))
       )
       |> Map.put(:acked_records, acked_records)
+      |> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

+    # Set a timer to try fulfilling demand again later
+    if new_state.stored_demand > 0 do
+      Process.send_after(self(), :check_for_updates, 50)
+    end
+
     {:noreply, Enum.map(records, &wrap_record/1), new_state}
   end

+  defp calculate_stored_demand(total_demand, fulfilled_demand)
+       when total_demand == fulfilled_demand do
+    0
+  end
+
+  defp calculate_stored_demand(total_demand, fulfilled_demand)
+       when total_demand > fulfilled_demand do
+    total_demand - fulfilled_demand
+  end
+
+  def handle_info(:check_for_updates, state = %{stored_demand: demand})
+      when demand > 0 do
+    new_demand = 0
+    handle_demand(new_demand, state)
+  end
+
+  def handle_info(:check_for_updates, state) do
+    {:noreply, [], state}
+  end
+
   @impl GenStage
   @spec handle_info({atom(), atom(), list(%Figgy.HydrationCacheEntryMarker{})}, state()) ::
           {:noreply, list(Broadway.Message.t()), state()}
13 changes: 13 additions & 0 deletions lib/dpul_collections/solr.ex
@@ -10,6 +10,19 @@ defmodule DpulCollections.Solr do
     response.body["response"]["numFound"]
   end

+  def find_by_id(id) do
+    {:ok, response} =
+      Req.get(
+        select_url(),
+        params: [q: "id:#{id}"]
+      )
+
+    case response.body["response"]["docs"] do
+      [] -> nil
+      [doc] -> doc
+    end
+  end
+
   @spec add(list(map())) :: {:ok, Req.Response.t()} | {:error, Exception.t()}
   def add(docs) do
     Req.post(
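A hedged usage sketch for the new helper, e.g. in an integration test that polls Solr after the pipeline runs; the document id is illustrative. Note the case clauses cover zero or one hit, which assumes id is unique in the index:

    # Illustrative only; returns nil until the document has been indexed.
    case DpulCollections.Solr.find_by_id("example-id-123") do
      nil -> :not_yet_indexed
      doc -> {:ok, doc}
    end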