From f16f36632a4dc88722cfb77fcac051fa2d943d11 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 11:12:25 +0100 Subject: [PATCH 01/16] Fix deadlocks in the batching algo by replacing `threading.Lock` with `asyncio.Lock` --- weaviate/collections/batch/base.py | 66 +++++++++++++++++++----------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 207c8926a..06f9429a9 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -1,3 +1,4 @@ +import asyncio import math import threading import time @@ -60,23 +61,23 @@ class BatchRequest(ABC, Generic[TBatchInput, TBatchReturn]): def __init__(self) -> None: self._items: List[TBatchInput] = [] - self._lock = threading.Lock() + self._lock = asyncio.Lock() def __len__(self) -> int: return len(self._items) - def add(self, item: TBatchInput) -> None: + async def add(self, item: TBatchInput) -> None: """Add an item to the BatchRequest.""" - self._lock.acquire() + await self._lock.acquire() self._items.append(item) self._lock.release() - def prepend(self, item: List[TBatchInput]) -> None: + async def prepend(self, item: List[TBatchInput]) -> None: """Add items to the front of the BatchRequest. This is intended to be used when objects should be retries, eg. after a temporary error. """ - self._lock.acquire() + await self._lock.acquire() self._items = item + self._items self._lock.release() @@ -84,7 +85,7 @@ def prepend(self, item: List[TBatchInput]) -> None: class ReferencesBatchRequest(BatchRequest[_BatchReference, BatchReferenceReturn]): """Collect Weaviate-object references to add them in one request to Weaviate.""" - def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]: + async def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]: """Pop the given number of items from the BatchRequest queue. Returns @@ -92,7 +93,7 @@ def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchRefere """ ret: List[_BatchReference] = [] i = 0 - self._lock.acquire() + await self._lock.acquire() while len(ret) < pop_amount and len(self._items) > 0 and i < len(self._items): if self._items[i].from_uuid not in uuid_lookup and ( self._items[i].to_uuid is None or self._items[i].to_uuid not in uuid_lookup @@ -107,13 +108,13 @@ def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchRefere class ObjectsBatchRequest(BatchRequest[_BatchObject, BatchObjectReturn]): """Collect objects for one batch request to weaviate.""" - def pop_items(self, pop_amount: int) -> List[_BatchObject]: + async def pop_items(self, pop_amount: int) -> List[_BatchObject]: """Pop the given number of items from the BatchRequest queue. Returns `List[_BatchObject]` items from the BatchRequest. """ - self._lock.acquire() + await self._lock.acquire() if pop_amount >= len(self._items): ret = copy(self._items) self._items.clear() @@ -174,14 +175,14 @@ def __init__( self.__batch_rest = _BatchREST(connection, self.__consistency_level) # lookup table for objects that are currently being processed - is used to not send references from objects that have not been added yet - self.__uuid_lookup_lock = threading.Lock() + self.__uuid_lookup_lock = asyncio.Lock() self.__uuid_lookup: Set[str] = set() # we do not want that users can access the results directly as they are not thread-safe self.__results_for_wrapper_backup = results self.__results_for_wrapper = _BatchDataWrapper() - self.__results_lock = threading.Lock() + self.__results_lock = asyncio.Lock() self.__cluster = _ClusterBatch(self.__connection) @@ -221,7 +222,7 @@ def __init__( self.__recommended_num_refs: int = 50 self.__active_requests = 0 - self.__active_requests_lock = threading.Lock() + self.__active_requests_lock = asyncio.Lock() # dynamic batching self.__time_last_scale_up: float = 0 @@ -284,6 +285,13 @@ def __batch_send(self) -> None: ): time.sleep(1) continue + elif isinstance(self.__batching_mode, _FixedSizeBatching): + if ( + len(self.__batch_objects) + len(self.__batch_references) + < self.__recommended_num_objects + ): + time.sleep(refresh_time) + continue if ( self.__active_requests < self.__concurrent_requests @@ -292,14 +300,18 @@ def __batch_send(self) -> None: self.__time_stamp_last_request = time.time() self._batch_send = True - self.__active_requests_lock.acquire() + self.__loop.run_until_complete(self.__active_requests_lock.acquire) self.__active_requests += 1 self.__active_requests_lock.release() - objs = self.__batch_objects.pop_items(self.__recommended_num_objects) - self.__uuid_lookup_lock.acquire() - refs = self.__batch_references.pop_items( - self.__recommended_num_refs, uuid_lookup=self.__uuid_lookup + objs = self.__loop.run_until_complete( + self.__batch_objects.pop_items, self.__recommended_num_objects + ) + self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire) + refs = self.__loop.run_until_complete( + self.__batch_references.pop_items, + self.__recommended_num_refs, + uuid_lookup=self.__uuid_lookup, ) self.__uuid_lookup_lock.release() # do not block the thread - the results are written to a central (locked) list and we want to have multiple concurrent batch-requests @@ -513,7 +525,7 @@ async def __send_batch( ] readded_uuids = {obj.uuid for obj in readd_objects} - self.__batch_objects.prepend(readd_objects) + await self.__batch_objects.prepend(readd_objects) new_errors = { i: err for i, err in response_obj.errors.items() if i not in readded_objects @@ -542,7 +554,7 @@ async def __send_batch( else: # sleep a bit to recover from the rate limit in other cases time.sleep(2**highest_retry_count) - self.__uuid_lookup_lock.acquire() + await self.__uuid_lookup_lock.acquire() self.__uuid_lookup.difference_update( obj.uuid for obj in objs if obj.uuid not in readded_uuids ) @@ -561,7 +573,7 @@ async def __send_batch( "message": "There have been more than 30 failed object batches. Further errors will not be logged.", } ) - self.__results_lock.acquire() + await self.__results_lock.acquire() self.__results_for_wrapper.results.objs += response_obj self.__results_for_wrapper.failed_objects.extend(response_obj.errors.values()) self.__results_lock.release() @@ -595,12 +607,12 @@ async def __send_batch( "message": "There have been more than 30 failed reference batches. Further errors will not be logged.", } ) - self.__results_lock.acquire() + await self.__results_lock.acquire() self.__results_for_wrapper.results.refs += response_ref self.__results_for_wrapper.failed_references.extend(response_ref.errors.values()) self.__results_lock.release() - self.__active_requests_lock.acquire() + await self.__active_requests_lock.acquire() self.__active_requests -= 1 self.__active_requests_lock.release() @@ -636,15 +648,17 @@ def _add_object( index=self.__objs_count, ) self.__objs_count += 1 + self.__loop.run_until_complete(self.__results_lock.acquire) self.__results_for_wrapper.imported_shards.add( Shard(collection=collection, tenant=tenant) ) + self.__results_lock.release() except ValidationError as e: raise WeaviateBatchValidationError(repr(e)) - self.__uuid_lookup_lock.acquire() + self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire) self.__uuid_lookup.add(str(batch_object.uuid)) self.__uuid_lookup_lock.release() - self.__batch_objects.add(batch_object._to_internal()) + self.__loop.run_until_complete(self.__batch_objects.add, batch_object._to_internal()) # block if queue gets too long or weaviate is overloaded - reading files is faster them sending them so we do # not need a long queue @@ -688,7 +702,9 @@ def _add_reference( ) except ValidationError as e: raise WeaviateBatchValidationError(repr(e)) - self.__batch_references.add(batch_reference._to_internal()) + self.__loop.run_until_complete( + self.__batch_references.add, batch_reference._to_internal() + ) # block if queue gets too long or weaviate is overloaded while self.__recommended_num_objects == 0: From 4e8863f34d152fb62aca5c63a0f0c69f76642b15 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 11:38:21 +0100 Subject: [PATCH 02/16] Remove lower bound requirement on fixed size batching --- integration/test_batch_v4.py | 10 +++++++--- weaviate/collections/batch/base.py | 7 ------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/integration/test_batch_v4.py b/integration/test_batch_v4.py index 4ec6a6de4..79bef8fba 100644 --- a/integration/test_batch_v4.py +++ b/integration/test_batch_v4.py @@ -687,9 +687,13 @@ def test_batching_error_logs( for obj in [{"name": i} for i in range(100)]: batch.add_object(properties=obj, collection=name) assert ( - "Failed to send 100 objects in a batch of 100. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects." - in caplog.text - ) + ("Failed to send" in caplog.text) + and ("objects in a batch of" in caplog.text) + and ( + "Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects." + in caplog.text + ) + ) # number of objects sent per batch is not fixed for less than 100 objects def test_references_with_to_uuids(client_factory: ClientFactory) -> None: diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 06f9429a9..03631975b 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -285,13 +285,6 @@ def __batch_send(self) -> None: ): time.sleep(1) continue - elif isinstance(self.__batching_mode, _FixedSizeBatching): - if ( - len(self.__batch_objects) + len(self.__batch_references) - < self.__recommended_num_objects - ): - time.sleep(refresh_time) - continue if ( self.__active_requests < self.__concurrent_requests From 057b3dc8e8761cbb1d7259952d48659e2aab4cae Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 12:07:13 +0100 Subject: [PATCH 03/16] change time.sleep to asyncio.sleep in async func --- weaviate/collections/batch/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 03631975b..e3908fe0d 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -546,7 +546,7 @@ async def __send_batch( ) else: # sleep a bit to recover from the rate limit in other cases - time.sleep(2**highest_retry_count) + await asyncio.sleep(2**highest_retry_count) await self.__uuid_lookup_lock.acquire() self.__uuid_lookup.difference_update( obj.uuid for obj in objs if obj.uuid not in readded_uuids From 3150959d2a25d62748a5ccfa85e2d3230695a4da Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 13:51:55 +0100 Subject: [PATCH 04/16] Ensure `asyncio.Lock`s are opened in the event loop thread to handle implicit 3.{8,9} behaviour --- weaviate/collections/batch/base.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index e3908fe0d..64626595a 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -162,11 +162,7 @@ def __init__( batch_mode: _BatchMode, event_loop: _EventLoop, vectorizer_batching: bool, - objects_: Optional[ObjectsBatchRequest] = None, - references: Optional[ReferencesBatchRequest] = None, ) -> None: - self.__batch_objects = objects_ or ObjectsBatchRequest() - self.__batch_references = references or ReferencesBatchRequest() self.__connection = connection self.__consistency_level: Optional[ConsistencyLevel] = consistency_level self.__vectorizer_batching = vectorizer_batching @@ -175,15 +171,12 @@ def __init__( self.__batch_rest = _BatchREST(connection, self.__consistency_level) # lookup table for objects that are currently being processed - is used to not send references from objects that have not been added yet - self.__uuid_lookup_lock = asyncio.Lock() self.__uuid_lookup: Set[str] = set() # we do not want that users can access the results directly as they are not thread-safe self.__results_for_wrapper_backup = results self.__results_for_wrapper = _BatchDataWrapper() - self.__results_lock = asyncio.Lock() - self.__cluster = _ClusterBatch(self.__connection) self.__batching_mode: _BatchMode = batch_mode @@ -222,7 +215,6 @@ def __init__( self.__recommended_num_refs: int = 50 self.__active_requests = 0 - self.__active_requests_lock = asyncio.Lock() # dynamic batching self.__time_last_scale_up: float = 0 @@ -234,9 +226,18 @@ def __init__( # do 62 secs to give us some buffer to the "per-minute" calculation self.__fix_rate_batching_base_time = 62 + self.__loop.run_until_complete(self.__make_locks) + self.__bg_thread = self.__start_bg_threads() self.__bg_thread_exception: Optional[Exception] = None + async def __make_locks(self) -> None: + self.__batch_objects = ObjectsBatchRequest() + self.__batch_references = ReferencesBatchRequest() + self.__active_requests_lock = asyncio.Lock() + self.__uuid_lookup_lock = asyncio.Lock() + self.__results_lock = asyncio.Lock() + @property def number_errors(self) -> int: """Return the number of errors in the batch.""" @@ -354,6 +355,7 @@ def batch_send_wrapper() -> None: try: self.__batch_send() except Exception as e: + logger.error(e) self.__bg_thread_exception = e demonBatchSend = threading.Thread( @@ -362,6 +364,7 @@ def batch_send_wrapper() -> None: name="BgBatchScheduler", ) demonBatchSend.start() + return demonBatchSend def __dynamic_batching(self) -> None: From d45621ffd0078cf22227d5cc87702a35f1518eae Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 16:13:14 +0100 Subject: [PATCH 05/16] Remove stifling locks in _add_objects --- weaviate/collections/batch/base.py | 37 +++++++++++------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index e3908fe0d..9fb718f57 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -61,23 +61,23 @@ class BatchRequest(ABC, Generic[TBatchInput, TBatchReturn]): def __init__(self) -> None: self._items: List[TBatchInput] = [] - self._lock = asyncio.Lock() + self._lock = threading.Lock() def __len__(self) -> int: return len(self._items) - async def add(self, item: TBatchInput) -> None: + def add(self, item: TBatchInput) -> None: """Add an item to the BatchRequest.""" - await self._lock.acquire() + self._lock.acquire() self._items.append(item) self._lock.release() - async def prepend(self, item: List[TBatchInput]) -> None: + def prepend(self, item: List[TBatchInput]) -> None: """Add items to the front of the BatchRequest. This is intended to be used when objects should be retries, eg. after a temporary error. """ - await self._lock.acquire() + self._lock.acquire() self._items = item + self._items self._lock.release() @@ -85,7 +85,7 @@ async def prepend(self, item: List[TBatchInput]) -> None: class ReferencesBatchRequest(BatchRequest[_BatchReference, BatchReferenceReturn]): """Collect Weaviate-object references to add them in one request to Weaviate.""" - async def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]: + def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]: """Pop the given number of items from the BatchRequest queue. Returns @@ -93,7 +93,7 @@ async def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_Batch """ ret: List[_BatchReference] = [] i = 0 - await self._lock.acquire() + self._lock.acquire() while len(ret) < pop_amount and len(self._items) > 0 and i < len(self._items): if self._items[i].from_uuid not in uuid_lookup and ( self._items[i].to_uuid is None or self._items[i].to_uuid not in uuid_lookup @@ -108,13 +108,13 @@ async def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_Batch class ObjectsBatchRequest(BatchRequest[_BatchObject, BatchObjectReturn]): """Collect objects for one batch request to weaviate.""" - async def pop_items(self, pop_amount: int) -> List[_BatchObject]: + def pop_items(self, pop_amount: int) -> List[_BatchObject]: """Pop the given number of items from the BatchRequest queue. Returns `List[_BatchObject]` items from the BatchRequest. """ - await self._lock.acquire() + self._lock.acquire() if pop_amount >= len(self._items): ret = copy(self._items) self._items.clear() @@ -297,12 +297,9 @@ def __batch_send(self) -> None: self.__active_requests += 1 self.__active_requests_lock.release() - objs = self.__loop.run_until_complete( - self.__batch_objects.pop_items, self.__recommended_num_objects - ) + objs = self.__batch_objects.pop_items(self.__recommended_num_objects) self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire) - refs = self.__loop.run_until_complete( - self.__batch_references.pop_items, + refs = self.__batch_references.pop_items( self.__recommended_num_refs, uuid_lookup=self.__uuid_lookup, ) @@ -518,7 +515,7 @@ async def __send_batch( ] readded_uuids = {obj.uuid for obj in readd_objects} - await self.__batch_objects.prepend(readd_objects) + self.__batch_objects.prepend(readd_objects) new_errors = { i: err for i, err in response_obj.errors.items() if i not in readded_objects @@ -641,17 +638,13 @@ def _add_object( index=self.__objs_count, ) self.__objs_count += 1 - self.__loop.run_until_complete(self.__results_lock.acquire) self.__results_for_wrapper.imported_shards.add( Shard(collection=collection, tenant=tenant) ) - self.__results_lock.release() except ValidationError as e: raise WeaviateBatchValidationError(repr(e)) - self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire) self.__uuid_lookup.add(str(batch_object.uuid)) - self.__uuid_lookup_lock.release() - self.__loop.run_until_complete(self.__batch_objects.add, batch_object._to_internal()) + self.__batch_objects.add(batch_object._to_internal()) # block if queue gets too long or weaviate is overloaded - reading files is faster them sending them so we do # not need a long queue @@ -695,9 +688,7 @@ def _add_reference( ) except ValidationError as e: raise WeaviateBatchValidationError(repr(e)) - self.__loop.run_until_complete( - self.__batch_references.add, batch_reference._to_internal() - ) + self.__batch_references.add(batch_reference._to_internal()) # block if queue gets too long or weaviate is overloaded while self.__recommended_num_objects == 0: From 7cdcfc0555129fd0ca76d68683fb00918b620521 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 16:15:25 +0100 Subject: [PATCH 06/16] Move objs and refs inits back to __init__ --- weaviate/collections/batch/base.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index ae2442862..06c54b066 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -162,7 +162,12 @@ def __init__( batch_mode: _BatchMode, event_loop: _EventLoop, vectorizer_batching: bool, + objects: Optional[ObjectsBatchRequest] = None, + references: Optional[ReferencesBatchRequest] = None, ) -> None: + self.__batch_objects = objects or ObjectsBatchRequest() + self.__batch_references = references or ReferencesBatchRequest() + self.__connection = connection self.__consistency_level: Optional[ConsistencyLevel] = consistency_level self.__vectorizer_batching = vectorizer_batching @@ -226,14 +231,12 @@ def __init__( # do 62 secs to give us some buffer to the "per-minute" calculation self.__fix_rate_batching_base_time = 62 - self.__loop.run_until_complete(self.__make_locks) + self.__loop.run_until_complete(self.__make_asyncio_locks) self.__bg_thread = self.__start_bg_threads() self.__bg_thread_exception: Optional[Exception] = None - async def __make_locks(self) -> None: - self.__batch_objects = ObjectsBatchRequest() - self.__batch_references = ReferencesBatchRequest() + async def __make_asyncio_locks(self) -> None: self.__active_requests_lock = asyncio.Lock() self.__uuid_lookup_lock = asyncio.Lock() self.__results_lock = asyncio.Lock() From d4a3a9ff446deabe337df2dbd502bc25d446d065 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 17:59:48 +0100 Subject: [PATCH 07/16] Add missing props in collection/base.pyi stubs --- weaviate/collections/collection/base.pyi | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/weaviate/collections/collection/base.pyi b/weaviate/collections/collection/base.pyi index e04325702..23f882178 100644 --- a/weaviate/collections/collection/base.pyi +++ b/weaviate/collections/collection/base.pyi @@ -65,3 +65,13 @@ class _CollectionBase(Generic[Properties, References]): The consistency level to use. """ ... + + @property + def tenant(self) -> Optional[str]: + """The tenant of this collection object.""" + ... + + @property + def consistency_level(self) -> Optional[ConsistencyLevel]: + """The consistency level of this collection object.""" + ... \ No newline at end of file From c7bc642725e4866df0fc107538f010b02592d3ee Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Thu, 29 Aug 2024 18:04:43 +0100 Subject: [PATCH 08/16] Fix formatting --- weaviate/collections/collection/base.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/collection/base.pyi b/weaviate/collections/collection/base.pyi index 23f882178..6242a8f53 100644 --- a/weaviate/collections/collection/base.pyi +++ b/weaviate/collections/collection/base.pyi @@ -74,4 +74,4 @@ class _CollectionBase(Generic[Properties, References]): @property def consistency_level(self) -> Optional[ConsistencyLevel]: """The consistency level of this collection object.""" - ... \ No newline at end of file + ... From 0539e98ffdf66c109db0312ebde21ba2d2fe5858 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Fri, 30 Aug 2024 12:52:18 +0100 Subject: [PATCH 09/16] Release `asyncio.Lock`s in the event-loop's context --- weaviate/collections/batch/base.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 06c54b066..1e2598f5f 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -237,10 +237,15 @@ def __init__( self.__bg_thread_exception: Optional[Exception] = None async def __make_asyncio_locks(self) -> None: + """Create the locks in the context of the running event loop so that internal `asyncio.get_event_loop()` calls work.""" self.__active_requests_lock = asyncio.Lock() self.__uuid_lookup_lock = asyncio.Lock() self.__results_lock = asyncio.Lock() + async def __release_asyncio_lock(self, lock: asyncio.Lock) -> None: + """Release the lock in the context of the running event loop so that internal `asyncio.get_event_loop()` calls work.""" + return lock.release() + @property def number_errors(self) -> int: """Return the number of errors in the batch.""" @@ -299,15 +304,13 @@ def __batch_send(self) -> None: self._batch_send = True self.__loop.run_until_complete(self.__active_requests_lock.acquire) self.__active_requests += 1 - self.__active_requests_lock.release() + self.__loop.run_until_complete(self.__release_asyncio_lock, self.__active_requests_lock) objs = self.__batch_objects.pop_items(self.__recommended_num_objects) - self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire) refs = self.__batch_references.pop_items( self.__recommended_num_refs, uuid_lookup=self.__uuid_lookup, ) - self.__uuid_lookup_lock.release() # do not block the thread - the results are written to a central (locked) list and we want to have multiple concurrent batch-requests self.__loop.schedule( self.__send_batch, From c1daea8d5c88daa911b1120c12206b0d3d70aa50 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Fri, 30 Aug 2024 12:55:35 +0100 Subject: [PATCH 10/16] Fix linter --- weaviate/collections/batch/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 1e2598f5f..535919919 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -304,7 +304,9 @@ def __batch_send(self) -> None: self._batch_send = True self.__loop.run_until_complete(self.__active_requests_lock.acquire) self.__active_requests += 1 - self.__loop.run_until_complete(self.__release_asyncio_lock, self.__active_requests_lock) + self.__loop.run_until_complete( + self.__release_asyncio_lock, self.__active_requests_lock + ) objs = self.__batch_objects.pop_items(self.__recommended_num_objects) refs = self.__batch_references.pop_items( From 86f943140692ec6b3d668cc7cbe75f5ec207d4c4 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Mon, 2 Sep 2024 23:15:54 +0200 Subject: [PATCH 11/16] Log errors in __send_batch --- weaviate/collections/batch/base.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 535919919..c57fb43d1 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -472,10 +472,23 @@ async def __send_batch( response_obj = await self.__batch_grpc.objects( objects=objs, timeout=DEFAULT_REQUEST_TIMEOUT ) + if response_obj.has_errors: + logger.error( + { + "message": f"Failed to send {len(response_obj.errors)} in a batch of {len(objs)}", + "errors": {err.message for err in response_obj.errors.values()}, + } + ) except Exception as e: errors_obj = { idx: ErrorObject(message=repr(e), object_=obj) for idx, obj in enumerate(objs) } + logger.error( + { + "message": f"Failed to send all objects in a batch of {len(objs)}", + "error": repr(e), + } + ) response_obj = BatchObjectReturn( _all_responses=list(errors_obj.values()), elapsed_seconds=time.time() - start, From 88665036b5b76eee485bc41fdeb05564e3620947 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Mon, 16 Sep 2024 17:31:22 +0100 Subject: [PATCH 12/16] Change `ErrorX` classes to refer to `BatchX` instead of internal `_BatchX` --- weaviate/collections/batch/base.py | 11 ++++-- .../collections/batch/grpc_batch_objects.py | 5 ++- weaviate/collections/batch/rest.py | 3 +- weaviate/collections/classes/batch.py | 39 +++++++++++++++++-- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index c57fb43d1..f8f3c597f 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -481,7 +481,8 @@ async def __send_batch( ) except Exception as e: errors_obj = { - idx: ErrorObject(message=repr(e), object_=obj) for idx, obj in enumerate(objs) + idx: ErrorObject(message=repr(e), object_=BatchObject._from_internal(obj)) + for idx, obj in enumerate(objs) } logger.error( { @@ -535,7 +536,9 @@ async def __send_batch( ) readd_objects = [ - err.object_ for i, err in response_obj.errors.items() if i in readded_objects + err.object_._to_internal() + for i, err in response_obj.errors.items() + if i in readded_objects ] readded_uuids = {obj.uuid for obj in readd_objects} @@ -599,7 +602,9 @@ async def __send_batch( response_ref = await self.__batch_rest.references(references=refs) except Exception as e: errors_ref = { - idx: ErrorReference(message=repr(e), reference=ref) + idx: ErrorReference( + message=repr(e), reference=BatchReference._from_internal(ref) + ) for idx, ref in enumerate(refs) } response_ref = BatchReferenceReturn( diff --git a/weaviate/collections/batch/grpc_batch_objects.py b/weaviate/collections/batch/grpc_batch_objects.py index 84d7fddaa..0cbbe4d4b 100644 --- a/weaviate/collections/batch/grpc_batch_objects.py +++ b/weaviate/collections/batch/grpc_batch_objects.py @@ -10,6 +10,7 @@ from weaviate.collections.classes.batch import ( ErrorObject, _BatchObject, + BatchObject, BatchObjectReturn, ) from weaviate.collections.classes.config import ConsistencyLevel @@ -116,7 +117,9 @@ async def objects( for idx, weav_obj in enumerate(weaviate_objs): obj = objects[idx] if idx in errors: - error = ErrorObject(errors[idx], obj, original_uuid=obj.uuid) + error = ErrorObject( + errors[idx], BatchObject._from_internal(obj), original_uuid=obj.uuid + ) return_errors[obj.index] = error all_responses[idx] = error else: diff --git a/weaviate/collections/batch/rest.py b/weaviate/collections/batch/rest.py index 1f69efbd7..051bc0db7 100644 --- a/weaviate/collections/batch/rest.py +++ b/weaviate/collections/batch/rest.py @@ -2,6 +2,7 @@ from weaviate.collections.classes.batch import ( ErrorReference, + BatchReference, _BatchReference, BatchReferenceReturn, ) @@ -45,7 +46,7 @@ async def references(self, references: List[_BatchReference]) -> BatchReferenceR errors = { idx: ErrorReference( message=entry["result"]["errors"]["error"][0]["message"], - reference=references[idx], + reference=BatchReference._from_internal(references[idx]), ) for idx, entry in enumerate(payload) if entry["result"]["status"] == "FAILED" diff --git a/weaviate/collections/classes/batch.py b/weaviate/collections/classes/batch.py index cc5e361f2..7c6468843 100644 --- a/weaviate/collections/classes/batch.py +++ b/weaviate/collections/classes/batch.py @@ -31,7 +31,7 @@ class _BatchReference: to: str tenant: Optional[str] from_uuid: str - to_uuid: Optional[str] = None + to_uuid: str | None class BatchObject(BaseModel): @@ -49,6 +49,7 @@ class BatchObject(BaseModel): vector: Optional[VECTORS] = Field(default=None) tenant: Optional[str] = Field(default=None) index: int + retry_count: int def __init__(self, **data: Any) -> None: v = data.get("vector") @@ -76,6 +77,19 @@ def _to_internal(self) -> _BatchObject: index=self.index, ) + @classmethod + def _from_internal(cls, obj: _BatchObject) -> "BatchObject": + return BatchObject( + collection=obj.collection, + vector=obj.vector, + uuid=uuid_package.UUID(obj.uuid), + properties=obj.properties, + tenant=obj.tenant, + references=obj.references, + index=obj.index, + retry_count=obj.retry_count, + ) + @field_validator("collection") def _validate_collection(cls, v: str) -> str: return _capitalize_first_letter(v) @@ -136,13 +150,32 @@ def _to_internal(self) -> _BatchReference: tenant=self.tenant, ) + @classmethod + def _from_internal(cls, ref: _BatchReference) -> "BatchReference": + to = ref.to.split("weaviate://")[1].split("/") + if len(to) == 2: + to_object_collection = to[0] + elif len(to) == 1: + to_object_collection = None + else: + raise ValueError(f"Invalid reference 'to' value in _BatchReference object {ref}") + assert ref.to_uuid is not None, "`to_uuid` must not be None" + return BatchReference( + from_object_collection=ref.from_.split("/")[1], + from_object_uuid=ref.from_uuid, + from_property_name=ref.from_.split("/")[-1], + to_object_uuid=ref.to_uuid, + to_object_collection=to_object_collection, + tenant=ref.tenant, + ) + @dataclass class ErrorObject: """This class contains the error information for a single object in a batch operation.""" message: str - object_: _BatchObject + object_: BatchObject original_uuid: Optional[UUID] = None @@ -151,7 +184,7 @@ class ErrorReference: """This class contains the error information for a single reference in a batch operation.""" message: str - reference: _BatchReference + reference: BatchReference @dataclass From da9603a7a912da8b32f69454e47ddc0649fccc3a Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Mon, 16 Sep 2024 17:35:01 +0100 Subject: [PATCH 13/16] Fix wrong <3.10 syntax --- weaviate/collections/classes/batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/classes/batch.py b/weaviate/collections/classes/batch.py index 7c6468843..46fe2ecbd 100644 --- a/weaviate/collections/classes/batch.py +++ b/weaviate/collections/classes/batch.py @@ -31,7 +31,7 @@ class _BatchReference: to: str tenant: Optional[str] from_uuid: str - to_uuid: str | None + to_uuid: Union[str, None] class BatchObject(BaseModel): From 3eafa7772b5279ca67630dc9557102d2e6654c22 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Mon, 16 Sep 2024 17:44:00 +0100 Subject: [PATCH 14/16] Fix missing default of retry_count in BatchObject --- weaviate/collections/classes/batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/classes/batch.py b/weaviate/collections/classes/batch.py index 46fe2ecbd..e6aeb9582 100644 --- a/weaviate/collections/classes/batch.py +++ b/weaviate/collections/classes/batch.py @@ -49,7 +49,7 @@ class BatchObject(BaseModel): vector: Optional[VECTORS] = Field(default=None) tenant: Optional[str] = Field(default=None) index: int - retry_count: int + retry_count: int = 0 def __init__(self, **data: Any) -> None: v = data.get("vector") From 7a98f557dccbaa02685501eda095a505a88dff5c Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Mon, 16 Sep 2024 18:05:29 +0100 Subject: [PATCH 15/16] Fix parsing of `XReference` --- weaviate/collections/classes/batch.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/weaviate/collections/classes/batch.py b/weaviate/collections/classes/batch.py index e6aeb9582..8f91518b0 100644 --- a/weaviate/collections/classes/batch.py +++ b/weaviate/collections/classes/batch.py @@ -152,19 +152,19 @@ def _to_internal(self) -> _BatchReference: @classmethod def _from_internal(cls, ref: _BatchReference) -> "BatchReference": + from_ = ref.from_.split("weaviate://")[1].split("/") to = ref.to.split("weaviate://")[1].split("/") if len(to) == 2: - to_object_collection = to[0] + to_object_collection = to[1] elif len(to) == 1: to_object_collection = None else: raise ValueError(f"Invalid reference 'to' value in _BatchReference object {ref}") - assert ref.to_uuid is not None, "`to_uuid` must not be None" return BatchReference( - from_object_collection=ref.from_.split("/")[1], + from_object_collection=from_[1], from_object_uuid=ref.from_uuid, - from_property_name=ref.from_.split("/")[-1], - to_object_uuid=ref.to_uuid, + from_property_name=ref.from_[-1], + to_object_uuid=ref.to_uuid if ref.to_uuid is not None else uuid_package.UUID(to[-1]), to_object_collection=to_object_collection, tenant=ref.tenant, ) From feb148a1ea63c520e5af5221f7f5570113077075 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Fri, 1 Nov 2024 11:56:42 +0000 Subject: [PATCH 16/16] Move futures inside executor to avoid races --- .gitignore | 3 ++- integration/test_batch_v4.py | 4 ++-- profiling/test_sphere.py | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index ece1f9e97..c2979fbe7 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ image.png scratch/ *-test.sh -*.hdf5 \ No newline at end of file +*.hdf5 +*.jsonl \ No newline at end of file diff --git a/integration/test_batch_v4.py b/integration/test_batch_v4.py index 79bef8fba..2c9726f2f 100644 --- a/integration/test_batch_v4.py +++ b/integration/test_batch_v4.py @@ -596,8 +596,8 @@ def batch_insert(batch: BatchClient) -> None: with concurrent.futures.ThreadPoolExecutor() as executor: with client.batch.dynamic() as batch: futures = [executor.submit(batch_insert, batch) for _ in range(nr_threads)] - for future in concurrent.futures.as_completed(futures): - future.result() + for future in concurrent.futures.as_completed(futures): + future.result() objs = client.collections.get(name).query.fetch_objects(limit=nr_objects * nr_threads).objects assert len(objs) == nr_objects * nr_threads diff --git a/profiling/test_sphere.py b/profiling/test_sphere.py index d49b2a7d3..a0614a498 100644 --- a/profiling/test_sphere.py +++ b/profiling/test_sphere.py @@ -9,7 +9,7 @@ def test_sphere(collection_factory: CollectionFactory) -> None: - sphere_file = get_file_path("sphere.100k.jsonl") + sphere_file = get_file_path("sphere.1m.jsonl") collection = collection_factory( properties=[ @@ -26,7 +26,7 @@ def test_sphere(collection_factory: CollectionFactory) -> None: ) start = time.time() - import_objects = 50000 + import_objects = 1000000 with collection.batch.dynamic() as batch: with open(sphere_file) as jsonl_file: for i, jsonl in enumerate(jsonl_file): @@ -45,7 +45,7 @@ def test_sphere(collection_factory: CollectionFactory) -> None: vector=json_parsed["vector"], ) if i % 1000 == 0: - print(f"Imported {i} objects") + print(f"Imported {len(collection)} objects") assert len(collection.batch.failed_objects) == 0 assert len(collection) == import_objects print(f"Imported {import_objects} objects in {time.time() - start}")