Commit

fix: Update SparkKafkaProcessor to drop unnecessary columns and rename feature_value to value
Bhargav Dodla committed Jan 15, 2025
1 parent 075c4c0 commit 95bc2a6
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -338,9 +338,11 @@ def batch_write_with_connector(
         batch_id: int,
     ):
         start_time = time.time()
-        sdf = sdf.drop("event_header")
         convert_to_blob = udf(lambda s: s.encode("utf-8"), BinaryType())
-        sdf = sdf.withColumn("feature_value", convert_to_blob(col("feature_value")))
+        sdf = sdf.withColumn("value", convert_to_blob(col("feature_value"))).drop(
+            "event_header",
+            "feature_value",
+        )
         sdf.write.format("org.apache.spark.sql.cassandra").mode("append").options(
             table="mlpfs_scylladb_perf_test_cc_stream_fv", keyspace="feast"
         ).save()
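
To make the effect of this change concrete, here is a minimal row-level sketch in plain Python (no Spark required). It mirrors what the new `withColumn("value", ...).drop("event_header", "feature_value")` chain does to a single record: the UTF-8 encoded bytes land in a new `value` key, and both `event_header` and the original `feature_value` are removed before the write. The helper name `to_connector_row` and the `entity_key` and `feature_name` columns are hypothetical, chosen only for illustration; the commit does not show the table schema.

```python
from typing import Any, Dict


def to_connector_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Row-level analogue of the new withColumn(...).drop(...) chain.

    Hypothetical helper for illustration; not part of the Feast code base.
    """
    out = dict(row)  # copy so the caller's row is left untouched
    # Same conversion as the convert_to_blob UDF: str -> UTF-8 bytes,
    # stored under the new column name "value".
    out["value"] = out["feature_value"].encode("utf-8")
    # Like DataFrame.drop with string names, silently skip absent columns.
    out.pop("event_header", None)
    out.pop("feature_value", None)
    return out


# Assumed example record; only feature_value and event_header come from the diff.
example = {
    "entity_key": "driver:1001",
    "feature_name": "conv_rate",
    "feature_value": "0.42",
    "event_header": "{}",
}
result = to_connector_row(example)
print(result)
```

Before this commit the encoded bytes overwrote `feature_value` in place, so the written rows still carried a `feature_value` column rather than `value`.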