Skip to content

Commit

Permalink
Start adding FeatureView to spark kafka processor, and add a sketch of spark and stream feature view setup to the streaming test

Browse files Browse the repository at this point in the history
  • Loading branch information
expediamatt committed Feb 13, 2024
1 parent d2f0a07 commit 85aeb7d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
8 changes: 7 additions & 1 deletion sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ def __init__(
self,
*,
fs: FeatureStore,
sfv: StreamFeatureView,
sfv: FeatureView,
config: ProcessorConfig,
preprocess_fn: Optional[MethodType] = None,
):

# In general, FeatureView may or may not have stream_source, but it must
# have one to use spark kafka processor
if not sfv.stream_source:
raise ValueError("Feature View must have a stream source to use spark streaming.")

if not isinstance(sfv.stream_source, KafkaSource):
raise ValueError("data source is not kafka source")
if not isinstance(
Expand Down
39 changes: 35 additions & 4 deletions sdk/python/tests/unit/infra/test_streaming_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,42 @@

from feast.entity import Entity
from feast.value_type import ValueType
from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor


def test_the_test():
    """Sanity-check that an Entity's ``join_key`` defaults to its name."""
    entity = Entity(name="my-entity", description="My entity")
    # join_key falls back to the entity name when not given explicitly.
    assert entity.join_key == "my-entity"
def test_streaming_ingestion():
    """Sketch of a Spark streaming-ingestion test.

    Builds a local test environment (Redis online store, Spark offline store
    and batch engine), a Kafka stream source, and a StreamFeatureView wired to
    that source.

    NOTE(review): this is still a scaffold — the SparkKafkaProcessor itself is
    not exercised yet (see the TODO at the bottom).
    """
    spark_config = IntegrationTestRepoConfig(
        provider="local",
        online_store_creator=RedisOnlineStoreCreator,
        offline_store_creator=SparkDataSourceCreator,
        batch_engine={"type": "spark.engine", "partitions": 10},
    )
    spark_environment = construct_test_environment(
        spark_config, None, entity_key_serialization_version=1
    )

    df = create_basic_driver_dataset()

    # Stream source backed by Kafka, with a file-based batch source fallback.
    stream_source = KafkaSource(
        name="kafka",
        timestamp_field="event_timestamp",
        kafka_bootstrap_servers="",
        message_format=AvroFormat(""),
        topic="topic",
        batch_source=FileSource(path="some path"),
    )
    sfv = StreamFeatureView(
        name="test kafka stream feature view",
        entities=[],
        ttl=timedelta(days=30),
        source=stream_source,
        aggregations=[],
    )

    # SparkKafkaProcessor rejects feature views without a stream source, so
    # assert the wiring actually took.
    assert sfv.stream_source is not None

    # TODO(review): construct and run the processor once the environment
    # wiring is complete, e.g.:
    #   processor = SparkKafkaProcessor(
    #       fs=..., sfv=sfv, config=..., preprocess_fn=None,
    #   )
0 comments on commit 85aeb7d

Please sign in to comment.