Skip to content

Commit

Permalink
Merge pull request #55 from ExpediaGroup/tipike/odfv_testing
Browse files Browse the repository at this point in the history
error catching around pyarrow to pandas convertion
  • Loading branch information
piket authored Oct 7, 2023
2 parents 0894c2d + 261f14f commit 9a9577b
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -36,6 +37,8 @@
MILLI_SECOND = 1000 * MICRO_SECOND
SECOND = 1000 * MILLI_SECOND

logger = logging.getLogger(__name__)


class EmbeddedOnlineFeatureServer:
def __init__(
Expand Down Expand Up @@ -243,23 +246,27 @@ def transformation_callback(
output_schema_ptr: int,
full_feature_names: bool,
) -> int:
odfv = fs.get_on_demand_feature_view(on_demand_feature_view_name, allow_cache=True)
try:
odfv = fs.get_on_demand_feature_view(on_demand_feature_view_name, allow_cache=True)

input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr)
input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr)

# For some reason, the callback is called with `full_feature_names` as a 1 if True or 0 if false. This handles
# the typeguard requirement.
full_feature_names = bool(full_feature_names)
# For some reason, the callback is called with `full_feature_names` as a 1 if True or 0 if false. This handles
# the typeguard requirement.
full_feature_names = bool(full_feature_names)

output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
output_record = pa.RecordBatch.from_pandas(output)
output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
output_record = pa.RecordBatch.from_pandas(output)

output_record.schema._export_to_c(output_schema_ptr)
output_record._export_to_c(output_arr_ptr)
output_record.schema._export_to_c(output_schema_ptr)
output_record._export_to_c(output_arr_ptr)

return output_record.num_rows
return output_record.num_rows
except Exception as e:
logger.exception(f"transformation callback failed with exception: {e}", e)
return 0


def logging_callback(
Expand Down

0 comments on commit 9a9577b

Please sign in to comment.