Skip to content

Commit

Permalink
Merge pull request #23 from ExpediaGroup/feature/integration_test_improvements
Browse files Browse the repository at this point in the history

Feature/integration test improvements
  • Loading branch information
michaelbackes authored Aug 25, 2023
2 parents 004ca49 + b5067c2 commit 01867f4
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 112 deletions.
7 changes: 2 additions & 5 deletions docs/reference/online-stores/milvus.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,19 @@ An example feature view:
Field(
name="book_id",
dtype=Int64,
tags={
"is_primary": "True",
},
),
Field(
name="book_embedding",
dtype=Array(Float32),
tags={
"is_primary": "False",
"description": "book embedding of the content",
"dimensions": "2200",
"index_type": IndexType.ivf_flat.value,
"index_params": {
"nlist": 1024,
},
"metric_type": "L2",
}
),
],
source=SOURCE,
Expand Down Expand Up @@ -110,7 +107,7 @@ Below is a matrix indicating which functionality is supported by the Milvus onli
| readable by Python SDK | yes |
| readable by Java | no |
| readable by Go | no |
| support for entityless feature views | yes |
| support for entityless feature views | no |
| support for concurrent writing to the same key | yes |
| support for ttl (time to live) at retrieval | no |
| support for deleting expired data | no |
Expand Down
176 changes: 87 additions & 89 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
from bidict import bidict
from pydantic.typing import Literal
from pymilvus import (
Expand All @@ -17,7 +18,6 @@
from pymilvus.client.types import IndexType

from feast import Entity, FeatureView, RepoConfig
from feast.field import Field
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import FloatList
Expand Down Expand Up @@ -102,14 +102,15 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
with MilvusConnectionManager(config.online_store):
try:
rows = self._format_data_for_milvus(data)
collection_to_load_data = Collection(table.name)
collection_to_load_data.insert(rows)
# The flush call will seal any remaining segments and send them for indexing
collection_to_load_data.flush()
except Exception as e:
logger.error(f"Batch writing data failed due to {e}")
collection_to_load_data = Collection(table.name)
rows = self._format_data_for_milvus(data, collection_to_load_data)
collection_to_load_data.insert(rows)
# The flush call will seal any remaining segments and send them for indexing
collection_to_load_data.flush()
collection_to_load_data.load()
logger.info("loading data into memory")
utility.wait_for_loading_complete(table.name)
logger.info("loading data into memory complete")

def online_read(
self,
Expand All @@ -130,7 +131,8 @@ def online_read(
query_result, collection, requested_features
)

return results
# results do not have timestamps
return [(None, row) for row in results]

@log_exceptions_and_usage(online_store="milvus")
def update(
Expand All @@ -145,28 +147,33 @@ def update(
with MilvusConnectionManager(config.online_store):
for table_to_keep in tables_to_keep:
collection_available = utility.has_collection(table_to_keep.name)
try:
if collection_available:
logger.info(f"Collection {table_to_keep.name} already exists.")
else:
(
schema,
indexes,
) = self._convert_featureview_schema_to_milvus_readable(
table_to_keep.schema,

if collection_available:
logger.info(f"Collection {table_to_keep.name} already exists.")
else:
if not table_to_keep.schema:
raise ValueError(
f"a schema must be provided for feature_view: {table_to_keep}"
)

collection = Collection(name=table_to_keep.name, schema=schema)
(
schema,
indexes,
) = self._convert_featureview_schema_to_milvus_readable(
table_to_keep,
)

logger.info(
f"creating collection {table_to_keep.name} with schema: {schema}"
)
collection = Collection(name=table_to_keep.name, schema=schema)

for field_name, index_params in indexes.items():
collection.create_index(field_name, index_params)
for field_name, index_params in indexes.items():
collection.create_index(field_name, index_params)

logger.info(f"Collection name is {collection.name}")
logger.info(
f"Collection {table_to_keep.name} has been created successfully."
)
except Exception as e:
logger.error(f"Collection update failed due to {e}")
logger.info(
f"Collection {table_to_keep.name} has been created successfully."
)

for table_to_delete in tables_to_delete:
collection_available = utility.has_collection(table_to_delete.name)
Expand Down Expand Up @@ -198,35 +205,33 @@ def teardown(
utility.drop_collection(collection_name)

def _convert_featureview_schema_to_milvus_readable(
self, feast_schema: List[Field]
self, feature_view: FeatureView
) -> Tuple[CollectionSchema, Dict]:
"""
Converting a schema understood by Feast to a schema that is readable by Milvus so that it
can be used when a collection is created in Milvus.
Parameters:
feast_schema (List[Field]): Schema stored in FeatureView.
feature_view (FeatureView): the FeatureView that contains the schema.
Returns:
(CollectionSchema): Schema readable by Milvus.
(Dict): A dictionary of indexes to be created with the key as the vector field name and the value as the parameters
"""
boolean_mapping_from_string = {"True": True, "False": False}
field_list = []
indexes = {}

for field in feast_schema:
for field in feature_view.schema:

field_name = field.name
data_type = self._get_milvus_type(field.dtype)
dimensions = 0
description = ""
is_primary = True if field.name in feature_view.join_keys else False

if field.tags:
description = field.tags.get("description", " ")
is_primary = boolean_mapping_from_string.get(
field.tags.get("is_primary", "False")
)
description = field.tags.get("description", "")

if self._data_type_is_supported_vector(data_type) and field.tags.get(
"index_type"
Expand Down Expand Up @@ -281,27 +286,57 @@ def _data_type_is_supported_vector(self, data_type: DataType) -> bool:

return False

def _format_data_for_milvus(self, feast_data):
def _format_data_for_milvus(self, feast_data, collection: Collection):
"""
Format Feast input for Milvus: Data stored into Milvus takes the grouped representation approach where each feature value is grouped together:
[[1,2], [1,3]], [John, Lucy], [3,4]]
Parameters:
feast_data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]: Data represented for batch write in Feast
collection: target collection
Returns:
List[List]: transformed_data: Data that can be directly written into Milvus
pd.DataFrame: transformed_data: Data that can be directly written into Milvus
"""
# get the order of columns so that return data frame has the correct order. Milvus does need the correct order
# and does not use the column names when a data frame is passed.
schema = collection.schema
field_names = [field.name for field in schema.fields]

milvus_data = []
data = []
feature_names = None
for entity_key, values, timestamp, created_ts in feast_data:
feature = self._process_values_for_milvus(values)
milvus_data.append(feature)
feature_names = [entity_key.join_keys[0]]
feature = [self._get_value_from_value_proto(entity_key.entity_values[0])]
for feature_name, val in values.items():
value = self._get_value_from_value_proto(val)
feature.append(value)
feature_names.append(feature_name)
data.append(feature)

df = pd.DataFrame(data, columns=feature_names)
transformed_data = df.reindex(field_names, axis=1)

transformed_data = [list(item) for item in zip(*milvus_data)]
return transformed_data

def _get_value_from_value_proto(self, proto: ValueProto):
    """
    Get the raw value from a value proto.

    Parameters:
        proto (ValueProto): the value proto that contains the data.

    Returns:
        value (Any): the extracted raw value; a float list is returned as a
        numpy array so it can be used directly as a Milvus vector.

    Raises:
        ValueError: if the proto has no value field set.
    """
    val_type = proto.WhichOneof("val")
    if val_type is None:
        # WhichOneof returns None for an empty proto; fail with a clear
        # message instead of the opaque TypeError getattr(proto, None) raises.
        raise ValueError("ValueProto has no value field set")
    value = getattr(proto, val_type)  # type: ignore
    if val_type == "float_list_val":
        # FloatList wraps the actual floats in a repeated `val` field;
        # unwrap it into a numpy array.
        value = np.array(value.val)

    return value

def _create_index_params(self, tags: Dict[str, str], data_type: DataType):
"""
Parses the tags to generate the index_params needed to create the specified index
Expand Down Expand Up @@ -351,7 +386,7 @@ def _create_index_params(self, tags: Dict[str, str], data_type: DataType):
}

def _convert_milvus_result_to_feast_type(
self, milvus_result, collection, features_to_request
self, query_result, collection, requested_features
):
"""
Convert Milvus result to Feast types.
Expand All @@ -365,27 +400,22 @@ def _convert_milvus_result_to_feast_type(
List[Dict[str, ValueProto]]: Processed data with Feast types.
"""

# Here we are constructing the feature list to request from Milvus with their relevant types

# constructing the feature list to request from Milvus with their respective types
features_with_types = list(tuple())
for field in collection.schema.fields:
if field.name in features_to_request:
if field.name in requested_features:
features_with_types.append(
(field.name, self._get_feast_type(field.dtype))
)

feast_type_result = []
results = []
prefix = "valuetype."

for row in milvus_result:
for row in query_result:
result_row = {}
for feature, feast_type in features_with_types:

value_proto = ValueProto()
feature_value = row[feature]

if feature_value:
# Doing some pre-processing here to remove prefix
value_type_method = f"{feast_type.to_value_type()}_val".lower()
if value_type_method.startswith(prefix):
value_type_method = value_type_method[len(prefix) :]
Expand All @@ -394,8 +424,9 @@ def _convert_milvus_result_to_feast_type(
)
result_row[feature] = value_proto
# Append result after conversion to Feast Type
feast_type_result.append(result_row)
return feast_type_result
results.append(result_row)

return results

def _create_value_proto(self, val_proto, feature_val, value_type) -> ValueProto:
"""
Expand All @@ -409,11 +440,11 @@ def _create_value_proto(self, val_proto, feature_val, value_type) -> ValueProto:
Returns:
val_proto (ValueProto): Constructed result that Feast can understand.
"""

if value_type == "float_list_val":
val_proto = ValueProto(float_list_val=FloatList(val=feature_val))
else:
setattr(val_proto, value_type, feature_val)

return val_proto

def _construct_milvus_query(self, entities) -> str:
Expand All @@ -435,47 +466,14 @@ def _construct_milvus_query(self, entities) -> str:
for key in entity.join_keys:
entity_join_key.append(key)
for value in entity.entity_values:
value_to_search = self._get_value_to_search_in_milvus(value)
value_to_search = self._get_value_from_value_proto(value)
values_to_search.append(value_to_search)

# TODO: Enable multiple join key support. Currently only supporting a single primary key/ join key. This is a limitation in Feast.
milvus_query_expr = f"{entity_join_key[0]} in {values_to_search}"

return milvus_query_expr

def _process_values_for_milvus(self, values) -> List:
"""
Process values to prepare them for using in Milvus.
Parameters:
values: (Dict[str, ValueProto]): Dictionary of values from Feast data.
Returns:
(List): Processed feature values ready for storing in Milvus.
"""
feature = []
for feature_name, val in values.items():
value = self._get_value_to_search_in_milvus(val)
feature.append(value)
return feature

def _get_value_to_search_in_milvus(self, value) -> Any:
"""
Process a value to prepare it for searching in Milvus.
Parameters:
value (ValueProto): A value from Feast data.
Returns:
value (Any): Processed value ready for Milvus searching.
"""
val_type = value.WhichOneof("val")
if val_type == "float_list_val":
value = np.array(value.float_list_val.val)
else:
value = getattr(value, val_type)
return value

def _get_milvus_type(self, feast_type) -> DataType:
"""
Convert Feast type to Milvus type using the TYPE_MAPPING bidict.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def create_online_store(self) -> Dict[str, str]:
"Milvus Proxy successfully initialized and ready to serve!"
)
wait_for_logs(
container=self.container, predicate=log_string_to_wait_for, timeout=30
container=self.container, predicate=log_string_to_wait_for, timeout=60
)
exposed_port = self.container.get_exposed_port("19530")

Expand Down
Loading

0 comments on commit 01867f4

Please sign in to comment.