diff --git a/test/test_embedded.py b/test/test_embedded.py
index bcec54cf9..bd0dbe13e 100644
--- a/test/test_embedded.py
+++ b/test/test_embedded.py
@@ -118,7 +118,7 @@ def test_embedded_end_to_end(options: EmbeddedDB, tmp_path):
     embedded_db.ensure_running()
     assert embedded_db.is_listening() is True
-    with patch("builtins.print") as mocked_print:
+    with patch("weaviate.logger.logger.info") as mocked_print:
         embedded_db.start()
         mocked_print.assert_called_once_with(
             f"embedded weaviate is already listening on port {options.port}"
diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py
index 379b40c5d..51ffaaecc 100644
--- a/weaviate/collections/batch/base.py
+++ b/weaviate/collections/batch/base.py
@@ -45,6 +45,7 @@
 from weaviate.collections.classes.types import WeaviateProperties
 from weaviate.connect import ConnectionV4
 from weaviate.exceptions import WeaviateBatchValidationError
+from weaviate.logger import logger
 from weaviate.types import UUID, VECTORS
 from weaviate.warnings import _Warnings
@@ -191,6 +192,8 @@ def __init__(
         self.__max_batch_size: int = 1000
         self.__objs_count = 0
+        self.__objs_logs_count = 0
+        self.__refs_logs_count = 0
         if isinstance(self.__batching_mode, _FixedSizeBatching):
             self.__recommended_num_objects = self.__batching_mode.batch_size
@@ -491,13 +494,19 @@ def __dynamic_batching(self) -> None:
     async def __send_batch_async(
         self, objs: List[_BatchObject], refs: List[_BatchReference], readd_rate_limit: bool
     ) -> None:
-        if len(objs) > 0:
+        if (n_objs := len(objs)) > 0:
             start = time.time()
             try:
                 response_obj = await self.__batch_grpc.aobjects(
                     objects=objs, timeout=DEFAULT_REQUEST_TIMEOUT
                 )
             except Exception as e:
+                logger.warn(
+                    {
+                        "message": "Failed to insert objects in batch. Inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.",
+                        "error": repr(e),
+                    }
+                )
                 errors_obj = {
                     idx: ErrorObject(message=repr(e), object_=obj) for idx, obj in enumerate(objs)
                 }
@@ -586,17 +595,29 @@ async def __send_batch_async(
                 )
             self.__uuid_lookup_lock.release()
+            if (n_obj_errs := len(response_obj.errors)) > 0 and n_obj_errs < 30:
+                logger.error(
+                    {
+                        "message": f"Failed to send {n_obj_errs} objects in a batch of {n_objs}. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.",
+                    }
+                )
+                self.__objs_logs_count += 1
+                if self.__objs_logs_count > 30:
+                    logger.error(
+                        {
+                            "message": "There have been more than 30 failed object batches. Further errors will not be logged.",
+                        }
+                    )
             self.__results_lock.acquire()
             self.__results_for_wrapper.results.objs += response_obj
             self.__results_for_wrapper.failed_objects.extend(response_obj.errors.values())
             self.__results_lock.release()
             self.__took_queue.append(time.time() - start)
-        if len(refs) > 0:
+        if (n_refs := len(refs)) > 0:
             start = time.time()
             try:
                 response_ref = await self.__batch_rest.references(references=refs)
-
             except Exception as e:
                 errors_ref = {
                     idx: ErrorReference(message=repr(e), reference=ref)
@@ -607,6 +628,20 @@
                     errors=errors_ref,
                     has_errors=True,
                 )
+            if (n_ref_errs := len(response_ref.errors)) > 0 and n_ref_errs < 30:
+                logger.error(
+                    {
+                        "message": f"Failed to send {n_ref_errs} references in a batch of {n_refs}. Please inspect client.batch.failed_references or collection.batch.failed_references for the failed references.",
+                        "errors": response_ref.errors,
+                    }
+                )
+                self.__refs_logs_count += 1
+                if self.__refs_logs_count > 30:
+                    logger.error(
+                        {
+                            "message": "There have been more than 30 failed reference batches. Further errors will not be logged.",
+                        }
+                    )
             self.__results_lock.acquire()
             self.__results_for_wrapper.results.refs += response_ref
             self.__results_for_wrapper.failed_references.extend(response_ref.errors.values())
diff --git a/weaviate/collections/batch/batch_wrapper.py b/weaviate/collections/batch/batch_wrapper.py
index 1c426e575..77db2d3b8 100644
--- a/weaviate/collections/batch/batch_wrapper.py
+++ b/weaviate/collections/batch/batch_wrapper.py
@@ -10,6 +10,7 @@
 from weaviate.collections.classes.batch import BatchResult, ErrorObject, ErrorReference, Shard
 from weaviate.collections.classes.config import ConsistencyLevel
 from weaviate.connect import ConnectionV4
+from weaviate.logger import logger
 from weaviate.util import _capitalize_first_letter, _decode_json_response_list
@@ -51,7 +52,7 @@ def is_ready(how_many: int) -> bool:
                     for shard in shards or self._batch_data.imported_shards
                 )
             except Exception as e:
-                print(
+                logger.warn(
                     f"Error while getting class shards statuses: {e}, trying again with 2**n={2**how_many}s exponential backoff with n={how_many}"
                 )
                 if how_many_failures == how_many:
@@ -62,10 +63,10 @@ def is_ready(how_many: int) -> bool:
         count = 0
         while not is_ready(0):
             if count % 20 == 0:  # print every 5s
-                print("Waiting for async indexing to finish...")
+                logger.debug("Waiting for async indexing to finish...")
             time.sleep(0.25)
             count += 1
-        print("Async indexing finished!")
+        logger.debug("Async indexing finished!")
 
     def _get_shards_readiness(self, shard: Shard) -> List[bool]:
         path = f"/schema/{_capitalize_first_letter(shard.collection)}/shards{'' if shard.tenant is None else f'?tenant={shard.tenant}'}"
diff --git a/weaviate/collections/data.py b/weaviate/collections/data.py
index d9578c512..21ae4cb2c 100644
--- a/weaviate/collections/data.py
+++ b/weaviate/collections/data.py
@@ -49,6 +49,7 @@
 from weaviate.connect import ConnectionV4
 from weaviate.connect.v4 import _ExpectedStatusCodes
 from weaviate.exceptions import WeaviateInvalidInputError
+from weaviate.logger import logger
 from weaviate.types import BEACON, UUID, VECTORS
 from weaviate.util import (
     _datetime_to_string,
@@ -407,33 +408,39 @@ def insert_many(
             `weaviate.exceptions.WeaviateInsertManyAllFailedError`:
                 If every object in the batch fails to be inserted. The exception message contains details about the failure.
""" - return self._batch_grpc.objects( - [ - ( - _BatchObject( - collection=self.name, - vector=obj.vector, - uuid=str(obj.uuid if obj.uuid is not None else uuid_package.uuid4()), - properties=cast(dict, obj.properties), - tenant=self._tenant, - references=obj.references, - index=idx, - ) - if isinstance(obj, DataObject) - else _BatchObject( - collection=self.name, - vector=None, - uuid=str(uuid_package.uuid4()), - properties=cast(dict, obj), - tenant=self._tenant, - references=None, - index=idx, - ) + objs = [ + ( + _BatchObject( + collection=self.name, + vector=obj.vector, + uuid=str(obj.uuid if obj.uuid is not None else uuid_package.uuid4()), + properties=cast(dict, obj.properties), + tenant=self._tenant, + references=obj.references, + index=idx, ) - for idx, obj in enumerate(objects) - ], - timeout=self._connection.timeout_config.insert, - ) + if isinstance(obj, DataObject) + else _BatchObject( + collection=self.name, + vector=None, + uuid=str(uuid_package.uuid4()), + properties=cast(dict, obj), + tenant=self._tenant, + references=None, + index=idx, + ) + ) + for idx, obj in enumerate(objects) + ] + res = self._batch_grpc.objects(objs, timeout=self._connection.timeout_config.insert) + if (n_obj_errs := len(res.errors)) > 0: + logger.error( + { + "message": f"Failed to send {n_obj_errs} objects in a batch of {len(objs)}. Please inspect the errors variable of the returned object for more information.", + "errors": res.errors, + } + ) + return res def replace( self, diff --git a/weaviate/embedded.py b/weaviate/embedded.py index 130373705..d737ec182 100644 --- a/weaviate/embedded.py +++ b/weaviate/embedded.py @@ -20,6 +20,7 @@ from weaviate import exceptions from weaviate.exceptions import WeaviateStartUpError +from weaviate.logger import logger from weaviate.util import _decode_json_response_dict DEFAULT_BINARY_PATH = str(Path.home() / ".cache/weaviate-embedded/") @@ -136,7 +137,7 @@ def ensure_weaviate_binary_exists(self) -> None: + str(hashlib.sha256(self.options.version.encode("utf-8")).hexdigest()), ) if not self._weaviate_binary_path.exists(): - print( + logger.info( f"Binary {self.options.binary_path} did not exist. Downloading binary from {self._download_url}" ) if self._download_url.endswith(".tar.gz"): @@ -185,7 +186,7 @@ def stop(self) -> None: self.process.terminate() self.process.wait() except ProcessLookupError: - print( + logger.info( f"""Tried to stop embedded weaviate process {self.process.pid}. Process was not found. 
So not doing anything""" ) @@ -193,7 +194,7 @@ def stop(self) -> None: def ensure_running(self) -> None: if self.is_listening() is False: - print( + logger.info( f"Embedded weaviate wasn't listening on ports http:{self.options.port} & grpc:{self.options.grpc_port}, so starting embedded weaviate again" ) self.start() @@ -243,7 +244,7 @@ def start(self) -> None: env=my_env, ) self.process = process - print(f"Started {self.options.binary_path}: process ID {self.process.pid}") + logger.info(f"Started {self.options.binary_path}: process ID {self.process.pid}") self.wait_till_listening() @abstractmethod @@ -264,7 +265,7 @@ def is_listening(self) -> bool: def start(self) -> None: if self.is_listening(): - print(f"embedded weaviate is already listening on port {self.options.port}") + logger.info(f"embedded weaviate is already listening on port {self.options.port}") return super().start() diff --git a/weaviate/logger.py b/weaviate/logger.py new file mode 100644 index 000000000..755ea6f38 --- /dev/null +++ b/weaviate/logger.py @@ -0,0 +1,5 @@ +import os +from logging import getLogger + +logger = getLogger("weaviate-client") +logger.setLevel(os.getenv("WEAVIATE_LOG_LEVEL", "INFO"))
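
Usage note (not part of the diff): the new weaviate/logger.py module creates a standard library logger named "weaviate-client" and reads its initial level from the WEAVIATE_LOG_LEVEL environment variable, defaulting to INFO. A minimal sketch, using only the standard logging module and the logger name introduced above, of how an application could redirect or quiet these messages:

import logging

# Look up the client's logger by the name defined in weaviate/logger.py.
client_logger = logging.getLogger("weaviate-client")

# Attach a handler with timestamps and keep only warnings and errors,
# e.g. the failed-batch messages emitted via logger.error in batch/base.py.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s"))
client_logger.addHandler(handler)
client_logger.setLevel(logging.WARNING)

Alternatively, exporting WEAVIATE_LOG_LEVEL=DEBUG before importing weaviate surfaces the debug-level messages as well, such as "Waiting for async indexing to finish...".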