diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex index 5e2c4c3..8ed3e9f 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex @@ -80,13 +80,14 @@ defmodule ExCubicIngestion.Schema.CubicLoad do @spec insert_new_from_objects_with_table([map()], CubicTable.t()) :: {:ok, [t()]} | {:error, term()} def insert_new_from_objects_with_table(objects, table) do + datetime_since = DateTime.add(DateTime.utc_now(), -86_400) + # get the last inserted load which will be used to further filter the objects - last_inserted_load_rec = - Repo.one( + last_inserted_load_recs = + Repo.all( from(load in not_deleted(), - where: load.table_id == ^table.id, - order_by: [desc: load.s3_modified], - limit: 1 + where: load.table_id == ^table.id and load.s3_modified > ^datetime_since, + order_by: [desc: load.s3_modified] ) ) @@ -94,10 +95,10 @@ defmodule ExCubicIngestion.Schema.CubicLoad do # the last object we have in database new_objects = Enum.filter(objects, fn object -> - last_modified = parse_and_drop_msec(object.last_modified) - - is_nil(last_inserted_load_rec) or - DateTime.compare(last_modified, last_inserted_load_rec.s3_modified) == :gt + not Enum.any?(last_inserted_load_recs, fn rec -> + rec.s3_key == object.key and + rec.s3_modified == parse_and_drop_msec(object.last_modified) + end) end) if Enum.empty?(new_objects) do diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs index e6a101d..b950cac 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs @@ -46,7 +46,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do }} end - describe "insert_new_from_objects_with_table/1" do + describe "insert_new_from_objects_with_table/3" do test "providing a non-empty list of objects", %{ table: table, utc_now: utc_now, @@ -61,7 +61,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do assert {:ok, []} == CubicLoad.insert_new_from_objects_with_table(load_objects, table) # add a new object - load_objects = [ + updated_load_objects = [ %{ key: "cubic/dmap/sample/20220103.csv.gz", last_modified: MockExAws.Data.dt_adjust_and_format(utc_now, -2400), @@ -76,7 +76,29 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do %CubicLoad{ s3_key: "cubic/dmap/sample/20220103.csv.gz" } - ]} = CubicLoad.insert_new_from_objects_with_table(load_objects, table) + ]} = CubicLoad.insert_new_from_objects_with_table(updated_load_objects, table) + + # lastly, add one more but with an older timestamp than the last one + updated_load_objects_with_one_extra = [ + %{ + key: "cubic/dmap/sample/20220104.csv.gz", + last_modified: MockExAws.Data.dt_adjust_and_format(utc_now, -2700), + size: "197" + } + | updated_load_objects + ] + + # adding one more load object, should only insert it as a load record + assert {:ok, + [ + %CubicLoad{ + s3_key: "cubic/dmap/sample/20220104.csv.gz" + } + ]} = + CubicLoad.insert_new_from_objects_with_table( + updated_load_objects_with_one_extra, + table + ) end test "providing an empty list of objects", %{