Skip to content

Commit

Permalink
[FSTORE_612] Support for feature monitoring (#1692)
Browse files Browse the repository at this point in the history
Co-authored-by: Robin Andersson @robzor92
Co-authored-by: Victor Jouffrey @vatj
Co-authored-by: Kenneth Mak @kennethmhc
Co-authored-by: Dhananjay Mukhedkar @dhananjay-mk
Co-authored-by: Ehsan Heydari @ehsan-github
  • Loading branch information
javierdlrm committed Feb 13, 2024
1 parent 8bca3d5 commit 43a49f0
Show file tree
Hide file tree
Showing 109 changed files with 10,112 additions and 1,184 deletions.
1,026 changes: 1,026 additions & 0 deletions hopsworks-IT/src/test/ruby/spec/feature_monitoring_spec.rb

Large diffs are not rendered by default.

755 changes: 391 additions & 364 deletions hopsworks-IT/src/test/ruby/spec/feature_store_activity_spec.rb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
=begin
This file is part of Hopsworks
Copyright (C) 2021, Logical Clocks AB. All rights reserved
# This file is part of Hopsworks
# Copyright (C) 2024, Hopsworks AB. All rights reserved
#
# Hopsworks is free software: you can redistribute it and/or modify it under the terms of
# the GNU Affero General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.
#

Hopsworks is free software: you can redistribute it and/or modify it under the terms of
the GNU Affero General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version.
describe "On #{ENV['OS']}" do

Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the GNU Affero General Public License for more details.
before :all do
# ensure feature monitoring is enabled
@enable_feature_monitoring = getVar('enable_feature_monitoring')
setVar('enable_feature_monitoring', "true")
end

You should have received a copy of the GNU Affero General Public License along with this program.
If not, see <https://www.gnu.org/licenses/>.
=end
after :all do
# revert feature monitoring flag
setVar('enable_feature_monitoring', @enable_feature_monitoring[:value])
clean_all_test_projects(spec: "fg_alert")
end

describe "On #{ENV['OS']}" do
after(:all) {clean_all_test_projects(spec: "fg_alert")}
describe 'Alert' do
context 'without authentication' do
before :all do
Expand All @@ -31,17 +42,25 @@
create_fg_alert(@project, @featuregroup, get_fg_alert_success(@project))
expect_status_details(401)
end
it "should fail to create feature monitoring status" do
create_fg_alert(@project, @featuregroup, get_fm_alert_success(@project))
expect_status_details(401)
end
end
context 'with authentication' do
before :all do
with_valid_project
@featuregroup = with_valid_fg(@project)
json_result = create_feature_view_from_feature_group(@project.id, get_featurestore_id(@project.id), @featuregroup)
@feature_view = JSON.parse(json_result)
create_fg_alerts(@project, @featuregroup)
create_fm_alerts(@project, @featuregroup, @feature_view)

end
it "should get" do
get_fg_alerts(@project, @featuregroup)
expect_status_details(200)
expect(json_body[:count]).to eq(2)
expect(json_body[:count]).to eq(3)
end
it "should update" do
get_fg_alerts(@project, @featuregroup)
Expand All @@ -68,7 +87,7 @@
expect_status_details(201)
check_route_created(@project, alert[:receiver], alert[:status], fg: @featuregroup)
get_fg_alerts(@project, @featuregroup)
expect(json_body[:count]).to eq(3)
expect(json_body[:count]).to eq(4)
end
it "should fail to create duplicate" do
create_fg_alert(@project, @featuregroup, get_fg_alert_warning(@project))
Expand All @@ -81,7 +100,48 @@
expect_status_details(204)
check_route_deleted(@project, alert[:receiver], alert[:status], fg: @featuregroup)
get_fg_alerts(@project, @featuregroup)
expect(json_body[:count]).to eq(2)
expect(json_body[:count]).to eq(3)
end
it "should create alert with feature monitoring status" do
alert_data = get_fm_alert_success(@project)
json_result = create_fg_alert(@project, @featuregroup, alert_data)
parsed_alert_json = JSON.parse(json_result)
expect_status_details(201)
check_route_created_fm(@project, alert_data[:receiver], alert_data[:status], fg: @featuregroup)
expect(parsed_alert_json['status']).to eql(alert_data[:status])
expect(parsed_alert_json['receiver']).to eql(alert_data[:receiver])
expect(parsed_alert_json['severity']).to eql(alert_data[:severity])
expect(parsed_alert_json['featureGroupId']).to eql(@featuregroup["id"])
end
it "should create feature view monitoring alert" do
alert_data = get_fm_alert_success(@project)
json_result = create_feature_view_alert(@project, @feature_view, alert_data)
parsed_alert_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_alert_json['status']).to eql(alert_data[:status])
expect(parsed_alert_json['receiver']).to eql(alert_data[:receiver])
expect(parsed_alert_json['severity']).to eql(alert_data[:severity])
expect(parsed_alert_json['featureViewName']).to eql(@feature_view["name"])
expect(parsed_alert_json['featureViewVersion']).to eql(@feature_view["version"])
check_route_created_fm(@project, alert_data[:receiver], alert_data[:status], fv: @feature_view)
end
it "should get and update feature view monitoring alert" do
get_featureview_alerts(@project, @feature_view)
expect_status_details(200)
alert = json_body[:items].detect { |a| a[:status] == "FEATURE_MONITOR_SHIFT_UNDETECTED" && a[:featureViewId]
.present? }
receiver_original = alert[:receiver]
alert[:receiver] = "#{@project[:projectname]}__slack1"
json_result = update_featureview_alert(@project, @feature_view, alert[:id], alert)
expect_status_details(200)
parsed_updated_alert = JSON.parse(json_result)
expect(parsed_updated_alert['status']).to eql(alert[:status])
expect(parsed_updated_alert['receiver']).to eql(alert[:receiver])
expect(parsed_updated_alert['severity']).to eql(alert[:severity])
expect(parsed_updated_alert['featureViewName']).to eql(@feature_view["name"])
expect(parsed_updated_alert['featureViewVersion']).to eql(@feature_view["version"])
check_route_created_fm(@project, alert[:receiver], alert[:status], fv: @feature_view)
check_route_deleted_fm(@project, receiver_original, alert[:status], fv: @feature_view)
end
it "should cleanup receivers and routes when deleting project" do
delete_project(@project)
Expand All @@ -102,7 +162,7 @@
featuregroup = with_valid_fg(project)
create_fg_alerts_global(project, featuregroup)
get_fg_alerts(project, featuregroup)
expect(json_body[:count]).to eq(3)
expect(json_body[:count]).to eq(5)
alert_receiver = AlertReceiver.where("name LIKE '#{project[:projectname]}__%'")
expect(alert_receiver.length()).to eq(0)
get_routes_admin()
Expand Down Expand Up @@ -161,4 +221,4 @@
end
end
end
end
end
38 changes: 38 additions & 0 deletions hopsworks-IT/src/test/ruby/spec/featurestore_statistics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@
commit_metadata = {commitDateString:20201025231125,commitTime:1603667485000,rowsInserted:4,rowsUpdated:0,rowsDeleted:0}
json_result = commit_cached_featuregroup(@project[:id], @featurestore_id, @stream_feature_group["id"], commit_metadata: commit_metadata)
expect_status_details(200)
# feature view - cached feature group - no time travel
json_result = create_feature_view_from_feature_group(@project[:id], @featurestore_id, @cached_feature_group)
expect_status_details(201)
@cached_feature_view = JSON.parse(json_result)
# feature view - stream feature group - time travel
json_result = create_feature_view_from_feature_group(@project[:id], @featurestore_id, @stream_feature_group)
expect_status_details(201)
@stream_feature_view = JSON.parse(json_result)
# training datasets - with and without splits
all_metadata = create_featureview_training_dataset_from_project(@project)
@training_dataset = all_metadata["response"]
Expand Down Expand Up @@ -99,6 +107,36 @@
# all query parameter combinations (including window times) are covered in the unit tests
end

