diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex index bcfea7d..9d54f36 100644 --- a/lib/dpul_collections/indexing_pipeline.ex +++ b/lib/dpul_collections/indexing_pipeline.ex @@ -59,7 +59,13 @@ defmodule DpulCollections.IndexingPipeline do def write_hydration_cache_entry(attrs \\ %{}) do conflict_query = Figgy.HydrationCacheEntry - |> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order]) + |> update( + set: [ + data: ^attrs.data, + source_cache_order: ^attrs.source_cache_order, + cache_order: ^DateTime.utc_now() + ] + ) |> where([c], c.source_cache_order <= ^attrs.source_cache_order) try do @@ -79,16 +85,16 @@ defmodule DpulCollections.IndexingPipeline do count :: integer ) :: list(Figgy.HydrationCacheEntry) def get_hydration_cache_entries_since!( - %Figgy.HydrationCacheEntryMarker{timestamp: source_cache_order, id: id}, + %Figgy.HydrationCacheEntryMarker{timestamp: cache_order, id: id}, count ) do query = from r in Figgy.HydrationCacheEntry, where: - (r.source_cache_order == ^source_cache_order and r.record_id > ^id) or - r.source_cache_order > ^source_cache_order, + (r.cache_order == ^cache_order and r.record_id > ^id) or + r.cache_order > ^cache_order, limit: ^count, - order_by: [asc: r.source_cache_order, asc: r.record_id] + order_by: [asc: r.cache_order, asc: r.record_id] Repo.all(query) end @@ -302,16 +308,16 @@ defmodule DpulCollections.IndexingPipeline do count :: integer ) :: list(Figgy.TransformationCacheEntry) def get_transformation_cache_entries_since!( - %Figgy.TransformationCacheEntryMarker{timestamp: source_cache_order, id: id}, + %Figgy.TransformationCacheEntryMarker{timestamp: cache_order, id: id}, count ) do query = from r in Figgy.TransformationCacheEntry, where: - (r.source_cache_order == ^source_cache_order and r.record_id > ^id) or - r.source_cache_order > ^source_cache_order, + (r.cache_order == ^cache_order and r.record_id > ^id) or + r.cache_order > ^cache_order, limit: ^count, - order_by: [asc: r.source_cache_order, asc: r.record_id] + order_by: [asc: r.cache_order, asc: r.record_id] Repo.all(query) end @@ -324,7 +330,7 @@ defmodule DpulCollections.IndexingPipeline do query = from r in Figgy.TransformationCacheEntry, limit: ^count, - order_by: [asc: r.source_cache_order, asc: r.record_id] + order_by: [asc: r.cache_order, asc: r.record_id] Repo.all(query) end @@ -353,7 +359,13 @@ defmodule DpulCollections.IndexingPipeline do def write_transformation_cache_entry(attrs \\ %{}) do conflict_query = Figgy.TransformationCacheEntry - |> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order]) + |> update( + set: [ + data: ^attrs.data, + source_cache_order: ^attrs.source_cache_order, + cache_order: ^DateTime.utc_now() + ] + ) |> where([c], c.source_cache_order <= ^attrs.source_cache_order) try do diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry_marker.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry_marker.ex index ccac22d..d63ebfd 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry_marker.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry_marker.ex @@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntryMarker do def from(nil), do: nil @spec from(%HydrationCacheEntry{}) :: t() - def from(%HydrationCacheEntry{source_cache_order: timestamp, record_id: id}) do + def from(%HydrationCacheEntry{cache_order: timestamp, record_id: id}) do %__MODULE__{timestamp: timestamp, id: id} end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer.ex b/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer.ex index 50aee58..6905ea5 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer.ex @@ -17,10 +17,15 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do last_queried_marker: Figgy.TransformationCacheEntryMarker.t(), pulled_records: [Figgy.TransformationCacheEntryMarker.t()], acked_records: [Figgy.TransformationCacheEntryMarker.t()], - cache_version: Integer + cache_version: Integer, + stored_demand: Integer } @spec init(integer()) :: {:producer, state()} def init(cache_version) do + # trap the exit so we can stop gracefully + # see https://www.erlang.org/doc/apps/erts/erlang.html#process_flag/2 + Process.flag(:trap_exit, true) + last_queried_marker = IndexingPipeline.get_processor_marker!("figgy_indexer", cache_version) @@ -28,7 +33,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do last_queried_marker: last_queried_marker |> Figgy.TransformationCacheEntryMarker.from(), pulled_records: [], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:producer, initial_state} @@ -41,12 +47,14 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do state = %{ last_queried_marker: last_queried_marker, pulled_records: pulled_records, - acked_records: acked_records + acked_records: acked_records, + stored_demand: stored_demand } - ) - when demand > 0 do + ) do + total_demand = stored_demand + demand + records = - IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, demand) + IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, total_demand) new_state = state @@ -62,10 +70,34 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducer do ) ) |> Map.put(:acked_records, acked_records) + |> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records))) + + # Set a timer to try fulfilling demand again later + if new_state.stored_demand > 0 do + Process.send_after(self(), :check_for_updates, 50) + end {:noreply, Enum.map(records, &wrap_record/1), new_state} end + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand == fulfilled_demand do + 0 + end + + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand > fulfilled_demand do + total_demand - fulfilled_demand + end + + def handle_info(:check_for_updates, state = %{stored_demand: demand}) when demand > 0 do + handle_demand(0, state) + end + + def handle_info(:check_for_updates, state) do + {:noreply, [], state} + end + @impl GenStage @spec handle_info({atom(), atom(), list(%Figgy.TransformationCacheEntryMarker{})}, state()) :: {:noreply, list(Broadway.Message.t()), state()} diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_cache_entry_marker.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_cache_entry_marker.ex index 0cf0ed5..dc56e7c 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_cache_entry_marker.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_cache_entry_marker.ex @@ -18,7 +18,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationCacheEntryMarker def from(nil), do: nil @spec from(%TransformationCacheEntry{}) :: t() - def from(%TransformationCacheEntry{source_cache_order: timestamp, record_id: id}) do + def from(%TransformationCacheEntry{cache_order: timestamp, record_id: id}) do %__MODULE__{timestamp: timestamp, id: id} end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex index 0fc4e4d..4ac8e68 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex @@ -84,7 +84,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do IndexingPipeline.write_transformation_cache_entry(%{ cache_version: cache_version, record_id: hydration_cache_entry |> Map.get(:record_id), - source_cache_order: hydration_cache_entry |> Map.get(:source_cache_order), + source_cache_order: hydration_cache_entry |> Map.get(:cache_order), data: solr_doc }) end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer.ex index 4cb1902..27609e9 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer.ex @@ -18,10 +18,15 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do last_queried_marker: Figgy.HydrationCacheEntryMarker.t(), pulled_records: [Figgy.HydrationCacheEntryMarker.t()], acked_records: [Figgy.HydrationCacheEntryMarker.t()], - cache_version: Integer + cache_version: Integer, + stored_demand: Integer } @spec init(integer()) :: {:producer, state()} def init(cache_version) do + # trap the exit so we can stop gracefully + # see https://www.erlang.org/doc/apps/erts/erlang.html#process_flag/2 + Process.flag(:trap_exit, true) + last_queried_marker = IndexingPipeline.get_processor_marker!("figgy_transformer", cache_version) @@ -29,7 +34,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do last_queried_marker: last_queried_marker |> Figgy.HydrationCacheEntryMarker.from(), pulled_records: [], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:producer, initial_state} @@ -42,11 +48,14 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do state = %{ last_queried_marker: last_queried_marker, pulled_records: pulled_records, - acked_records: acked_records + acked_records: acked_records, + stored_demand: stored_demand } - ) - when demand > 0 do - records = IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, demand) + ) do + total_demand = stored_demand + demand + + records = + IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, total_demand) new_state = state @@ -59,10 +68,36 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducer do Enum.concat(pulled_records, Enum.map(records, &Figgy.HydrationCacheEntryMarker.from/1)) ) |> Map.put(:acked_records, acked_records) + |> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records))) + + # Set a timer to try fulfilling demand again later + if new_state.stored_demand > 0 do + Process.send_after(self(), :check_for_updates, 50) + end {:noreply, Enum.map(records, &wrap_record/1), new_state} end + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand == fulfilled_demand do + 0 + end + + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand > fulfilled_demand do + total_demand - fulfilled_demand + end + + def handle_info(:check_for_updates, state = %{stored_demand: demand}) + when demand > 0 do + new_demand = 0 + handle_demand(new_demand, state) + end + + def handle_info(:check_for_updates, state) do + {:noreply, [], state} + end + @impl GenStage @spec handle_info({atom(), atom(), list(%Figgy.HydrationCacheEntryMarker{})}, state()) :: {:noreply, list(Broadway.Message.t()), state()} diff --git a/lib/dpul_collections/solr.ex b/lib/dpul_collections/solr.ex index 74348dc..d17cc1c 100644 --- a/lib/dpul_collections/solr.ex +++ b/lib/dpul_collections/solr.ex @@ -10,6 +10,19 @@ defmodule DpulCollections.Solr do response.body["response"]["numFound"] end + def find_by_id(id) do + {:ok, response} = + Req.get( + select_url(), + params: [q: "id:#{id}"] + ) + + case response.body["response"]["docs"] do + [] -> nil + [doc] -> doc + end + end + @spec add(list(map())) :: {:ok, Req.Response.t()} | {:error, Exception.t()} def add(docs) do Req.post( diff --git a/test/dpul_collections/indexing_pipeline/figgy/indexing_producer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/indexing_producer_test.exs index 698ceae..e634c70 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/indexing_producer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/indexing_producer_test.exs @@ -31,7 +31,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker2 ], acked_records: [], - cache_version: index_version + cache_version: index_version, + stored_demand: 0 } assert new_state == expected_state @@ -48,7 +49,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker2 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = @@ -72,7 +74,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker3 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -94,7 +97,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do last_queried_marker: fabricated_marker, pulled_records: [], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = @@ -107,7 +111,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do last_queried_marker: fabricated_marker, pulled_records: [], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 1 } assert new_state == expected_state @@ -125,7 +130,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker3 ], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } acked_indexing_markers = @@ -144,7 +150,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do acked_records: [ marker3 ], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:noreply, [], new_state} = @@ -170,7 +177,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:noreply, [], new_state} = @@ -201,7 +209,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker3 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_indexing_markers = @@ -220,7 +229,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -241,7 +251,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_indexing_markers = @@ -254,7 +265,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -280,7 +292,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_indexing_markers = @@ -298,7 +311,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -325,7 +339,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker2 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } first_ack = @@ -339,7 +354,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do marker2 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -360,7 +376,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do last_queried_marker: marker2, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -377,5 +394,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerTest do assert processor_marker == marker2 end + + test ".handle_info(:check_for_updates) with no stored demand" do + assert Figgy.IndexingProducer.handle_info(:check_for_updates, %{stored_demand: 0}) == + {:noreply, [], %{stored_demand: 0}} + end end end diff --git a/test/dpul_collections/indexing_pipeline/figgy/transformer_producer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/transformer_producer_test.exs index 91621a6..75593ac 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/transformer_producer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/transformer_producer_test.exs @@ -28,7 +28,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker2 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -45,7 +46,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker2 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = @@ -67,7 +69,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker3 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -89,7 +92,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do last_queried_marker: fabricated_marker, pulled_records: [], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = @@ -102,7 +106,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do last_queried_marker: fabricated_marker, pulled_records: [], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 1 } assert new_state == expected_state @@ -120,7 +125,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker3 ], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } acked_hydration_cache_markers = @@ -139,7 +145,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do acked_records: [ marker3 ], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:noreply, [], new_state} = @@ -165,7 +172,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:noreply, [], new_state} = @@ -196,7 +204,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker3 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_hydration_cache_markers = @@ -215,7 +224,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -236,7 +246,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_hydration_cache_markers = @@ -249,7 +260,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -275,7 +287,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_hydration_cache_markers = @@ -293,7 +306,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -320,7 +334,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker2 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } first_ack = @@ -334,7 +349,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do marker2 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -355,7 +371,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do last_queried_marker: marker2, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -372,5 +389,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformerProducerTest do assert processor_marker == marker2 end + + test ".handle_info(:check_for_updates) with no stored demand" do + assert Figgy.TransformationProducer.handle_info(:check_for_updates, %{stored_demand: 0}) == + {:noreply, [], %{stored_demand: 0}} + end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs new file mode 100644 index 0000000..44952fa --- /dev/null +++ b/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs @@ -0,0 +1,94 @@ +defmodule DpulCollections.IndexingPipeline.Figgy.IndexingIntegrationTest do + use DpulCollections.DataCase + + alias DpulCollections.IndexingPipeline.Figgy + alias DpulCollections.IndexingPipeline + alias DpulCollections.Solr + + setup do + Solr.delete_all() + :ok + end + + def start_indexing_producer(batch_size \\ 1) do + pid = self() + + :telemetry.attach( + "ack-handler-#{pid |> :erlang.pid_to_list()}", + [:indexing_producer, :ack, :done], + fn _event, _, _, _ -> send(pid, {:ack_done}) end, + nil + ) + + {:ok, indexer} = + Figgy.IndexingConsumer.start_link( + cache_version: 0, + producer_module: MockFiggyIndexingProducer, + producer_options: {self()}, + batch_size: batch_size + ) + + indexer + end + + test "solr document creation" do + FiggyTestFixtures.transformation_cache_markers() + + indexer = start_indexing_producer() + + MockFiggyIndexingProducer.process(1) + assert_receive {:ack_done} + + Solr.commit() + assert Solr.document_count() == 1 + + indexer |> Broadway.stop(:normal) + end + + test "doesn't override newer solr document versions" do + # TODO: Think more on this use case + end + + test "updates existing solr document versions" do + {marker1, _marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers() + + Solr.add(%{ + "id" => marker1.id, + "title" => ["old title"] + }) + + # Process that past record. + indexer = start_indexing_producer() + MockFiggyIndexingProducer.process(1) + assert_receive {:ack_done} + indexer |> Broadway.stop(:normal) + # Ensure there's only one solr document + Solr.commit() + assert Solr.document_count() == 1 + # Ensure that entry has the new title + doc = Solr.find_by_id(marker1.id) + assert doc["title_ss"] == ["test title 1"] + end + + test "loads a marker from the database on startup" do + {marker1, marker2, _marker3} = FiggyTestFixtures.transformation_cache_markers() + + # Create a marker + IndexingPipeline.write_processor_marker(%{ + type: "figgy_indexer", + cache_version: 0, + cache_location: marker1.timestamp, + cache_record_id: marker1.id + }) + + # Start the producer + indexer = start_indexing_producer() + MockFiggyIndexingProducer.process(1) + assert_receive {:ack_done} + Solr.commit() + # Make sure the first record that comes back is what we expect + doc = Solr.find_by_id(marker2.id) + assert doc["title_ss"] == ["test title 2"] + indexer |> Broadway.stop(:normal) + end +end diff --git a/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs index d23d77d..b1df3f2 100644 --- a/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs @@ -28,17 +28,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationIntegrationTest d test "transformation cache entry creation" do {marker1, _marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers() - IndexingPipeline.write_hydration_cache_entry(%{ - cache_version: 0, - record_id: marker1.id, - source_cache_order: marker1.timestamp, - data: %{ - "id" => marker1.id, - "internal_resource" => "EphemeraFolder", - "metadata" => %{"title" => ["test title"]} - } - }) - transformer = start_transformer_producer() MockFiggyTransformationProducer.process(1) @@ -52,7 +41,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationIntegrationTest d assert %{ "id" => ^marker_1_id, - "title_ss" => ["test title"] + "title_ss" => ["test title 1"] } = cache_entry.data transformer |> Broadway.stop(:normal) @@ -117,28 +106,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationIntegrationTest d test "loads a marker from the database on startup" do {marker1, marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers() - IndexingPipeline.write_hydration_cache_entry(%{ - cache_version: 0, - record_id: marker1.id, - source_cache_order: marker1.timestamp, - data: %{ - "id" => marker1.id, - "internal_resource" => "EphemeraFolder", - "metadata" => %{"title" => ["test title 1"]} - } - }) - - IndexingPipeline.write_hydration_cache_entry(%{ - cache_version: 0, - record_id: marker2.id, - source_cache_order: marker2.timestamp, - data: %{ - "id" => marker2.id, - "internal_resource" => "EphemeraFolder", - "metadata" => %{"title" => ["test title 2"]} - } - }) - # Create a marker IndexingPipeline.write_processor_marker(%{ type: "figgy_transformer", diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index fd57386..47a8dc3 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -1,7 +1,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do use DpulCollections.DataCase - alias DpulCollections.{FiggyRepo, Repo} + alias DpulCollections.Repo alias DpulCollections.IndexingPipeline.Figgy alias DpulCollections.IndexingPipeline alias DpulCollections.Solr @@ -11,140 +11,46 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do :ok end - def start_figgy_producer(batch_size \\ 1) do - {:ok, hydrator} = - Figgy.HydrationConsumer.start_link( - cache_version: 0, - producer_module: MockFiggyHydrationProducer, - producer_options: {self()}, - batch_size: batch_size - ) + def wait_for_index_completion() do + transformer_cache_entries = IndexingPipeline.list_transformation_cache_entries() |> length + ephemera_folder_count = FiggyTestSupport.ephemera_folder_count() - hydrator - end - - def start_transformer_producer(batch_size \\ 1) do - pid = self() - - :telemetry.attach( - "ack-handler-#{pid |> :erlang.pid_to_list()}", - [:transformer_producer, :ack, :done], - fn _event, _, _, _ -> send(pid, {:ack_done}) end, - nil - ) - - {:ok, transformer} = - Figgy.TransformationConsumer.start_link( - cache_version: 0, - producer_module: MockFiggyTransformationProducer, - producer_options: {self()}, - batch_size: batch_size - ) - - transformer - end - - def start_indexing_producer(batch_size \\ 1) do - pid = self() - - :telemetry.attach( - "ack-handler-#{pid |> :erlang.pid_to_list()}", - [:indexing_producer, :ack, :done], - fn _event, _, _, _ -> send(pid, {:ack_done}) end, - nil - ) - - {:ok, indexer} = - Figgy.IndexingConsumer.start_link( - cache_version: 0, - producer_module: MockFiggyIndexingProducer, - producer_options: {self()}, - batch_size: batch_size - ) - - indexer - end - - def wait_for_hydrated_id(id, cache_version \\ 0) do - case IndexingPipeline.get_processor_marker!("hydrator", 0) do - %{cache_record_id: ^id} -> - true - - _ -> - :timer.sleep(50) - wait_for_hydrated_id(id, cache_version) - end - end - - def wait_for_transformed_id(id, cache_version \\ 0) do - case IndexingPipeline.get_processor_marker!("figgy_transformer", 0) do - %{cache_record_id: ^id} -> - true - - _ -> - :timer.sleep(50) - wait_for_transformed_id(id, cache_version) - end - end + continue = + if transformer_cache_entries == ephemera_folder_count do + DpulCollections.Solr.commit() - def wait_for_indexed_id(id, cache_version \\ 0) do - case IndexingPipeline.get_processor_marker!("figgy_indexer", 0) do - %{cache_record_id: ^id} -> - true + if DpulCollections.Solr.document_count() == transformer_cache_entries do + true + end + end - _ -> - :timer.sleep(50) - wait_for_indexed_id(id, cache_version) - end + continue || (:timer.sleep(100) && wait_for_index_completion()) end test "a full hydrator and transformer run" do # Start the figgy producer - hydrator = start_figgy_producer(50) - # Demand all of them. - count = FiggyRepo.aggregate(Figgy.Resource, :count) - MockFiggyHydrationProducer.process(count) - # Wait for the last ID to show up. + {:ok, indexer} = Figgy.IndexingConsumer.start_link(batch_size: 50) + {:ok, transformer} = Figgy.TransformationConsumer.start_link(batch_size: 50) + {:ok, hydrator} = Figgy.HydrationConsumer.start_link(batch_size: 50) + task = - Task.async(fn -> wait_for_hydrated_id(FiggyTestSupport.last_figgy_resource_marker().id) end) + Task.async(fn -> wait_for_index_completion() end) Task.await(task, 15000) - hydrator |> Broadway.stop(:normal) # the hydrator pulled all ephemera folders and terms entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) assert FiggyTestSupport.total_resource_count() == entry_count - # Start the transformer producer - transformer = start_transformer_producer(50) - entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) - MockFiggyTransformationProducer.process(entry_count) - # Wait for the last ID to show up. - task = - Task.async(fn -> - wait_for_transformed_id(FiggyTestSupport.last_hydration_cache_entry_marker().id) - end) - - Task.await(task, 15000) - transformation_cache_entry_count = Repo.aggregate(Figgy.TransformationCacheEntry, :count) - # the transformer only processes ephemera folders + transformation_cache_entry_count = Repo.aggregate(Figgy.TransformationCacheEntry, :count) assert FiggyTestSupport.ephemera_folder_count() == transformation_cache_entry_count - transformer |> Broadway.stop(:normal) - - # Start the indexing producer - indexer = start_indexing_producer(50) - MockFiggyIndexingProducer.process(transformation_cache_entry_count) - # Wait for the last ID to show up. - task = - Task.async(fn -> - wait_for_indexed_id(FiggyTestSupport.last_transformation_cache_entry_marker().id) - end) - Task.await(task, 15000) - Solr.commit() + # indexed all the documents assert Solr.document_count() == transformation_cache_entry_count + hydrator |> Broadway.stop(:normal) + transformer |> Broadway.stop(:normal) indexer |> Broadway.stop(:normal) end end diff --git a/test/dpul_collections/indexing_pipeline_test.exs b/test/dpul_collections/indexing_pipeline_test.exs index 4be0bb1..dcaec84 100644 --- a/test/dpul_collections/indexing_pipeline_test.exs +++ b/test/dpul_collections/indexing_pipeline_test.exs @@ -30,6 +30,37 @@ defmodule DpulCollections.IndexingPipelineTest do IndexingPipeline.get_hydration_cache_entry!(hydration_cache_entry.id) end end + + test "write_hydration_cache_entry/1 upserts a cache entry" do + {:ok, first_write} = + IndexingPipeline.write_hydration_cache_entry(%{ + data: %{}, + source_cache_order: ~U[2024-07-23 20:05:00Z], + cache_version: 0, + record_id: "some record_id" + }) + + {:ok, second_write} = + IndexingPipeline.write_hydration_cache_entry(%{ + data: %{}, + source_cache_order: ~U[2024-07-24 20:05:00Z], + cache_version: 0, + record_id: "some record_id" + }) + + {:ok, nil} = + IndexingPipeline.write_hydration_cache_entry(%{ + data: %{}, + source_cache_order: ~U[2024-07-22 20:05:00Z], + cache_version: 0, + record_id: "some record_id" + }) + + reloaded = IndexingPipeline.get_hydration_cache_entry!(second_write.id) + assert first_write.cache_order != reloaded.cache_order + assert reloaded.source_cache_order == second_write.source_cache_order + assert IndexingPipeline.list_hydration_cache_entries() |> length == 1 + end end describe "processor_markers" do diff --git a/test/dpul_collections/solr_test.exs b/test/dpul_collections/solr_test.exs index 2ef4f9d..47fd463 100644 --- a/test/dpul_collections/solr_test.exs +++ b/test/dpul_collections/solr_test.exs @@ -11,6 +11,19 @@ defmodule DpulCollections.SolrTest do assert Solr.document_count() == 0 end + test ".find_by_id/1" do + assert Solr.find_by_id("3cb7627b-defc-401b-9959-42ebc4488f74") == nil + + doc = %{ + "id" => "3cb7627b-defc-401b-9959-42ebc4488f74", + "title_ss" => ["test title 1"] + } + + Solr.add([doc]) + Solr.commit() + assert Solr.find_by_id("3cb7627b-defc-401b-9959-42ebc4488f74")["title_ss"] == doc["title_ss"] + end + test ".add/1" do doc = %{ "id" => "3cb7627b-defc-401b-9959-42ebc4488f74", diff --git a/test/support/figgy_test_support.ex b/test/support/figgy_test_support.ex index 4a4f036..09d4ab0 100644 --- a/test/support/figgy_test_support.ex +++ b/test/support/figgy_test_support.ex @@ -4,35 +4,6 @@ defmodule FiggyTestSupport do alias DpulCollections.IndexingPipeline.Figgy alias DpulCollections.FiggyRepo - alias DpulCollections.Repo - - # Get the last marker from the figgy repo. - def last_figgy_resource_marker do - query = - from r in Figgy.Resource, - limit: 1, - order_by: [desc: r.updated_at, desc: r.id] - - FiggyRepo.all(query) |> hd |> Figgy.ResourceMarker.from() - end - - def last_hydration_cache_entry_marker do - query = - from r in Figgy.HydrationCacheEntry, - limit: 1, - order_by: [desc: r.source_cache_order, desc: r.id] - - Repo.all(query) |> hd |> Figgy.HydrationCacheEntryMarker.from() - end - - def last_transformation_cache_entry_marker do - query = - from r in Figgy.TransformationCacheEntry, - limit: 1, - order_by: [desc: r.source_cache_order, desc: r.id] - - Repo.all(query) |> hd |> Figgy.TransformationCacheEntryMarker.from() - end def total_resource_count do query = diff --git a/test/support/fixtures/figgy_test_fixtures.ex b/test/support/fixtures/figgy_test_fixtures.ex index d926481..aa91afd 100644 --- a/test/support/fixtures/figgy_test_fixtures.ex +++ b/test/support/fixtures/figgy_test_fixtures.ex @@ -40,6 +40,8 @@ defmodule FiggyTestFixtures do } }) + :timer.sleep(1) + {:ok, entry2} = IndexingPipeline.write_hydration_cache_entry(%{ cache_version: cache_version, @@ -52,6 +54,8 @@ defmodule FiggyTestFixtures do } }) + :timer.sleep(1) + {:ok, entry3} = IndexingPipeline.write_hydration_cache_entry(%{ cache_version: cache_version, @@ -65,17 +69,17 @@ defmodule FiggyTestFixtures do }) marker1 = %Figgy.HydrationCacheEntryMarker{ - timestamp: entry1.source_cache_order, + timestamp: entry1.cache_order, id: entry1.record_id } marker2 = %Figgy.HydrationCacheEntryMarker{ - timestamp: entry2.source_cache_order, + timestamp: entry2.cache_order, id: entry2.record_id } marker3 = %Figgy.HydrationCacheEntryMarker{ - timestamp: entry3.source_cache_order, + timestamp: entry3.cache_order, id: entry3.record_id } @@ -117,17 +121,17 @@ defmodule FiggyTestFixtures do }) marker1 = %Figgy.TransformationCacheEntryMarker{ - timestamp: entry1.source_cache_order, + timestamp: entry1.cache_order, id: entry1.record_id } marker2 = %Figgy.TransformationCacheEntryMarker{ - timestamp: entry2.source_cache_order, + timestamp: entry2.cache_order, id: entry2.record_id } marker3 = %Figgy.TransformationCacheEntryMarker{ - timestamp: entry3.source_cache_order, + timestamp: entry3.cache_order, id: entry3.record_id }