Skip to content

Commit

Permalink
disable es read unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
piket committed Nov 2, 2023
1 parent a8eb0a5 commit 782f3aa
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 32 deletions.
12 changes: 11 additions & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ jobs:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
steps:
# - name: Increase swapfile
# # Increase Ubuntu's swap file so the runner does not exhaust its resources, which would cause the action to be terminated
# if: startsWith(matrix.os, 'ubuntu')
# run: |
# sudo swapoff -a
# sudo fallocate -l 15G /swapfile
# sudo chmod 600 /swapfile
# sudo mkswap /swapfile
# sudo swapon /swapfile
# sudo swapon --show
- uses: actions/checkout@v2
- name: Setup Python
id: setup-python
Expand Down Expand Up @@ -80,7 +90,7 @@ jobs:
- name: Install dependencies
run: make install-python-ci-dependencies
- name: Test Python
run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests
run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests -o log_cli=true

unit-test-go:
runs-on: ubuntu-latest
Expand Down
145 changes: 126 additions & 19 deletions sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,57 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from bidict import bidict
from elasticsearch import Elasticsearch, helpers
from pydantic.typing import Literal

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import (
BoolList,
BytesList,
DoubleList,
FloatList,
Int32List,
Int64List,
StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.types import (
Array,
Bool,
Bytes,
ComplexFeastType,
FeastType,
Float32,
Float64,
Int32,
Int64,
PrimitiveFeastType,
String,
UnixTimestamp,
)

logger = logging.getLogger(__name__)

TYPE_MAPPING = bidict(
{
Bytes: "binary",
Int32: "integer",
Int64: "long",
Float32: "float",
Float64: "double",
Bool: "boolean",
String: "text",
UnixTimestamp: "date_nanos",
}
)
# Feast type -> Elasticsearch field type name.
# Primitive types are listed explicitly; each Array(<primitive>) is then
# registered with the same field type name as its element type.
TYPE_MAPPING = {
    Bytes: "binary",
    Int32: "integer",
    Int64: "long",
    Float32: "float",
    Float64: "double",
    Bool: "boolean",
    String: "text",
    UnixTimestamp: "date_nanos",
}
TYPE_MAPPING.update(
    {
        Array(base_type): es_type
        # Snapshot the items so the dict is not resized while being iterated.
        for base_type, es_type in list(TYPE_MAPPING.items())
    }
)


class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -108,7 +123,7 @@ def online_write_batch(
for feature_name, val in values.items():
document[feature_name] = self._get_value_from_value_proto(val)
bulk_documents.append(
{"_index": table.name, "_id": id_val, "doc": document}
{"_index": table.name, "_id": id_val, "_source": document}
)

successes, errors = helpers.bulk(client=es, actions=bulk_documents)
Expand All @@ -123,7 +138,49 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
pass
with ElasticsearchConnectionManager(config) as es:
id_list = []
for entity in entity_keys:
for val in entity.entity_values:
id_list.append(self._get_value_from_value_proto(val))

if requested_features is None:
requested_features = [f.name for f in table.schema]

hits = es.search(
index=table.name,
source=False,
fields=requested_features,
query={"ids": {"values": id_list}},
)["hits"]
if len(hits) > 0 and "hits" in hits:
hits = hits["hits"]
else:
return []

results: List[
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = []
prefix = "valuetype."
for hit in hits:
result_row = {}
doc = hit["fields"]
for feature in doc:
feast_type = next(
f.dtype for f in table.schema if f.name == feature
)
value = (
doc[feature][0]
if isinstance(feast_type, PrimitiveFeastType)
else doc[feature]
)
value_type_method = f"{feast_type.to_value_type()}_val".lower()
if value_type_method.startswith(prefix):
value_type_method = value_type_method[len(prefix) :]
value_proto = self._create_value_proto(value, value_type_method)
result_row[feature] = value_proto
results.append((None, result_row))
return results

def update(
self,
Expand Down Expand Up @@ -183,8 +240,6 @@ def _create_index(self, es, fv):
logger.info(f"Index {fv.name} created")

def _get_data_type(self, t: FeastType) -> str:
    """
    Look up the Elasticsearch field type name for a Feast type.

    Parameters:
        t (FeastType): the Feast type of a feature.

    Returns:
        (str): the matching entry in TYPE_MAPPING, or "text" when the
        type has no entry.
    """
    # Deliberately no ComplexFeastType short-circuit: TYPE_MAPPING carries
    # explicit Array(...) entries, and returning "text" early for every
    # complex type would make those entries unreachable.
    return TYPE_MAPPING.get(t, "text")

def _get_value_from_value_proto(self, proto: ValueProto):
Expand All @@ -198,10 +253,62 @@ def _get_value_from_value_proto(self, proto: ValueProto):
value (Any): the extracted value.
"""
val_type = proto.WhichOneof("val")
if not val_type:
return None

value = getattr(proto, val_type) # type: ignore
if val_type == "bytes_val":
value = base64.b64encode(value).decode()
if val_type == "float_list_val":
if val_type == "bytes_list_val":
value = [base64.b64encode(v).decode() for v in value.val]
elif "_list_val" in val_type:
value = list(value.val)

return value

def _create_value_proto(self, feature_val, value_type) -> ValueProto:
    """
    Construct Value Proto so that Feast can interpret Elasticsearch results

    Parameters:
        feature_val (Union[list, int, str, double, float, bool, bytes]): An item in the result that Elasticsearch returns.
        value_type (str): Feast Value type; example: int64_val, float_val, etc.

    Returns:
        val_proto (ValueProto): Constructed result that Feast can understand.

    Raises:
        ValueError: if a timestamp string does not match the
            "%Y-%m-%dT%H:%M:%S.%fZ" layout (raised by datetime.strptime).
    """
    # Local import: the module only imports `datetime` from `datetime`.
    from datetime import timezone

    def _to_scaled_epoch(text: str) -> int:
        # The trailing "Z" marks the string as UTC, but strptime returns a
        # naive datetime; without an explicit tzinfo, .timestamp() would
        # interpret it in the host's local timezone and shift the result.
        parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ")
        # NOTE(review): the factor of 1000 is carried over unchanged from the
        # previous implementation - confirm the unit the caller expects.
        return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)

    # List types whose elements are passed through to the proto unchanged.
    plain_list_wrappers = {
        "string_list_val": StringList,
        "int32_list_val": Int32List,
        "int64_list_val": Int64List,
        "double_list_val": DoubleList,
        "float_list_val": FloatList,
        "bool_list_val": BoolList,
    }

    # Bytes are base64 text on the Elasticsearch side; decode them back.
    if value_type == "bytes_list_val":
        decoded = [base64.b64decode(item) for item in feature_val]
        return ValueProto(bytes_list_val=BytesList(val=decoded))
    if value_type == "bytes_val":
        return ValueProto(bytes_val=base64.b64decode(feature_val))

    # Timestamps arrive as formatted date strings and are stored as integers.
    if value_type == "unix_timestamp_list_val":
        scaled = [_to_scaled_epoch(item) for item in feature_val]
        return ValueProto(unix_timestamp_list_val=Int64List(val=scaled))
    if value_type == "unix_timestamp_val":
        return ValueProto(unix_timestamp_val=_to_scaled_epoch(feature_val))

    wrapper = plain_list_wrappers.get(value_type)
    if wrapper is not None:
        return ValueProto(**{value_type: wrapper(val=feature_val)})

    # Remaining scalar fields can be assigned directly by name.
    val_proto = ValueProto()
    setattr(val_proto, value_type, feature_val)
    return val_proto
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore",
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
"milvus": "feast.expediagroup.vectordb.milvus_online_store.MilvusOnlineStore",
"elasticsearch": "feast.expediagroup.vectordb.elasticsearch_online_store.ElasticsearchOnlineStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


class ElasticsearchOnlineCreator(OnlineStoreCreator):
class ElasticsearchOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, es_port: int):
super().__init__(project_name)
self.elasticsearch_container = ElasticSearchContainer(
Expand Down
Loading

0 comments on commit 782f3aa

Please sign in to comment.