Skip to content

Commit

Permalink
Merge pull request #90 from pulibrary/45-indexing-timer
Browse files Browse the repository at this point in the history
Poll for updates
  • Loading branch information
tpendragon authored Sep 24, 2024
2 parents 95ef028 + be896c3 commit 6ea969d
Show file tree
Hide file tree
Showing 16 changed files with 366 additions and 244 deletions.
34 changes: 23 additions & 11 deletions lib/dpul_collections/indexing_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ defmodule DpulCollections.IndexingPipeline do
def write_hydration_cache_entry(attrs \\ %{}) do
conflict_query =
Figgy.HydrationCacheEntry
|> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order])
|> update(
set: [
data: ^attrs.data,
source_cache_order: ^attrs.source_cache_order,
cache_order: ^DateTime.utc_now()
]
)
|> where([c], c.source_cache_order <= ^attrs.source_cache_order)

try do
Expand All @@ -79,16 +85,16 @@ defmodule DpulCollections.IndexingPipeline do
count :: integer
) :: list(Figgy.HydrationCacheEntry)
# Returns up to `count` hydration cache entries strictly after the given
# marker, ordered by (cache_order, record_id) so pagination is stable even
# when multiple entries share the same cache_order timestamp.
def get_hydration_cache_entries_since!(
      %Figgy.HydrationCacheEntryMarker{timestamp: cache_order, id: id},
      count
    ) do
  query =
    from r in Figgy.HydrationCacheEntry,
      where:
        (r.cache_order == ^cache_order and r.record_id > ^id) or
          r.cache_order > ^cache_order,
      limit: ^count,
      order_by: [asc: r.cache_order, asc: r.record_id]

  Repo.all(query)
end
Expand Down Expand Up @@ -302,16 +308,16 @@ defmodule DpulCollections.IndexingPipeline do
count :: integer
) :: list(Figgy.TransformationCacheEntry)
# Returns up to `count` transformation cache entries strictly after the given
# marker, ordered by (cache_order, record_id) so pagination is stable even
# when multiple entries share the same cache_order timestamp.
def get_transformation_cache_entries_since!(
      %Figgy.TransformationCacheEntryMarker{timestamp: cache_order, id: id},
      count
    ) do
  query =
    from r in Figgy.TransformationCacheEntry,
      where:
        (r.cache_order == ^cache_order and r.record_id > ^id) or
          r.cache_order > ^cache_order,
      limit: ^count,
      order_by: [asc: r.cache_order, asc: r.record_id]

  Repo.all(query)
end
Expand All @@ -324,7 +330,7 @@ defmodule DpulCollections.IndexingPipeline do
query =
from r in Figgy.TransformationCacheEntry,
limit: ^count,
order_by: [asc: r.source_cache_order, asc: r.record_id]
order_by: [asc: r.cache_order, asc: r.record_id]

Repo.all(query)
end
Expand Down Expand Up @@ -353,7 +359,13 @@ defmodule DpulCollections.IndexingPipeline do
def write_transformation_cache_entry(attrs \\ %{}) do
conflict_query =
Figgy.TransformationCacheEntry
|> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order])
|> update(
set: [
data: ^attrs.data,
source_cache_order: ^attrs.source_cache_order,
cache_order: ^DateTime.utc_now()
]
)
|> where([c], c.source_cache_order <= ^attrs.source_cache_order)

try do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntryMarker do
# Converts a HydrationCacheEntry into a marker; nil passes through so callers
# can feed the result of a lookup that may have found nothing.
def from(nil), do: nil

@spec from(%HydrationCacheEntry{}) :: t()
def from(%HydrationCacheEntry{cache_order: timestamp, record_id: id}) do
  %__MODULE__{timestamp: timestamp, id: id}
end

Expand Down
44 changes: 38 additions & 6 deletions lib/dpul_collections/indexing_pipeline/figgy/indexing_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do
last_queried_marker: Figgy.TransformationCacheEntryMarker.t(),
pulled_records: [Figgy.TransformationCacheEntryMarker.t()],
acked_records: [Figgy.TransformationCacheEntryMarker.t()],
cache_version: Integer
cache_version: Integer,
stored_demand: Integer
}
@spec init(integer()) :: {:producer, state()}
def init(cache_version) do
# trap the exit so we can stop gracefully
# see https://www.erlang.org/doc/apps/erts/erlang.html#process_flag/2
Process.flag(:trap_exit, true)

last_queried_marker =
IndexingPipeline.get_processor_marker!("figgy_indexer", cache_version)

initial_state = %{
last_queried_marker: last_queried_marker |> Figgy.TransformationCacheEntryMarker.from(),
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:producer, initial_state}
Expand All @@ -41,12 +47,14 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do
state = %{
last_queried_marker: last_queried_marker,
pulled_records: pulled_records,
acked_records: acked_records
acked_records: acked_records,
stored_demand: stored_demand
}
)
when demand > 0 do
) do
total_demand = stored_demand + demand

records =
IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, demand)
IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, total_demand)

new_state =
state
Expand All @@ -62,10 +70,34 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do
)
)
|> Map.put(:acked_records, acked_records)
|> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
Process.send_after(self(), :check_for_updates, 50)
end

