From e33936a41fbe46e150818b5ccb6dcd9275d2d239 Mon Sep 17 00:00:00 2001
From: piket
Date: Mon, 6 Nov 2023 09:15:58 -0800
Subject: [PATCH] implement elasticsearch online read (#65)

* implement elasticsearch online read
* fix formatting for readability
* fix tests and expanded value proto parsing
* add elasticsearch to repo_config
* use mysql image version 8.1.0
* reduce number of parallel tests
* increase swap size
* separate all expediagroup tests
* fix formatting
---
 .github/workflows/unit_tests.yml              |  14 +-
 .../vectordb/elasticsearch_online_store.py    | 145 +++++++++++++++---
 sdk/python/feast/repo_config.py               |   1 +
 sdk/python/tests/doctest/test_all.py          |   3 +-
 .../elasticsearch_online_store_creator.py     |  11 +-
 .../test_elasticsearch_online_store.py        | 117 ++++++++++++--
 .../universal/online_store/mysql.py           |   2 +-
 sdk/python/tests/unit/test_sql_registry.py    |   2 +-
 setup.py                                      |   1 -
 9 files changed, 258 insertions(+), 38 deletions(-)

diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml
index f690326598..ccc89ecc42 100644
--- a/.github/workflows/unit_tests.yml
+++ b/.github/workflows/unit_tests.yml
@@ -20,6 +20,16 @@ jobs:
       OS: ${{ matrix.os }}
       PYTHON: ${{ matrix.python-version }}
     steps:
+      - name: Increase swapfile
+        # Increase Ubuntu's swapfile to avoid running out of resources, which causes the action to terminate
+        if: startsWith(matrix.os, 'ubuntu')
+        run: |
+          sudo swapoff -a
+          sudo fallocate -l 8G /swapfile
+          sudo chmod 600 /swapfile
+          sudo mkswap /swapfile
+          sudo swapon /swapfile
+          sudo swapon --show
       - uses: actions/checkout@v2
       - name: Setup Python
         id: setup-python
@@ -80,7 +90,9 @@ jobs:
       - name: Install dependencies
         run: make install-python-ci-dependencies
      - name: Test Python
-        run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests
+        run: |
+          pytest -n 1 --cov=./ --cov-report=xml --color=yes sdk/python/tests/expediagroup
+          pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --ignore=sdk/python/tests/expediagroup
 
   unit-test-go:
     runs-on: ubuntu-latest
diff --git a/sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py b/sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py
index 0c5767f39e..56b05d08ab 100644
--- a/sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py
+++ b/sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py
@@ -4,42 +4,57 @@ from datetime import datetime
 from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
 
-from bidict import bidict
 from elasticsearch import Elasticsearch, helpers
 from pydantic.typing import Literal
 
 from feast import Entity, FeatureView, RepoConfig
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import (
+    BoolList,
+    BytesList,
+    DoubleList,
+    FloatList,
+    Int32List,
+    Int64List,
+    StringList,
+)
 from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.repo_config import FeastConfigBaseModel
 from feast.types import (
+    Array,
     Bool,
     Bytes,
-    ComplexFeastType,
     FeastType,
     Float32,
     Float64,
     Int32,
     Int64,
+    PrimitiveFeastType,
     String,
     UnixTimestamp,
 )
 
 logger = logging.getLogger(__name__)
 
-TYPE_MAPPING = bidict(
-    {
-        Bytes: "binary",
-        Int32: "integer",
-        Int64: "long",
-        Float32: "float",
-        Float64: "double",
-        Bool: "boolean",
-        String: "text",
-        UnixTimestamp: "date_nanos",
-    }
-)
+TYPE_MAPPING = {
+    Bytes: "binary",
+    Int32: "integer",
+    Int64: "long",
+    Float32: "float",
Float64: "double", + Bool: "boolean", + String: "text", + UnixTimestamp: "date_nanos", + Array(Bytes): "binary", + Array(Int32): "integer", + Array(Int64): "long", + Array(Float32): "float", + Array(Float64): "double", + Array(Bool): "boolean", + Array(String): "text", + Array(UnixTimestamp): "date_nanos", +} class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel): @@ -108,7 +123,7 @@ def online_write_batch( for feature_name, val in values.items(): document[feature_name] = self._get_value_from_value_proto(val) bulk_documents.append( - {"_index": table.name, "_id": id_val, "doc": document} + {"_index": table.name, "_id": id_val, "_source": document} ) successes, errors = helpers.bulk(client=es, actions=bulk_documents) @@ -123,7 +138,49 @@ def online_read( entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - pass + with ElasticsearchConnectionManager(config) as es: + id_list = [] + for entity in entity_keys: + for val in entity.entity_values: + id_list.append(self._get_value_from_value_proto(val)) + + if requested_features is None: + requested_features = [f.name for f in table.schema] + + hits = es.search( + index=table.name, + source=False, + fields=requested_features, + query={"ids": {"values": id_list}}, + )["hits"] + if len(hits) > 0 and "hits" in hits: + hits = hits["hits"] + else: + return [] + + results: List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]] + ] = [] + prefix = "valuetype." + for hit in hits: + result_row = {} + doc = hit["fields"] + for feature in doc: + feast_type = next( + f.dtype for f in table.schema if f.name == feature + ) + value = ( + doc[feature][0] + if isinstance(feast_type, PrimitiveFeastType) + else doc[feature] + ) + value_type_method = f"{feast_type.to_value_type()}_val".lower() + if value_type_method.startswith(prefix): + value_type_method = value_type_method[len(prefix) :] + value_proto = self._create_value_proto(value, value_type_method) + result_row[feature] = value_proto + results.append((None, result_row)) + return results def update( self, @@ -183,8 +240,6 @@ def _create_index(self, es, fv): logger.info(f"Index {fv.name} created") def _get_data_type(self, t: FeastType) -> str: - if isinstance(t, ComplexFeastType): - return "text" return TYPE_MAPPING.get(t, "text") def _get_value_from_value_proto(self, proto: ValueProto): @@ -198,10 +253,62 @@ def _get_value_from_value_proto(self, proto: ValueProto): value (Any): the extracted value. """ val_type = proto.WhichOneof("val") + if not val_type: + return None + value = getattr(proto, val_type) # type: ignore if val_type == "bytes_val": value = base64.b64encode(value).decode() - if val_type == "float_list_val": + if val_type == "bytes_list_val": + value = [base64.b64encode(v).decode() for v in value.val] + elif "_list_val" in val_type: value = list(value.val) return value + + def _create_value_proto(self, feature_val, value_type) -> ValueProto: + """ + Construct Value Proto so that Feast can interpret Elasticsearch results + + Parameters: + feature_val (Union[list, int, str, double, float, bool, bytes]): An item in the result that Elasticsearch returns. + value_type (Str): Feast Value type; example: int64_val, float_val, etc. + + Returns: + val_proto (ValueProto): Constructed result that Feast can understand. 
+ """ + if value_type == "bytes_list_val": + val_proto = ValueProto( + bytes_list_val=BytesList(val=[base64.b64decode(f) for f in feature_val]) + ) + elif value_type == "bytes_val": + val_proto = ValueProto(bytes_val=base64.b64decode(feature_val)) + elif value_type == "string_list_val": + val_proto = ValueProto(string_list_val=StringList(val=feature_val)) + elif value_type == "int32_list_val": + val_proto = ValueProto(int32_list_val=Int32List(val=feature_val)) + elif value_type == "int64_list_val": + val_proto = ValueProto(int64_list_val=Int64List(val=feature_val)) + elif value_type == "double_list_val": + val_proto = ValueProto(double_list_val=DoubleList(val=feature_val)) + elif value_type == "float_list_val": + val_proto = ValueProto(float_list_val=FloatList(val=feature_val)) + elif value_type == "bool_list_val": + val_proto = ValueProto(bool_list_val=BoolList(val=feature_val)) + elif value_type == "unix_timestamp_list_val": + nanos_list = [ + int(datetime.strptime(f, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() * 1000) + for f in feature_val + ] + val_proto = ValueProto(unix_timestamp_list_val=Int64List(val=nanos_list)) + elif value_type == "unix_timestamp_val": + nanos = ( + datetime.strptime(feature_val, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + * 1000 + ) + val_proto = ValueProto(unix_timestamp_val=int(nanos)) + else: + val_proto = ValueProto() + setattr(val_proto, value_type, feature_val) + + return val_proto diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index fc20f1567b..1278752574 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -65,6 +65,7 @@ "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore", "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "milvus": "feast.expediagroup.vectordb.milvus_online_store.MilvusOnlineStore", + "elasticsearch": "feast.expediagroup.vectordb.elasticsearch_online_store.ElasticsearchOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index 814a7ca798..4f4c5bcc22 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -113,5 +113,4 @@ def test_docstrings(): current_packages = next_packages - if not successful: - raise Exception(f"Docstring tests failed. Failed results: {failed_cases}") + assert successful, f"Docstring tests failed. 
Failed results: {failed_cases}" diff --git a/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py b/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py index 6bda8ac0ff..ee5361a600 100644 --- a/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py +++ b/sdk/python/tests/expediagroup/elasticsearch_online_store_creator.py @@ -11,19 +11,22 @@ logger = logging.getLogger(__name__) -class ElasticsearchOnlineCreator(OnlineStoreCreator): - def __init__(self, project_name: str, es_port: int): +class ElasticsearchOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str): super().__init__(project_name) + self.es_port = 9200 self.elasticsearch_container = ElasticSearchContainer( image="docker.elastic.co/elasticsearch/elasticsearch:8.8.2", - port_to_expose=es_port, + port_to_expose=self.es_port, ) def create_online_store(self): # Start the container self.elasticsearch_container.start() elasticsearch_host = self.elasticsearch_container.get_container_host_ip() - elasticsearch_http_port = self.elasticsearch_container.get_exposed_port(9200) + elasticsearch_http_port = self.elasticsearch_container.get_exposed_port( + self.es_port + ) return { "host": elasticsearch_host, "port": elasticsearch_http_port, diff --git a/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py b/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py index fbf68b8c9c..0917d1eb45 100644 --- a/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py +++ b/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py @@ -16,7 +16,7 @@ from feast.infra.offline_stores.file import FileOfflineStoreConfig from feast.infra.offline_stores.file_source import FileSource from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import FloatList +from feast.protos.feast.types.Value_pb2 import BytesList, FloatList from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig from feast.types import ( @@ -31,7 +31,7 @@ UnixTimestamp, ) from tests.expediagroup.elasticsearch_online_store_creator import ( - ElasticsearchOnlineCreator, + ElasticsearchOnlineStoreCreator, ) logging.basicConfig(level=logging.INFO) @@ -67,7 +67,7 @@ def repo_config(embedded_elasticsearch): @pytest.fixture(scope="session") def embedded_elasticsearch(): - online_store_creator = ElasticsearchOnlineCreator(PROJECT, 9200) + online_store_creator = ElasticsearchOnlineStoreCreator(PROJECT) online_store_config = online_store_creator.create_online_store() yield online_store_config @@ -78,6 +78,7 @@ def embedded_elasticsearch(): class TestElasticsearchOnlineStore: index_to_write = "index_write" index_to_delete = "index_delete" + index_to_read = "index_read" unavailable_index = "abc" @pytest.fixture(autouse=True) @@ -89,6 +90,8 @@ def setup_method(self, repo_config): es.indices.delete(index=self.index_to_delete) if es.indices.exists(index=self.index_to_write): es.indices.delete(index=self.index_to_write) + if es.indices.exists(index=self.index_to_read): + es.indices.delete(index=self.index_to_read) if es.indices.exists(index=self.unavailable_index): es.indices.delete(index=self.unavailable_index) @@ -168,6 +171,7 @@ def test_elasticsearch_update_add_index(self, repo_config, caplog, index_params) "type": index_params["index_type"].lower(), **index_params["index_params"], } + with ElasticsearchConnectionManager(repo_config.online_store) as es: created_index = 
es.indices.get(index=self.index_to_write) assert created_index.body[self.index_to_write]["mappings"] == mapping @@ -205,6 +209,7 @@ def test_elasticsearch_update_add_existing_index(self, repo_config, caplog): entities_to_keep=[], partial=False, ) + with ElasticsearchConnectionManager(repo_config.online_store) as es: assert es.indices.exists(index=self.index_to_write).body is True @@ -226,6 +231,7 @@ def test_elasticsearch_update_delete_index(self, repo_config, caplog): ), ] self._create_index_in_es(self.index_to_delete, repo_config) + with ElasticsearchConnectionManager(repo_config.online_store) as es: assert es.indices.exists(index=self.index_to_delete).body is True @@ -244,6 +250,7 @@ def test_elasticsearch_update_delete_index(self, repo_config, caplog): entities_to_keep=[], partial=False, ) + with ElasticsearchConnectionManager(repo_config.online_store) as es: assert es.indices.exists(index=self.index_to_delete).body is False @@ -264,6 +271,7 @@ def test_elasticsearch_update_delete_unavailable_index(self, repo_config, caplog dtype=String, ), ] + with ElasticsearchConnectionManager(repo_config.online_store) as es: assert es.indices.exists(index=self.index_to_delete).body is False @@ -282,16 +290,18 @@ def test_elasticsearch_update_delete_unavailable_index(self, repo_config, caplog entities_to_keep=[], partial=False, ) + with ElasticsearchConnectionManager(repo_config.online_store) as es: assert es.indices.exists(index=self.index_to_delete).body is False def test_elasticsearch_online_write_batch(self, repo_config, caplog): - total_rows_to_write = 100 + total_rows_to_write = 10 ( feature_view, data, ) = self._create_n_customer_test_samples_elasticsearch_online_read( - n=total_rows_to_write + name=self.index_to_write, + n=total_rows_to_write, ) ElasticsearchOnlineStore().online_write_batch( config=repo_config.online_store, @@ -303,11 +313,93 @@ def test_elasticsearch_online_write_batch(self, repo_config, caplog): with ElasticsearchConnectionManager(repo_config.online_store) as es: es.indices.refresh(index=self.index_to_write) res = es.cat.count(index=self.index_to_write, params={"format": "json"}) - assert res[0]["count"] == "100" - doc = es.get(index=self.index_to_write, id="0")["_source"]["doc"] + assert res[0]["count"] == f"{total_rows_to_write}" + doc = es.get(index=self.index_to_write, id="0")["_source"] for feature in feature_view.schema: assert feature.name in doc + def test_elasticsearch_online_read(self, repo_config, caplog): + n = 10 + ( + feature_view, + data, + ) = self._create_n_customer_test_samples_elasticsearch_online_read( + name=self.index_to_read, n=n + ) + ids = [ + EntityKeyProto( + join_keys=["id"], entity_values=[ValueProto(string_val=str(i))] + ) + for i in range(n) + ] + store = ElasticsearchOnlineStore() + store.online_write_batch( + config=repo_config.online_store, + table=feature_view, + data=data, + progress=None, + ) + + with ElasticsearchConnectionManager(repo_config.online_store) as es: + es.indices.refresh(index=self.index_to_read) + + result = store.online_read( + config=repo_config.online_store, + table=feature_view, + entity_keys=ids, + ) + + assert result is not None + assert len(result) == n + for dt, doc in result: + assert doc is not None + assert len(doc) == len(feature_view.schema) + for field in feature_view.schema: + assert field.name in doc + + def test_elasticsearch_online_read_with_requested_features( + self, repo_config, caplog + ): + n = 10 + requested_features = ["int", "vector", "id"] + ( + feature_view, + data, + ) = 
self._create_n_customer_test_samples_elasticsearch_online_read( + name=self.index_to_read, n=n + ) + ids = [ + EntityKeyProto( + join_keys=["id"], entity_values=[ValueProto(string_val=str(i))] + ) + for i in range(n) + ] + store = ElasticsearchOnlineStore() + store.online_write_batch( + config=repo_config.online_store, + table=feature_view, + data=data, + progress=None, + ) + + with ElasticsearchConnectionManager(repo_config.online_store) as es: + es.indices.refresh(index=self.index_to_read) + + result = store.online_read( + config=repo_config.online_store, + table=feature_view, + entity_keys=ids, + requested_features=requested_features, + ) + + assert result is not None + assert len(result) == n + for dt, doc in result: + assert doc is not None + assert len(doc) == 3 + for field in requested_features: + assert field in doc + def _create_index_in_es(self, index_name, repo_config): with ElasticsearchConnectionManager(repo_config.online_store) as es: mapping = { @@ -323,9 +415,9 @@ def _create_index_in_es(self, index_name, repo_config): } es.indices.create(index=index_name, mappings=mapping) - def _create_n_customer_test_samples_elasticsearch_online_read(self, n=10): + def _create_n_customer_test_samples_elasticsearch_online_read(self, name, n=10): fv = FeatureView( - name=self.index_to_write, + name=name, source=SOURCE, entities=[Entity(name="id")], schema=[ @@ -374,6 +466,10 @@ def _create_n_customer_test_samples_elasticsearch_online_read(self, n=10): name="timestamp", dtype=UnixTimestamp, ), + Field( + name="byte_list", + dtype=Array(Bytes), + ), ], ) return fv, [ @@ -398,6 +494,9 @@ def _create_n_customer_test_samples_elasticsearch_online_read(self, n=10): "timestamp": ValueProto( unix_timestamp_val=int(datetime.utcnow().timestamp() * 1000) ), + "byte_list": ValueProto( + bytes_list_val=BytesList(val=[b"a", b"b", b"c"]) + ), }, datetime.utcnow(), None, diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py index 093295c86b..3c9ebccbf6 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py @@ -11,7 +11,7 @@ class MySQLOnlineStoreCreator(OnlineStoreCreator): def __init__(self, project_name: str, **kwargs): super().__init__(project_name) self.container = ( - MySqlContainer("mysql:latest", platform="linux/amd64") + MySqlContainer("mysql:8.1.0", platform="linux/amd64") .with_exposed_ports(3306) .with_env("MYSQL_USER", "root") .with_env("MYSQL_PASSWORD", "test") diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index cf3ec33cde..5fba4013bd 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -81,7 +81,7 @@ def pg_registry(): @pytest.fixture(scope="session") def mysql_registry(): container = ( - DockerContainer("mysql:latest") + DockerContainer("mysql:8.1.0") .with_exposed_ports(3306) .with_env("MYSQL_RANDOM_ROOT_PASSWORD", "true") .with_env("MYSQL_USER", POSTGRES_USER) diff --git a/setup.py b/setup.py index 96273be713..c563cade34 100644 --- a/setup.py +++ b/setup.py @@ -155,7 +155,6 @@ ELASTICSEARCH_REQUIRED = [ "elasticsearch==8.8", - "bidict==0.22.1", ] CI_REQUIRED = (
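
Configuration sketch (not part of the patch): repo_config.py above registers the
store under the key "elasticsearch", and the test creator returns a host/port
pair, so a minimal feature_store.yaml would plausibly look like the following.
The project and registry values are hypothetical, and ElasticsearchOnlineStoreConfig's
full field list (e.g. any auth settings) is elided in this patch, so only the
type/host/port keys are grounded in the code shown here.

    project: my_project          # hypothetical
    registry: data/registry.db   # hypothetical
    provider: local
    online_store:
        type: elasticsearch      # key registered in repo_config.py above
        host: localhost          # host/port mirror what the test creator returns
        port: 9200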