# feature view - left feature group - no time travel (for feature monitoring)

it "should be able to add statistics as a commit to a feature view with a left feature group with time travel disabled (feature monitoring)" do
create_statistics_commit_fv(@project[:id], @featurestore_id, @cached_feature_view["name"], @cached_feature_view["version"])
expect_status_details(200)
end

it "should fail to add statistics as a commit to a feature view with a left feature group with time travel disabled and window times (feature monitoring)" do
create_statistics_commit_fv(@project[:id], @featurestore_id, @cached_feature_view["name"], @cached_feature_view["version"], computation_time: 1597903688010, window_start_commit_time: 1597903688000, window_end_commit_time: 1597903688010)
expect_status_details(400, error_code: 270229)
# all query parameter combinations (including window times) are covered in the unit tests
end

# feature view - left stream feature group - time travel enable (for feature monitoring)

it "should be able to add statistics as a commit to a feature view with a left stream feature group (feature monitoring)" do
create_statistics_commit_fv(@project[:id], @featurestore_id, @stream_feature_view["name"], @stream_feature_view["version"])
expect_status_details(200)
end

it "should be able to add statistics as a commit to a feature view with a left stream feature group with window times (feature monitoring)" do
# on two commits
create_statistics_commit_fv(@project[:id], @featurestore_id, @stream_feature_view["name"], @stream_feature_view["version"], computation_time: 1603667485000, window_start_commit_time: 1603577485000, window_end_commit_time: 1603667485000)
expect_status_details(200)
# on a single commit
create_statistics_commit_fv(@project[:id], @featurestore_id, @stream_feature_view["name"], @stream_feature_view["version"], computation_time: 1603667485000, window_start_commit_time: 1603667485000, window_end_commit_time: 1603667485000)
expect_status_details(200)
# all query parameter combinations (including window times) are covered in the unit tests
end

