Skip to content

Commit

Permalink
implement elasticsearch online read (#65)
Browse files Browse the repository at this point in the history
* implement elasticsearch online read

* fix formatting for readability

* fix tests and expand value proto parsing

* add elasticsearch to repo_config

* use mysql image version 8.1.0

* reduce number of parallel tests

* increase swap size

* separate all expediagroup tests

* fix formatting
  • Loading branch information
piket authored Nov 6, 2023
1 parent a8eb0a5 commit e33936a
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 38 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ jobs:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
steps:
- name: Increase swapfile
# Increase Ubuntu's swapfile so the runner does not run out of memory, which would cause the action to be terminated
if: startsWith(matrix.os, 'ubuntu')
run: |
sudo swapoff -a
sudo fallocate -l 8G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
sudo swapon --show
- uses: actions/checkout@v2
- name: Setup Python
id: setup-python
Expand Down Expand Up @@ -80,7 +90,9 @@ jobs:
- name: Install dependencies
run: make install-python-ci-dependencies
- name: Test Python
run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests
run: |
pytest -n 1 --cov=./ --cov-report=xml --color=yes sdk/python/tests/expediagroup
pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --ignore=sdk/python/tests/expediagroup
unit-test-go:
runs-on: ubuntu-latest
Expand Down
145 changes: 126 additions & 19 deletions sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,57 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from bidict import bidict
from elasticsearch import Elasticsearch, helpers
from pydantic.typing import Literal

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import (
BoolList,
BytesList,
DoubleList,
FloatList,
Int32List,
Int64List,
StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.types import (
Array,
Bool,
Bytes,
ComplexFeastType,
FeastType,
Float32,
Float64,
Int32,
Int64,
PrimitiveFeastType,
String,
UnixTimestamp,
)

logger = logging.getLogger(__name__)

TYPE_MAPPING = bidict(
{
Bytes: "binary",
Int32: "integer",
Int64: "long",
Float32: "float",
Float64: "double",
Bool: "boolean",
String: "text",
UnixTimestamp: "date_nanos",
}
)
TYPE_MAPPING = {
Bytes: "binary",
Int32: "integer",
Int64: "long",
Float32: "float",
Float64: "double",
Bool: "boolean",
String: "text",
UnixTimestamp: "date_nanos",
Array(Bytes): "binary",
Array(Int32): "integer",
Array(Int64): "long",
Array(Float32): "float",
Array(Float64): "double",
Array(Bool): "boolean",
Array(String): "text",
Array(UnixTimestamp): "date_nanos",
}


class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -108,7 +123,7 @@ def online_write_batch(
for feature_name, val in values.items():
document[feature_name] = self._get_value_from_value_proto(val)
bulk_documents.append(
{"_index": table.name, "_id": id_val, "doc": document}
{"_index": table.name, "_id": id_val, "_source": document}
)

successes, errors = helpers.bulk(client=es, actions=bulk_documents)
Expand All @@ -123,7 +138,49 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
pass
with ElasticsearchConnectionManager(config) as es:
id_list = []
for entity in entity_keys:
for val in entity.entity_values:
id_list.append(self._get_value_from_value_proto(val))

if requested_features is None:
requested_features = [f.name for f in table.schema]

hits = es.search(
index=table.name,
source=False,
fields=requested_features,
query={"ids": {"values": id_list}},
)["hits"]
if len(hits) > 0 and "hits" in hits:
hits = hits["hits"]
else:
return []

results: List[
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = []
prefix = "valuetype."
for hit in hits:
result_row = {}
doc = hit["fields"]
for feature in doc:
feast_type = next(
f.dtype for f in table.schema if f.name == feature
)
value = (
doc[feature][0]
if isinstance(feast_type, PrimitiveFeastType)
else doc[feature]
)
value_type_method = f"{feast_type.to_value_type()}_val".lower()
if value_type_method.startswith(prefix):
value_type_method = value_type_method[len(prefix) :]
value_proto = self._create_value_proto(value, value_type_method)
result_row[feature] = value_proto
results.append((None, result_row))
return results

def update(
self,
Expand Down Expand Up @@ -183,8 +240,6 @@ def _create_index(self, es, fv):
logger.info(f"Index {fv.name} created")

def _get_data_type(self, t: FeastType) -> str:
if isinstance(t, ComplexFeastType):
return "text"
return TYPE_MAPPING.get(t, "text")

def _get_value_from_value_proto(self, proto: ValueProto):
Expand All @@ -198,10 +253,62 @@ def _get_value_from_value_proto(self, proto: ValueProto):
value (Any): the extracted value.
"""
val_type = proto.WhichOneof("val")
if not val_type:
return None

value = getattr(proto, val_type) # type: ignore
if val_type == "bytes_val":
value = base64.b64encode(value).decode()
if val_type == "float_list_val":
if val_type == "bytes_list_val":
value = [base64.b64encode(v).decode() for v in value.val]
elif "_list_val" in val_type:
value = list(value.val)

return value

def _create_value_proto(self, feature_val, value_type) -> ValueProto:
    """
    Construct a Value proto so that Feast can interpret Elasticsearch results.

    Parameters:
    feature_val (Union[list, int, str, float, bool, bytes]): An item in the result that Elasticsearch returns.
    value_type (str): Name of the ValueProto field to populate; example: int64_val, float_list_val, etc.

    Returns:
    val_proto (ValueProto): Constructed result that Feast can understand.

    Raises:
    ValueError: if a timestamp string does not match "%Y-%m-%dT%H:%M:%S.%fZ".
    """
    # Imported locally so this method does not depend on edits to the
    # module-level import block.
    from datetime import timedelta, timezone

    def _to_epoch_millis(text: str) -> int:
        # The trailing "Z" is matched as a literal, so strptime returns a naive
        # datetime. Attach UTC explicitly: a naive datetime would be interpreted
        # in the machine's local time zone and shift the result on non-UTC hosts.
        parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
            tzinfo=timezone.utc
        )
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        # Integer timedelta arithmetic avoids the float rounding that
        # int(seconds * 1000) can introduce (result one millisecond too low).
        # NOTE(review): the value is epoch *milliseconds* (seconds * 1000), as in
        # the previous implementation even though it named the result "nanos";
        # confirm this is the unit the write path expects.
        return (parsed - epoch) // timedelta(milliseconds=1)

    # List-valued fields whose wrapper message takes the values unchanged.
    list_wrappers = {
        "string_list_val": StringList,
        "int32_list_val": Int32List,
        "int64_list_val": Int64List,
        "double_list_val": DoubleList,
        "float_list_val": FloatList,
        "bool_list_val": BoolList,
    }

    # Bytes are handled as base64 text on the Elasticsearch side, so decode
    # them back to raw bytes here.
    if value_type == "bytes_list_val":
        return ValueProto(
            bytes_list_val=BytesList(val=[base64.b64decode(f) for f in feature_val])
        )
    if value_type == "bytes_val":
        return ValueProto(bytes_val=base64.b64decode(feature_val))

    if value_type in list_wrappers:
        wrapper = list_wrappers[value_type]
        return ValueProto(**{value_type: wrapper(val=feature_val)})

    if value_type == "unix_timestamp_list_val":
        millis_list = [_to_epoch_millis(f) for f in feature_val]
        return ValueProto(unix_timestamp_list_val=Int64List(val=millis_list))
    if value_type == "unix_timestamp_val":
        return ValueProto(unix_timestamp_val=_to_epoch_millis(feature_val))

    # Remaining scalar fields can be assigned directly by field name.
    val_proto = ValueProto()
    setattr(val_proto, value_type, feature_val)
    return val_proto
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore",
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
"milvus": "feast.expediagroup.vectordb.milvus_online_store.MilvusOnlineStore",
"elasticsearch": "feast.expediagroup.vectordb.elasticsearch_online_store.ElasticsearchOnlineStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/tests/doctest/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,4 @@ def test_docstrings():

current_packages = next_packages

if not successful:
raise Exception(f"Docstring tests failed. Failed results: {failed_cases}")
assert successful, f"Docstring tests failed. Failed results: {failed_cases}"
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
logger = logging.getLogger(__name__)


class ElasticsearchOnlineCreator(OnlineStoreCreator):
def __init__(self, project_name: str, es_port: int):
class ElasticsearchOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str):
super().__init__(project_name)
self.es_port = 9200
self.elasticsearch_container = ElasticSearchContainer(
image="docker.elastic.co/elasticsearch/elasticsearch:8.8.2",
port_to_expose=es_port,
port_to_expose=self.es_port,
)

def create_online_store(self):
# Start the container
self.elasticsearch_container.start()
elasticsearch_host = self.elasticsearch_container.get_container_host_ip()
elasticsearch_http_port = self.elasticsearch_container.get_exposed_port(9200)
elasticsearch_http_port = self.elasticsearch_container.get_exposed_port(
self.es_port
)
return {
"host": elasticsearch_host,
"port": elasticsearch_http_port,
Expand Down
Loading

0 comments on commit e33936a

Please sign in to comment.