Support loading of folder of metrics collector files

As suggested in the PR review, the generic case where multiple KFP pipeline metrics files are present in the output folder is now supported.

Note that in the current KFP v1 implementation, only a single data file is ever present.
votti committed Jun 21, 2023
1 parent 9fc8ab3 commit c7aa442
Showing 3 changed files with 48 additions and 41 deletions.
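For context, a KFP v1 metrics file is a small JSON document: the parser changed below reads its "metrics" list and each entry's "name" and "numberValue" fields. A minimal sketch of that shape, with purely illustrative values:

    # Illustrative only: the JSON structure consumed by KFPMetricParser.parse_metrics.
    # Only entries whose "name" matches a configured metric name are kept.
    example_metrics = {
        "metrics": [
            {"name": "accuracy", "numberValue": 0.93},
            {"name": "loss", "numberValue": 0.21},
        ]
    }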
cmd/metricscollector/v1beta1/kfp-metricscollector/v1/main.py (10 changes: 1 addition & 9 deletions)
@@ -51,13 +51,6 @@ def parse_options():
     parser.add_argument(
         "-w", "--wait_all_processes", type=str, default=const.DEFAULT_WAIT_ALL_PROCESSES
     )
-    parser.add_argument(
-        "-fn",
-        "--metrics_file_name",
-        type=str,
-        default=const.DEFAULT_METRICS_FILE_KFPV1_FILE,
-    )
-
     opt = parser.parse_args()
     return opt

@@ -86,8 +79,7 @@ def parse_options():
     )
 
     mc = MetricsCollector(opt.metric_names.split(";"))
-    metrics_file = os.path.join(opt.metrics_file_dir, opt.metrics_file_name)
-    observation_log = mc.parse_file(metrics_file)
+    observation_log = mc.parse_file(opt.metrics_file_dir)
 
     channel = grpc.beta.implementations.insecure_channel(
         db_manager_server[0], int(db_manager_server[1])
pkg/metricscollector/v1beta1/common/const.py (3 changes: 1 addition & 2 deletions)
@@ -20,9 +20,8 @@
 DEFAULT_WAIT_ALL_PROCESSES = "True"
 # Default value for directory where TF event metrics are reported
 DEFAULT_METRICS_FILE_DIR = "/log"
-# Default value for directory where TF event metrics are reported
+# Default value for directory where Kubeflow pipeline metrics are reported
 DEFAULT_METRICS_FILE_KFPV1_DIR = "/tmp/outputs/mlpipeline_metrics"
-DEFAULT_METRICS_FILE_KFPV1_FILE = "data"
 # Job finished marker in $$$$.pid file when main process is completed
 TRAINING_COMPLETED = "completed"

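The removed DEFAULT_METRICS_FILE_KFPV1_FILE constant encoded the single "data" file that KFP v1 writes; with folder-based loading the collector instead scans everything below DEFAULT_METRICS_FILE_KFPV1_DIR. A hedged sketch of the producer side under that assumption (values are illustrative; the "data" file name is the KFP v1 convention noted in the commit message):

    # Hypothetical producer: materialize a metrics file where the collector now scans.
    import json
    import os

    metrics_dir = "/tmp/outputs/mlpipeline_metrics"  # DEFAULT_METRICS_FILE_KFPV1_DIR
    os.makedirs(metrics_dir, exist_ok=True)
    with open(os.path.join(metrics_dir, "data"), "w") as f:
        json.dump({"metrics": [{"name": "accuracy", "numberValue": 0.93}]}, f)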
(third changed file: the KFP metric parser module)
@@ -12,47 +12,53 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# TFEventFileParser parses tfevent files and returns an ObservationLog of the metrics specified.
-# When the event file is under a directory(e.g. test dir), please specify "{{dirname}}/{{metrics name}}"
-# For example, in the Tensorflow MNIST Classification With Summaries:
-# https://github.com/kubeflow/katib/blob/master/examples/v1beta1/trial-images/tf-mnist-with-summaries/mnist.py.
-# The "accuracy" and "loss" metric is saved under "train" and "test" directories.
-# So in the Metrics Collector specification, please specify name of "train" or "test" directory.
-# Check TFJob example for more information:
-# https://github.com/kubeflow/katib/blob/master/examples/v1beta1/kubeflow-training-operator/tfjob-mnist-with-summaries.yaml#L16-L22
+# The Kubeflow pipeline metrics collector KFPMetricParser parses the metrics file
+# and returns an ObservationLog of the metrics specified.
+# Some documentation on the metrics collector file structure can be found here:
+# https://v0-6.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/
 
 from datetime import datetime
 from logging import getLogger, StreamHandler, INFO
 import os
 from typing import List
 import json
 
 import rfc3339
 import api_pb2
 from pkg.metricscollector.v1beta1.common import const
 
 class KFPMetricParser:
     def __init__(self, metric_names):
         self.metric_names = metric_names
 
-    def parse_metrics(fn: str) -> List[api_pb2.MetricLog]:
-        """Parse a kubeflow pipeline metrics file
-
-        Args:
-            fn (function): path to metrics file
-
-        Returns:
-            List[api_pb2.MetricLog]: A list of logged metrics
-        """
-        metrics = []
-        with open(fn) as f:
-            metrics_dict = json.load(f)
-        for m in metrics_dict["metrics"]:
-            name = m["name"]
-            value = m["numberValue"]
-            ml = api_pb2.MetricLog(
-                time_stamp=rfc3339.rfc3339(datetime.now()),
-                metric=api_pb2.Metric(name=name, value=str(value)),
-            )
-            metrics.append(ml)
-        return metrics
+    @staticmethod
+    def find_all_files(directory):
+        for root, dirs, files in os.walk(directory):
+            for f in files:
+                yield os.path.join(root, f)
+
+    def parse_metrics(self, fn: str) -> List[api_pb2.MetricLog]:
+        """Parse a kubeflow pipeline metrics file
+
+        Args:
+            fn (function): path to metrics file
+
+        Returns:
+            List[api_pb2.MetricLog]: A list of logged metrics
+        """
+        metrics = []
+        with open(fn) as f:
+            metrics_dict = json.load(f)
+        for m in metrics_dict["metrics"]:
+            name = m["name"]
+            value = m["numberValue"]
+            if name in self.metric_names:
+                ml = api_pb2.MetricLog(
+                    time_stamp=rfc3339.rfc3339(datetime.now()),
+                    metric=api_pb2.Metric(name=name, value=str(value)),
+                )
+                metrics.append(ml)
+        return metrics
 
 class MetricsCollector:
     def __init__(self, metric_names):
@@ -63,10 +69,20 @@ def __init__(self, metric_names):
         self.logger.addHandler(handler)
         self.logger.propagate = False
         self.metrics = metric_names
+        self.parser = KFPMetricParser(metric_names)
 
-    def parse_file(self, filename):
-        self.logger.info(filename + " will be parsed.")
-        mls = parse_metrics(filename)
+    def parse_file(self, directory):
+        """Parses the Kubeflow Pipeline metrics files"""
+        mls = []
+        for f in self.parser.find_all_files(directory):
+            if os.path.isdir(f):
+                continue
+            try:
+                self.logger.info(f + " will be parsed.")
+                mls.extend(self.parser.parse_metrics(f))
+            except Exception as e:
+                self.logger.warning("Unexpected error: " + str(e))
+                continue
 
         # Metrics logs must contain at least one objective metric value
         # Objective metric is located at first index
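Taken together, a minimal usage sketch of the new directory-based entry point, mirroring the call in main.py above (the metric names are illustrative):

    # Assumes the same imports as main.py; scans the default KFP v1 output
    # directory and keeps only the named metrics from each parsed file.
    from pkg.metricscollector.v1beta1.common import const

    mc = MetricsCollector(["accuracy", "loss"])
    observation_log = mc.parse_file(const.DEFAULT_METRICS_FILE_KFPV1_DIR)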