# training dataset

it "should be able to add statistics as a commit to a training dataset" do
Expand Down
31 changes: 30 additions & 1 deletion hopsworks-IT/src/test/ruby/spec/helpers/alert_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ def with_receivers(project)
pagerdutyConfigs: [create_pager_duty_config]))
create_receivers_checked(project, create_receiver(project, name: "#{project[:projectname]}__slack1",
slackConfigs: [create_slack_config]))
create_receivers_checked(project, create_receiver(project, name: "#{project[:projectname]}__pagerduty1",
pagerdutyConfigs: [create_pager_duty_config]))
create_receivers_checked(project, create_receiver(project, name: "#{project[:projectname]}__slack2",
slackConfigs: [create_slack_config]))
end

def get_alerts(project, query: "")
Expand Down Expand Up @@ -582,6 +586,16 @@ def create_route_match_query(project, receiver, status, job: nil, fg: nil)
return query
end

def create_fm_route_match_query(project, receiver, status, fg:nil, fv: nil)
query = "?match=status:#{status}"
query = receiver.start_with?("global-receiver__") ?
"#{query}&match=type:global-alert-#{receiver.partition('__')[2]}" :
"#{query}&match=project:#{project[:projectname]}&match=type:project-alert"
query = fv ? "#{query}&match=featureViewName:#{fv['name']}" : query
query = fg ? "#{query}&match=featureGroup:#{fg['name']}" : query
return query
end

def check_route_created(project, receiver, status, job: nil, fg: nil)
query = create_route_match_query(project, receiver, status, job: job, fg: fg)
get_routes_by_receiver(project, receiver, query: query)
Expand All @@ -603,4 +617,19 @@ def check_route_deleted(project, receiver, status, job: nil, fg: nil)
expect_status_details(400)
end