{:noreply, Enum.map(records, &wrap_record/1), new_state}
end

# Demand left over after a fetch: the un-fulfilled remainder, or zero when
# every requested record was delivered.
defp calculate_stored_demand(total_demand, fulfilled_demand)
     when total_demand > fulfilled_demand,
     do: total_demand - fulfilled_demand

defp calculate_stored_demand(total_demand, fulfilled_demand)
     when total_demand == fulfilled_demand,
     do: 0

# Timer tick: when demand is still outstanding, retry fulfillment by
# re-entering handle_demand with zero fresh demand; otherwise do nothing.
def handle_info(:check_for_updates, %{stored_demand: demand} = state)
    when demand > 0,
    do: handle_demand(0, state)

def handle_info(:check_for_updates, state), do: {:noreply, [], state}

@impl GenStage
@spec handle_info({atom(), atom(), list(%Figgy.TransformationCacheEntryMarker{})}, state()) ::
{:noreply, list(Broadway.Message.t()), state()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationCacheEntryMarker
# Converts a TransformationCacheEntry into a marker; nil passes through so
# callers can feed the result of a lookup that may have found nothing.
def from(nil), do: nil

@spec from(%TransformationCacheEntry{}) :: t()
def from(%TransformationCacheEntry{cache_order: timestamp, record_id: id}) do
  %__MODULE__{timestamp: timestamp, id: id}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do
IndexingPipeline.write_transformation_cache_entry(%{
cache_version: cache_version,
record_id: hydration_cache_entry |> Map.get(:record_id),
source_cache_order: hydration_cache_entry |> Map.get(:source_cache_order),
source_cache_order: hydration_cache_entry |> Map.get(:cache_order),
data: solr_doc
})
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do
last_queried_marker: Figgy.HydrationCacheEntryMarker.t(),
pulled_records: [Figgy.HydrationCacheEntryMarker.t()],
acked_records: [Figgy.HydrationCacheEntryMarker.t()],
cache_version: Integer
cache_version: Integer,
stored_demand: Integer
}
@spec init(integer()) :: {:producer, state()}
def init(cache_version) do
# trap the exit so we can stop gracefully
# see https://www.erlang.org/doc/apps/erts/erlang.html#process_flag/2
Process.flag(:trap_exit, true)

last_queried_marker =
IndexingPipeline.get_processor_marker!("figgy_transformer", cache_version)

initial_state = %{
last_queried_marker: last_queried_marker |> Figgy.HydrationCacheEntryMarker.from(),
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:producer, initial_state}
Expand All @@ -42,11 +48,14 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do
state = %{
last_queried_marker: last_queried_marker,
pulled_records: pulled_records,
acked_records: acked_records
acked_records: acked_records,
stored_demand: stored_demand
}
)
when demand > 0 do
records = IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, demand)
) do
total_demand = stored_demand + demand

records =
IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, total_demand)

new_state =
state
Expand All @@ -59,10 +68,36 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do
Enum.concat(pulled_records, Enum.map(records, &Figgy.HydrationCacheEntryMarker.from/1))
)
|> Map.put(:acked_records, acked_records)
|> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
Process.send_after(self(), :check_for_updates, 50)
end

{:noreply, Enum.map(records, &wrap_record/1), new_state}
end

# Demand left over after a fetch: the un-fulfilled remainder, or zero when
# every requested record was delivered.
defp calculate_stored_demand(total_demand, fulfilled_demand)
     when total_demand > fulfilled_demand,
     do: total_demand - fulfilled_demand

defp calculate_stored_demand(total_demand, fulfilled_demand)
     when total_demand == fulfilled_demand,
     do: 0

# Timer tick: when demand is still outstanding, retry fulfillment by
# re-entering handle_demand with zero fresh demand (stored_demand in state
# drives the query size); otherwise do nothing.
# Dropped the pointless `new_demand = 0` temporary to match the equivalent
# clause in Figgy.IndexingProducer.
def handle_info(:check_for_updates, state = %{stored_demand: demand})
    when demand > 0 do
  handle_demand(0, state)
end

def handle_info(:check_for_updates, state) do
  {:noreply, [], state}
end

@impl GenStage
@spec handle_info({atom(), atom(), list(%Figgy.HydrationCacheEntryMarker{})}, state()) ::
{:noreply, list(Broadway.Message.t()), state()}
Expand Down
13 changes: 13 additions & 0 deletions lib/dpul_collections/solr.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@ defmodule DpulCollections.Solr do
response.body["response"]["numFound"]
end

# Looks up a single Solr document by its id.
#
# Returns the document map, or nil when nothing matches. Solr ids are
# expected to be unique, but we match `[doc | _]` rather than `[doc]` so an
# unexpected duplicate returns the first hit instead of raising a
# CaseClauseError.
def find_by_id(id) do
  {:ok, response} =
    Req.get(
      select_url(),
      params: [q: "id:#{id}"]
    )

  case response.body["response"]["docs"] do
    [] -> nil
    [doc | _] -> doc
  end
end

@spec add(list(map())) :: {:ok, Req.Response.t()} | {:error, Exception.t()}
def add(docs) do
Req.post(
Expand Down
Loading

0 comments on commit 6ea969d

Please sign in to comment.