Replace threading.Lock with asyncio.Lock when batching to avoid deadlocks #1270
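The failure mode behind this PR: the batching layer shares state between sync worker threads and a background asyncio event loop, and a `threading.Lock` taken inside coroutine code blocks the whole loop thread while it waits — so the coroutine currently holding the lock can never be resumed to release it. A minimal self-contained sketch of the deadlock and the cooperative fix (illustrative code, not the client's internals):

    import asyncio
    import threading

    tlock = threading.Lock()
    alock = asyncio.Lock()  # Python 3.10+: no event loop bound at construction

    async def holder() -> None:
        tlock.acquire()
        await asyncio.sleep(1)  # suspends while still holding the lock
        tlock.release()

    async def waiter() -> None:
        # threading.Lock blocks the whole event-loop thread while waiting, so
        # holder() can never be rescheduled to release it -> deadlock.
        tlock.acquire()
        tlock.release()

    async def safe_waiter() -> None:
        # asyncio.Lock suspends only this coroutine; the loop keeps running
        # holder() until it releases, so progress is always possible.
        async with alock:
            ...

    # asyncio.run() over holder() + waiter() hangs forever;
    # holder() + safe_waiter() on a shared asyncio.Lock completes.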

Open · wants to merge 19 commits into base: main

Commits (19 total; diff below shows changes from 2 commits)
f16f366
Fix deadlocks in the batching algo by replacing `threading.Lock` with…
tsmith023 Aug 29, 2024
4e8863f
Remove lower bound requirement on fixed size batching
tsmith023 Aug 29, 2024
057b3dc
change time.sleep to asyncio.sleep in async func
tsmith023 Aug 29, 2024
3150959
Ensure `asyncio.Lock`s are opened in the event loop thread to handle …
tsmith023 Aug 29, 2024
d45621f
Remove stifling locks in _add_objects
tsmith023 Aug 29, 2024
336f8ee
Merge branch 'fix-deadlocks-in-batching' of https://github.com/weavia…
tsmith023 Aug 29, 2024
7cdcfc0
Move objs and refs inits back to __init__
tsmith023 Aug 29, 2024
d4a3a9f
Add missing props in collection/base.pyi stubs
tsmith023 Aug 29, 2024
c7bc642
Fix formatting
tsmith023 Aug 29, 2024
0539e98
Release `asyncio.Lock`s in the event-loop's context
tsmith023 Aug 30, 2024
c1daea8
Fix linter
tsmith023 Aug 30, 2024
86f9431
Log errors in __send_batch
tsmith023 Sep 2, 2024
8866503
Change `ErrorX` classes to refer to `BatchX` instead of internal `_Ba…
tsmith023 Sep 16, 2024
176984e
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Sep 16, 2024
da9603a
Fix wrong <3.10 syntax
tsmith023 Sep 16, 2024
3eafa77
Fix missing default of retry_count in BatchObject
tsmith023 Sep 16, 2024
7a98f55
Fix parsing of `XReference`
tsmith023 Sep 16, 2024
ee17a4e
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Nov 1, 2024
feb148a
Move futures inside executor to avoid races
tsmith023 Nov 1, 2024
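Several commits above — "Ensure `asyncio.Lock`s are opened in the event loop thread…", "Release `asyncio.Lock`s in the event-loop's context" — revolve around one constraint: asyncio primitives must be created and awaited on the event-loop thread, so the sync batching threads have to submit coroutines to the client's background loop rather than call them directly. A minimal sketch of that pattern, assuming a dedicated loop thread; `EventLoopThread` and its `run_until_complete(fn, *args)` signature are illustrative stand-ins for the client's internal wrapper, not its real API:

    import asyncio
    import threading
    from typing import Any, Awaitable, Callable, TypeVar

    T = TypeVar("T")

    class EventLoopThread:
        """Owns one event loop on a daemon thread; sync code submits coroutines to it."""

        def __init__(self) -> None:
            self.loop = asyncio.new_event_loop()
            self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
            self._thread.start()

        def run_until_complete(
            self, f: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any
        ) -> T:
            # Build the coroutine here but run it on the loop thread; the calling
            # sync thread blocks only on the Future, never inside the loop itself.
            future = asyncio.run_coroutine_threadsafe(f(*args, **kwargs), self.loop)
            return future.result()

    # e.g. from a sync worker thread:
    #   loop_thread.run_until_complete(batch_objects.add, obj)  # function + args, uncalled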
10 changes: 7 additions & 3 deletions integration/test_batch_v4.py
@@ -687,9 +687,13 @@ def test_batching_error_logs(
         for obj in [{"name": i} for i in range(100)]:
             batch.add_object(properties=obj, collection=name)
     assert (
-        "Failed to send 100 objects in a batch of 100. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects."
-        in caplog.text
-    )
+        ("Failed to send" in caplog.text)
+        and ("objects in a batch of" in caplog.text)
+        and (
+            "Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects."
+            in caplog.text
+        )
+    )  # number of objects sent per batch is not fixed for less than 100 objects


 def test_references_with_to_uuids(client_factory: ClientFactory) -> None:
59 changes: 34 additions & 25 deletions weaviate/collections/batch/base.py
@@ -1,3 +1,4 @@
+import asyncio
 import math
 import threading
 import time
@@ -60,39 +61,39 @@ class BatchRequest(ABC, Generic[TBatchInput, TBatchReturn]):

     def __init__(self) -> None:
         self._items: List[TBatchInput] = []
-        self._lock = threading.Lock()
+        self._lock = asyncio.Lock()

     def __len__(self) -> int:
         return len(self._items)

-    def add(self, item: TBatchInput) -> None:
+    async def add(self, item: TBatchInput) -> None:
         """Add an item to the BatchRequest."""
-        self._lock.acquire()
+        await self._lock.acquire()
         self._items.append(item)
         self._lock.release()

-    def prepend(self, item: List[TBatchInput]) -> None:
+    async def prepend(self, item: List[TBatchInput]) -> None:
         """Add items to the front of the BatchRequest.

         This is intended to be used when objects should be retried, e.g. after a temporary error.
         """
-        self._lock.acquire()
+        await self._lock.acquire()
         self._items = item + self._items
         self._lock.release()


 class ReferencesBatchRequest(BatchRequest[_BatchReference, BatchReferenceReturn]):
     """Collect Weaviate-object references to add them in one request to Weaviate."""

-    def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]:
+    async def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]:
         """Pop the given number of items from the BatchRequest queue.

         Returns
             `List[_BatchReference]` items from the BatchRequest.
         """
         ret: List[_BatchReference] = []
         i = 0
-        self._lock.acquire()
+        await self._lock.acquire()
         while len(ret) < pop_amount and len(self._items) > 0 and i < len(self._items):
             if self._items[i].from_uuid not in uuid_lookup and (
                 self._items[i].to_uuid is None or self._items[i].to_uuid not in uuid_lookup
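A design note on the methods above: the explicit `await self._lock.acquire()` / `self._lock.release()` pairs are not exception-safe — if the body between them raises, the lock stays held. A minimal sketch of the more idiomatic equivalent for `add` (same happy-path behavior):

    async def add(self, item: TBatchInput) -> None:
        """Add an item to the BatchRequest."""
        async with self._lock:  # released even if append raises
            self._items.append(item)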
@@ -107,13 +108,13 @@ def pop_items(self, pop_amount: int, uuid_lookup: Set[str]) -> List[_BatchReference]:
 class ObjectsBatchRequest(BatchRequest[_BatchObject, BatchObjectReturn]):
     """Collect objects for one batch request to weaviate."""

-    def pop_items(self, pop_amount: int) -> List[_BatchObject]:
+    async def pop_items(self, pop_amount: int) -> List[_BatchObject]:
         """Pop the given number of items from the BatchRequest queue.

         Returns
             `List[_BatchObject]` items from the BatchRequest.
         """
-        self._lock.acquire()
+        await self._lock.acquire()
         if pop_amount >= len(self._items):
             ret = copy(self._items)
             self._items.clear()
@@ -174,14 +175,14 @@ def __init__(
         self.__batch_rest = _BatchREST(connection, self.__consistency_level)

         # lookup table for objects that are currently being processed - used to avoid sending references from objects that have not been added yet
-        self.__uuid_lookup_lock = threading.Lock()
+        self.__uuid_lookup_lock = asyncio.Lock()
         self.__uuid_lookup: Set[str] = set()

         # we do not want users to access the results directly as they are not thread-safe
         self.__results_for_wrapper_backup = results
         self.__results_for_wrapper = _BatchDataWrapper()

-        self.__results_lock = threading.Lock()
+        self.__results_lock = asyncio.Lock()

         self.__cluster = _ClusterBatch(self.__connection)
@@ -221,7 +222,7 @@ def __init__(
         self.__recommended_num_refs: int = 50

         self.__active_requests = 0
-        self.__active_requests_lock = threading.Lock()
+        self.__active_requests_lock = asyncio.Lock()

         # dynamic batching
         self.__time_last_scale_up: float = 0
@@ -292,14 +293,18 @@ def __batch_send(self) -> None:
             self.__time_stamp_last_request = time.time()

         self._batch_send = True
-        self.__active_requests_lock.acquire()
+        self.__loop.run_until_complete(self.__active_requests_lock.acquire)
         self.__active_requests += 1
         self.__active_requests_lock.release()

-        objs = self.__batch_objects.pop_items(self.__recommended_num_objects)
-        self.__uuid_lookup_lock.acquire()
-        refs = self.__batch_references.pop_items(
-            self.__recommended_num_refs, uuid_lookup=self.__uuid_lookup
+        objs = self.__loop.run_until_complete(
+            self.__batch_objects.pop_items, self.__recommended_num_objects
         )
+        self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire)
+        refs = self.__loop.run_until_complete(
+            self.__batch_references.pop_items,
+            self.__recommended_num_refs,
+            uuid_lookup=self.__uuid_lookup,
+        )
         self.__uuid_lookup_lock.release()
         # do not block the thread - the results are written to a central (locked) list and we want to have multiple concurrent batch-requests
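Note the calling convention in this hunk: `__loop.run_until_complete` receives the coroutine function and its arguments separately (`pop_items` and the locks' `acquire` are passed uncalled). That is consistent with a `run_coroutine_threadsafe`-style wrapper, as sketched after the commit list above, and not with `asyncio.AbstractEventLoop.run_until_complete`, which expects an already-constructed awaitable.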
@@ -513,7 +518,7 @@ async def __send_batch(
             ]
             readded_uuids = {obj.uuid for obj in readd_objects}

-            self.__batch_objects.prepend(readd_objects)
+            await self.__batch_objects.prepend(readd_objects)

             new_errors = {
                 i: err for i, err in response_obj.errors.items() if i not in readded_objects
@@ -542,7 +547,7 @@ async def __send_batch(
             else:
                 # sleep a bit to recover from the rate limit in other cases
                 time.sleep(2**highest_retry_count)
-            self.__uuid_lookup_lock.acquire()
+            await self.__uuid_lookup_lock.acquire()
             self.__uuid_lookup.difference_update(
                 obj.uuid for obj in objs if obj.uuid not in readded_uuids
             )
@@ -561,7 +566,7 @@ async def __send_batch(
                         "message": "There have been more than 30 failed object batches. Further errors will not be logged.",
                     }
                 )
-            self.__results_lock.acquire()
+            await self.__results_lock.acquire()
             self.__results_for_wrapper.results.objs += response_obj
             self.__results_for_wrapper.failed_objects.extend(response_obj.errors.values())
             self.__results_lock.release()
@@ -595,12 +600,12 @@ async def __send_batch(
                         "message": "There have been more than 30 failed reference batches. Further errors will not be logged.",
                     }
                 )
-            self.__results_lock.acquire()
+            await self.__results_lock.acquire()
             self.__results_for_wrapper.results.refs += response_ref
             self.__results_for_wrapper.failed_references.extend(response_ref.errors.values())
             self.__results_lock.release()

-        self.__active_requests_lock.acquire()
+        await self.__active_requests_lock.acquire()
         self.__active_requests -= 1
         self.__active_requests_lock.release()
@@ -636,15 +641,17 @@ def _add_object(
                 index=self.__objs_count,
             )
             self.__objs_count += 1
+            self.__loop.run_until_complete(self.__results_lock.acquire)
             self.__results_for_wrapper.imported_shards.add(
                 Shard(collection=collection, tenant=tenant)
             )
+            self.__results_lock.release()
         except ValidationError as e:
             raise WeaviateBatchValidationError(repr(e))
-        self.__uuid_lookup_lock.acquire()
+        self.__loop.run_until_complete(self.__uuid_lookup_lock.acquire)
         self.__uuid_lookup.add(str(batch_object.uuid))
         self.__uuid_lookup_lock.release()
-        self.__batch_objects.add(batch_object._to_internal())
+        self.__loop.run_until_complete(self.__batch_objects.add, batch_object._to_internal())

         # block if queue gets too long or weaviate is overloaded - reading files is faster than sending them so we do
         # not need a long queue
@@ -688,7 +695,9 @@ def _add_reference(
             )
         except ValidationError as e:
             raise WeaviateBatchValidationError(repr(e))
-        self.__batch_references.add(batch_reference._to_internal())
+        self.__loop.run_until_complete(
+            self.__batch_references.add, batch_reference._to_internal()
+        )

         # block if queue gets too long or weaviate is overloaded
         while self.__recommended_num_objects == 0:
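For context on the surface being exercised: the deadlocks showed up underneath the client's batching context managers, where user threads call `add_object`/`add_reference` while background threads flush batches. A rough usage sketch (the collection name and properties are illustrative; the API shape follows the v4 client's batching interface):

    import weaviate

    client = weaviate.connect_to_local()
    try:
        with client.batch.dynamic() as batch:
            for i in range(1000):
                batch.add_object(collection="Article", properties={"title": f"article-{i}"})
        # failures are collected rather than raised mid-batch, as the test above asserts
        if client.batch.failed_objects:
            print(f"{len(client.batch.failed_objects)} objects failed")
    finally:
        client.close()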