end
def check_route_created_fm(project, receiver, status, fg: nil, fv: nil)
query = create_fm_route_match_query(project, receiver, status, fg: fg, fv: fv)
get_routes_by_receiver(project, receiver, query:query)
expect_status_details(200)
check_backup_contains_route(json_body)
check_alert_receiver_created(receiver)
expect(json_body[:receiver]).to eq(receiver)
end

def check_route_deleted_fm(project, receiver, status, fg: nil, fv: nil)
query = create_fm_route_match_query(project, receiver, status, fg: fg, fv: fv)
get_routes_by_receiver(project, receiver, query: query)
expect_status_details(400)
end

end
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

module FeatureGroupAlertHelper
@@fg_alert_resource = "#{ENV['HOPSWORKS_API']}/project/%{projectId}/featurestores/%{fsId}/featuregroups/%{fgId}/alerts"

@@alert_success = {"status": "SUCCESS", "receiver": "global-receiver__email", "severity": "INFO"}
@@alert_warning = {"status": "WARNING", "receiver": "global-receiver__slack", "severity": "WARNING"}
@@alert_failed = {"status": "FAILURE", "receiver": "global-receiver__pagerduty", "severity": "CRITICAL"}

# feature monitor specific
@@featureview_alert_resource = "#{ENV['HOPSWORKS_API']}/project/%{projectId}/featurestores/%{fsId}/featureview/%{name}/version/%{version}/alerts"
@@fm_alert_success = {"status": "FEATURE_MONITOR_SHIFT_UNDETECTED", "receiver": "global-receiver__email", "severity":
"INFO"}
@@fm_alert_failed = {"status": "FEATURE_MONITOR_SHIFT_DETECTED", "receiver": "global-receiver__pagerduty", "severity": "CRITICAL"}
def get_fg_alert_success(project)
success = @@alert_success.clone
success[:receiver] = "#{project[:projectname]}__email"
Expand Down Expand Up @@ -80,11 +83,53 @@ def create_fg_alerts_global(project, featuregroup)
create_fg_alert(project, featuregroup, @@alert_success.clone)
create_fg_alert(project, featuregroup, @@alert_warning.clone)
create_fg_alert(project, featuregroup, @@alert_failed.clone)
create_fg_alert(project, featuregroup, @@fm_alert_failed.clone)
create_fg_alert(project, featuregroup, @@fm_alert_success.clone)
end

def with_valid_fg(project)
featurestore_id = get_featurestore_id(project[:id])
json_result, featuregroup_name = create_cached_featuregroup(project[:id], featurestore_id)
return JSON.parse(json_result)
end
end

def get_fm_alert_failure(project)
failed = @@fm_alert_failed.clone
failed[:receiver] = "#{project[:projectname]}__pagerduty"
return failed
end

def get_fm_alert_success(project)
success = @@fm_alert_success.clone
success[:receiver] = "#{project[:projectname]}__email"
return success
end

def create_fm_alerts(project, featuregroup, featureview)
create_fg_alert(project, featuregroup, get_fm_alert_failure(project))
create_feature_view_alert(project, featureview, get_fm_alert_failure(project))
end

def create_feature_view_alert(project, featureview, alert)
post "#{@@featureview_alert_resource}" % { projectId: project[:id],
fsId: featureview["featurestoreId"],
name: featureview["name"],
version: featureview["version"]}, alert.to_json
end

def update_featureview_alert(project, featureview, id, alert)
put "#{@@featureview_alert_resource}/#{id}" % { projectId: project[:id],
fsId: featureview["featurestoreId"],
name: featureview["name"],
version: featureview["version"]}, alert.to_json
end

def get_featureview_alerts(project, featureview)
get "#{@@featureview_alert_resource}" % {projectId: project[:id],
fsId: featureview["featurestoreId"],
name: featureview["name"],
version: featureview["version"]}
end


end
Loading

0 comments on commit 43a49f0

Please sign in to comment.