
Merge pull request #1141 from weaviate/feat/introduce-logger-of-batch-errors

Introduce logging of partial batch errors
tsmith023 authored Jul 1, 2024
2 parents 8de3690 + d0ff9a6 commit 847d6c4
Showing 6 changed files with 87 additions and 38 deletions.
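
For orientation: this change routes messages that previously went to `print` through a shared "weaviate-client" logger (added in weaviate/logger.py at the end of this diff) and logs warnings/errors whenever part of a batch fails. The following is a minimal sketch, not taken from this PR, of how an application might surface those records; it assumes weaviate-client v4 and a locally running instance, and the list_all() call is just a placeholder operation.

```python
import logging

import weaviate

# Attach a root handler so records from the "weaviate-client" logger are emitted.
logging.basicConfig(level=logging.INFO)

# Assumes a local Weaviate instance; partial batch failures will now show up
# as log records instead of print output.
with weaviate.connect_to_local() as client:
    client.collections.list_all()  # placeholder operation
```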
2 changes: 1 addition & 1 deletion test/test_embedded.py
@@ -118,7 +118,7 @@ def test_embedded_end_to_end(options: EmbeddedDB, tmp_path):

embedded_db.ensure_running()
assert embedded_db.is_listening() is True
with patch("builtins.print") as mocked_print:
with patch("weaviate.logger.logger.info") as mocked_print:
embedded_db.start()
mocked_print.assert_called_once_with(
f"embedded weaviate is already listening on port {options.port}"
41 changes: 38 additions & 3 deletions weaviate/collections/batch/base.py
@@ -45,6 +45,7 @@
from weaviate.collections.classes.types import WeaviateProperties
from weaviate.connect import ConnectionV4
from weaviate.exceptions import WeaviateBatchValidationError
from weaviate.logger import logger
from weaviate.types import UUID, VECTORS
from weaviate.warnings import _Warnings

@@ -191,6 +192,8 @@ def __init__(
self.__max_batch_size: int = 1000

self.__objs_count = 0
self.__objs_logs_count = 0
self.__refs_logs_count = 0

if isinstance(self.__batching_mode, _FixedSizeBatching):
self.__recommended_num_objects = self.__batching_mode.batch_size
@@ -491,13 +494,19 @@ def __dynamic_batching(self) -> None:
async def __send_batch_async(
self, objs: List[_BatchObject], refs: List[_BatchReference], readd_rate_limit: bool
) -> None:
if len(objs) > 0:
if (n_objs := len(objs)) > 0:
start = time.time()
try:
response_obj = await self.__batch_grpc.aobjects(
objects=objs, timeout=DEFAULT_REQUEST_TIMEOUT
)
except Exception as e:
logger.warn(
{
"message": "Failed to insert objects in batch. Inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.",
"error": repr(e),
}
)
errors_obj = {
idx: ErrorObject(message=repr(e), object_=obj) for idx, obj in enumerate(objs)
}
@@ -586,17 +595,29 @@ async def __send_batch_async(
)
self.__uuid_lookup_lock.release()

if (n_obj_errs := len(response_obj.errors)) > 0 and n_obj_errs < 30:
logger.error(
{
"message": f"Failed to send {n_obj_errs} objects in a batch of {n_objs}. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.",
}
)
self.__objs_logs_count += 1
if self.__objs_logs_count > 30:
logger.error(
{
"message": "There have been more than 30 failed object batches. Further errors will not be logged.",
}
)
self.__results_lock.acquire()
self.__results_for_wrapper.results.objs += response_obj
self.__results_for_wrapper.failed_objects.extend(response_obj.errors.values())
self.__results_lock.release()
self.__took_queue.append(time.time() - start)

if len(refs) > 0:
if (n_refs := len(refs)) > 0:
start = time.time()
try:
response_ref = await self.__batch_rest.references(references=refs)

except Exception as e:
errors_ref = {
idx: ErrorReference(message=repr(e), reference=ref)
@@ -607,6 +628,20 @@ async def __send_batch_async(
errors=errors_ref,
has_errors=True,
)
if (n_ref_errs := len(response_ref.errors)) > 0 and n_ref_errs < 30:
logger.error(
{
"message": f"Failed to send {n_ref_errs} references in a batch of {n_refs}. Please inspect client.batch.failed_references or collection.batch.failed_references for the failed references.",
"errors": response_ref.errors,
}
)
self.__refs_logs_count += 1
if self.__refs_logs_count > 30:
logger.error(
{
"message": "There have been more than 30 failed reference batches. Further errors will not be logged.",
}
)
self.__results_lock.acquire()
self.__results_for_wrapper.results.refs += response_ref
self.__results_for_wrapper.failed_references.extend(response_ref.errors.values())
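
The new log lines above only announce that a send failed; the failed entries themselves are still collected on the batch results. Below is a hedged sketch of reading them back, assuming the v4 client-side batching API with a placeholder "Article" collection; the error attributes match the ErrorObject/ErrorReference shapes used in this diff.

```python
import weaviate

with weaviate.connect_to_local() as client:
    articles = client.collections.get("Article")  # placeholder collection

    with articles.batch.fixed_size(batch_size=100) as batch:
        batch.add_object(properties={"title": "Hello"})
        # add_reference(...) calls would populate failed_references analogously

    # These are the attributes the new warning/error messages point at.
    for err in articles.batch.failed_objects:
        print(err.message, err.object_)
    for err in articles.batch.failed_references:
        print(err.message, err.reference)
```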
7 changes: 4 additions & 3 deletions weaviate/collections/batch/batch_wrapper.py
@@ -10,6 +10,7 @@
from weaviate.collections.classes.batch import BatchResult, ErrorObject, ErrorReference, Shard
from weaviate.collections.classes.config import ConsistencyLevel
from weaviate.connect import ConnectionV4
from weaviate.logger import logger
from weaviate.util import _capitalize_first_letter, _decode_json_response_list


@@ -51,7 +52,7 @@ def is_ready(how_many: int) -> bool:
for shard in shards or self._batch_data.imported_shards
)
except Exception as e:
print(
logger.warn(
f"Error while getting class shards statuses: {e}, trying again with 2**n={2**how_many}s exponential backoff with n={how_many}"
)
if how_many_failures == how_many:
@@ -62,10 +63,10 @@ def is_ready(how_many: int) -> bool:
count = 0
while not is_ready(0):
if count % 20 == 0: # print every 5s
print("Waiting for async indexing to finish...")
logger.debug("Waiting for async indexing to finish...")
time.sleep(0.25)
count += 1
print("Async indexing finished!")
logger.debug("Async indexing finished!")

def _get_shards_readiness(self, shard: Shard) -> List[bool]:
path = f"/schema/{_capitalize_first_letter(shard.collection)}/shards{'' if shard.tenant is None else f'?tenant={shard.tenant}'}"
59 changes: 33 additions & 26 deletions weaviate/collections/data.py
@@ -49,6 +49,7 @@
from weaviate.connect import ConnectionV4
from weaviate.connect.v4 import _ExpectedStatusCodes
from weaviate.exceptions import WeaviateInvalidInputError
from weaviate.logger import logger
from weaviate.types import BEACON, UUID, VECTORS
from weaviate.util import (
_datetime_to_string,
@@ -407,33 +408,39 @@ def insert_many(
`weaviate.exceptions.WeaviateInsertManyAllFailedError`:
If every object in the batch fails to be inserted. The exception message contains details about the failure.
"""
return self._batch_grpc.objects(
[
(
_BatchObject(
collection=self.name,
vector=obj.vector,
uuid=str(obj.uuid if obj.uuid is not None else uuid_package.uuid4()),
properties=cast(dict, obj.properties),
tenant=self._tenant,
references=obj.references,
index=idx,
)
if isinstance(obj, DataObject)
else _BatchObject(
collection=self.name,
vector=None,
uuid=str(uuid_package.uuid4()),
properties=cast(dict, obj),
tenant=self._tenant,
references=None,
index=idx,
)
objs = [
(
_BatchObject(
collection=self.name,
vector=obj.vector,
uuid=str(obj.uuid if obj.uuid is not None else uuid_package.uuid4()),
properties=cast(dict, obj.properties),
tenant=self._tenant,
references=obj.references,
index=idx,
)
for idx, obj in enumerate(objects)
],
timeout=self._connection.timeout_config.insert,
)
if isinstance(obj, DataObject)
else _BatchObject(
collection=self.name,
vector=None,
uuid=str(uuid_package.uuid4()),
properties=cast(dict, obj),
tenant=self._tenant,
references=None,
index=idx,
)
)
for idx, obj in enumerate(objects)
]
res = self._batch_grpc.objects(objs, timeout=self._connection.timeout_config.insert)
if (n_obj_errs := len(res.errors)) > 0:
logger.error(
{
"message": f"Failed to send {n_obj_errs} objects in a batch of {len(objs)}. Please inspect the errors variable of the returned object for more information.",
"errors": res.errors,
}
)
return res

def replace(
self,
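
Unlike the background batcher, insert_many returns its errors directly, which is what the new log message refers to. A rough usage sketch follows; the collection handle and properties are placeholders, assuming `articles` is obtained as in the earlier example.

```python
from weaviate.classes.data import DataObject

# Sketch only; `articles` is a collection handle as in the example above.
res = articles.data.insert_many(
    [
        DataObject(properties={"title": "First"}),
        DataObject(properties={"title": "Second"}),
    ]
)
if res.has_errors:
    # Keys are the indices of the failed objects in the submitted list.
    for idx, err in res.errors.items():
        print(idx, err.message)
```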
11 changes: 6 additions & 5 deletions weaviate/embedded.py
@@ -20,6 +20,7 @@

from weaviate import exceptions
from weaviate.exceptions import WeaviateStartUpError
from weaviate.logger import logger
from weaviate.util import _decode_json_response_dict

DEFAULT_BINARY_PATH = str(Path.home() / ".cache/weaviate-embedded/")
@@ -136,7 +137,7 @@ def ensure_weaviate_binary_exists(self) -> None:
+ str(hashlib.sha256(self.options.version.encode("utf-8")).hexdigest()),
)
if not self._weaviate_binary_path.exists():
print(
logger.info(
f"Binary {self.options.binary_path} did not exist. Downloading binary from {self._download_url}"
)
if self._download_url.endswith(".tar.gz"):
@@ -185,15 +186,15 @@ def stop(self) -> None:
self.process.terminate()
self.process.wait()
except ProcessLookupError:
print(
logger.info(
f"""Tried to stop embedded weaviate process {self.process.pid}. Process was not found. So not doing
anything"""
)
self.process = None

def ensure_running(self) -> None:
if self.is_listening() is False:
print(
logger.info(
f"Embedded weaviate wasn't listening on ports http:{self.options.port} & grpc:{self.options.grpc_port}, so starting embedded weaviate again"
)
self.start()
@@ -243,7 +244,7 @@ def start(self) -> None:
env=my_env,
)
self.process = process
print(f"Started {self.options.binary_path}: process ID {self.process.pid}")
logger.info(f"Started {self.options.binary_path}: process ID {self.process.pid}")
self.wait_till_listening()

@abstractmethod
@@ -264,7 +265,7 @@ def is_listening(self) -> bool:

def start(self) -> None:
if self.is_listening():
print(f"embedded weaviate is already listening on port {self.options.port}")
logger.info(f"embedded weaviate is already listening on port {self.options.port}")
return
super().start()

5 changes: 5 additions & 0 deletions weaviate/logger.py
@@ -0,0 +1,5 @@
import os
from logging import getLogger

logger = getLogger("weaviate-client")
logger.setLevel(os.getenv("WEAVIATE_LOG_LEVEL", "INFO"))
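
The level is taken from the WEAVIATE_LOG_LEVEL environment variable when this module is imported; it can also be adjusted afterwards through the standard logging API. A small sketch:

```python
import logging

# Either export WEAVIATE_LOG_LEVEL=DEBUG before starting the process, or
# change the already-created "weaviate-client" logger at runtime:
logging.getLogger("weaviate-client").setLevel(logging.DEBUG)
```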
