Skip to content

Commit

Permalink
Merge pull request #25 from ExpediaGroup/msudhir/add_print_for_debug
Browse files Browse the repository at this point in the history
Adding print statements temporarily to see if logs show up for materialization
  • Loading branch information
piket authored Aug 30, 2023
2 parents 53dc637 + 7eba517 commit 17d019b
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def __enter__(self):
logger.info(
f"Connecting to Milvus with alias {self.online_config.alias} and host {self.online_config.host} and port {self.online_config.port}."
)
print(
f"Connecting to Milvus with alias {self.online_config.alias} and host {self.online_config.host} and port {self.online_config.port}."
)
connections.connect(
alias=self.online_config.alias,
host=self.online_config.host,
Expand All @@ -95,10 +98,12 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
    """Close the Milvus connection on context exit.

    Disconnects the alias configured on ``self.online_config`` and emits the
    same message through both ``logger`` and ``print`` (the duplicate prints
    are deliberate temporary debug output for materialization runs). Any
    in-flight exception is logged but not suppressed (implicit ``None``
    return), so it propagates to the caller.
    """
    closing_msg = "Closing the connection to Milvus"
    logger.info(closing_msg)
    print(closing_msg)
    connections.disconnect(self.online_config.alias)
    logger.info("Connection Closed")
    if exc_type is not None:
        # Surface the exception details before letting it propagate.
        error_msg = f"An exception of type {exc_type} occurred: {exc_value}"
        logger.error(error_msg)
        print(error_msg)


class MilvusOnlineStore(OnlineStore):
Expand All @@ -112,15 +117,18 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
with MilvusConnectionManager(config.online_store):
print("Starting the process to batch write data into Milvus.")
collection_to_load_data = Collection(table.name)
rows = self._format_data_for_milvus(data, collection_to_load_data)
collection_to_load_data.insert(rows)
# The flush call will seal any remaining segments and send them for indexing
collection_to_load_data.flush()
collection_to_load_data.load()
logger.info("loading data into memory")
print("loading data into memory")
utility.wait_for_loading_complete(table.name)
logger.info("loading data into memory complete")
print("loading data into memory complete")

def online_read(
self,
Expand Down Expand Up @@ -160,6 +168,7 @@ def update(

if collection_available:
logger.info(f"Collection {table_to_keep.name} already exists.")
print(f"Collection {table_to_keep.name} already exists.")
else:
if not table_to_keep.schema:
raise ValueError(
Expand All @@ -176,6 +185,9 @@ def update(
logger.info(
f"creating collection {table_to_keep.name} with schema: {schema}"
)
print(
f"creating collection {table_to_keep.name} with schema: {schema}"
)
collection = Collection(name=table_to_keep.name, schema=schema)

for field_name, index_params in indexes.items():
Expand All @@ -184,6 +196,9 @@ def update(
logger.info(
f"Collection {table_to_keep.name} has been created successfully."
)
print(
f"Collection {table_to_keep.name} has been created successfully."
)

for table_to_delete in tables_to_delete:
collection_available = utility.has_collection(table_to_delete.name)
Expand Down

0 comments on commit 17d019b

Please sign in to comment.