Dataslice and StepMetrics: artifact and metadata pull and push related changes (#189)

* changes made for dataslice and metrics artifact and metadata pull/push

* resolved folder pull issue for local artifacts

* added Dataslice related changes required for minioS3 artifact repo

* making changes to fix artifact pull from amazonS3, local and minio repo

* made changes related to artifact pull for different artifact repos

* addressing review comments

* addressing review comments

* resolving some testing errors
varkha-d-sharma committed Jul 25, 2024
1 parent 3046849 commit d1457ca
Showing 10 changed files with 358 additions and 85 deletions.
4 changes: 4 additions & 0 deletions cmflib/bin/cmf
@@ -4,6 +4,10 @@ import re
 import sys
 from cmflib.cli import main
 
+# this is temporary - need to remove after TripleDES warning goes away from paramiko
+import warnings
+warnings.filterwarnings(action='ignore', module='.*paramiko.*')
+
 if __name__ == '__main__':
     sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
     sys.exit(main())
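For readers unfamiliar with `warnings.filterwarnings`: `module` is a regex matched against the module that raises the warning, so the filter above mutes every warning coming out of paramiko. A narrower variant is sketched below; the `message` pattern is an assumption about the TripleDES warning's wording, not taken from this commit.

```python
import warnings

# Hypothetical narrower filter: silence only warnings whose text mentions
# TripleDES, and only when they originate from paramiko modules.
warnings.filterwarnings(
    action="ignore",
    message=".*TripleDES.*",
    module=".*paramiko.*",
)
```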
175 changes: 133 additions & 42 deletions cmflib/cmf.py
@@ -646,7 +646,6 @@ def log_dataset(
         Returns:
             Artifact object from ML Metadata library associated with the new dataset artifact.
         """
-
         # Assigning current file name as stage and execution name
         current_script = sys.argv[0]
         file_name = os.path.basename(current_script)
@@ -679,7 +678,7 @@

         if c_hash == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return

         dataset_commit = c_hash
         dvc_url = dvc_get_url(url)
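Worth noting: the old `return null` would itself have raised a `NameError`, since Python has no `null`; the bare `return` hands callers `None` instead. A hedged caller-side sketch (the call arguments here are illustrative, not verified against this revision):

```python
artifact = metawriter.log_dataset("data/raw.csv", "input")  # hypothetical call
if artifact is None:
    # the dvc hash could not be computed, so nothing was logged to MLMD
    raise RuntimeError("dataset was not logged")
```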
@@ -1033,7 +1032,7 @@ def log_model(

         if c_hash == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return

         model_commit = c_hash

@@ -1478,29 +1477,53 @@ def commit_metrics(self, metrics_name: str):
         Returns:
             Artifact object from the ML Protocol Buffers library associated with the new metrics artifact.
         """
-
+        logging_dir = change_dir(self.cmf_init_path)
+        # code for nano cmf
+        # Assigning current file name as stage and execution name
+        current_script = sys.argv[0]
+        file_name = os.path.basename(current_script)
+        name_without_extension = os.path.splitext(file_name)[0]
+        # create context if not already created
+        if not self.child_context:
+            self.create_context(pipeline_stage=name_without_extension)
+            assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"
+
+        # create execution if not already created
+        if not self.execution:
+            self.create_execution(execution_type=name_without_extension)
+            assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"
+
+
+        directory_path = os.path.join( "cmf_artifacts/metrics",self.execution.properties["Execution_uuid"].string_value)
+        os.makedirs(directory_path, exist_ok=True)
         metrics_df = pd.DataFrame.from_dict(
             self.metrics[metrics_name], orient="index")
         metrics_df.index.names = ["SequenceNumber"]
-        metrics_df.to_parquet(metrics_name)
-        commit_output(metrics_name, self.execution.id)
-        uri = dvc_get_hash(metrics_name)
+        metrics_path = os.path.join(directory_path,metrics_name)
+        metrics_df.to_parquet(metrics_path)
+        commit_output(metrics_path, self.execution.id)
+        uri = dvc_get_hash(metrics_path)

         if uri == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return
         metrics_commit = uri
+        dvc_url = dvc_get_url(metrics_path)
+        dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
         name = (
-            metrics_name
+            metrics_path
             + ":"
             + uri
             + ":"
             + str(self.execution.id)
             + ":"
             + str(uuid.uuid1())
         )
-        # passing uri value to commit
-        custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+        # not needed as property 'name' is part of artifact
+        # to maintain uniformity - Commit goes propeties of the artifact
+        # custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+        custom_props = {}
         metrics = create_new_artifact_event_and_attribution(
             store=self.store,
             execution_id=self.execution.id,
@@ -1509,6 +1532,15 @@ def commit_metrics(self, metrics_name: str):
             name=name,
             type_name="Step_Metrics",
             event_type=mlpb.Event.Type.OUTPUT,
+            properties={
+                # passing uri value to commit
+                "Commit": metrics_commit,
+                "url": str(dvc_url_with_pipeline),
+            },
+            artifact_type_properties={
+                "Commit": mlpb.STRING,
+                "url": mlpb.STRING,
+            },
             custom_properties=custom_props,
             milliseconds_since_epoch=int(time.time() * 1000),
         )
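The net effect of this hunk: per-step metrics now land in an execution-scoped folder and carry their commit hash and remote url as first-class MLMD properties instead of custom properties. A minimal usage sketch (constructor and method arguments are assumptions, not verified against this revision):

```python
from cmflib import cmf

metawriter = cmf.Cmf(filename="mlmd", pipeline_name="demo")  # assumed args
metawriter.create_context(pipeline_stage="train")
metawriter.create_execution(execution_type="train")
metawriter.log_metric("training_metrics", {"loss": 0.42})    # buffered per step
metawriter.commit_metrics("training_metrics")
# -> parquet written to cmf_artifacts/metrics/<Execution_uuid>/training_metrics,
#    dvc-committed, and recorded as a Step_Metrics artifact with Commit/url properties
```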
@@ -1537,20 +1569,20 @@ def commit_metrics(self, metrics_name: str):
         os.chdir(logging_dir)
         return metrics

-    def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
+    def commit_existing_metrics(self, metrics_name: str, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None):
         """
         Commits existing metrics associated with the given URI to MLMD.
         Example:
         ```python
         artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123",
         {"custom_key": "custom_value"})
         ```
         Args:
             metrics_name: Name of the metrics.
             uri: Unique identifier associated with the metrics.
             custom_properties: Optional custom properties for the metrics.
         Returns:
             Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.
         """

         custom_props = {} if custom_properties is None else custom_properties
@@ -1575,6 +1607,15 @@ def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties
             name=metrics_name,
             type_name="Step_Metrics",
             event_type=mlpb.Event.Type.OUTPUT,
+            properties={
+                # passing uri value to commit
+                "Commit": props.get("Commit", ""),
+                "url": props.get("url", ""),
+            },
+            artifact_type_properties={
+                "Commit": mlpb.STRING,
+                "url": mlpb.STRING,
+            },
             custom_properties=custom_props,
             milliseconds_since_epoch=int(time.time() * 1000),
         )
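Note that `props` defaults to `None` but is dereferenced with `props.get(...)`, so callers are effectively expected to always pass it (the merger change below does). A hedged call sketch with made-up values:

```python
# Illustrative server-side replay of a Step_Metrics artifact; values are invented.
props = {"Commit": "2fa9a1ef...", "url": "demo-pipeline:ssh://host/repo/files/md5/2f/a9a1ef..."}
cmf_class.commit_existing_metrics("training_metrics", "2fa9a1ef...", props, {"user": "varkha"})
```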
@@ -1680,6 +1721,8 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice":
     def read_dataslice(self, name: str) -> pd.DataFrame:
         """Reads the dataslice"""
         # To do checkout if not there
+        directory_path = os.path.join("cmf_artifacts/dataslices",self.execution.properties["Execution_uuid"].string_value)
+        name = os.path.join(directory_path, name)
         df = pd.read_parquet(name)
         return df
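Both read and update now resolve dataslice files under an execution-scoped directory. A tiny sketch of the resulting path (the UUID value is illustrative):

```python
import os

execution_uuid = "0d1457ca-aaaa-bbbb-cccc-000000000000"  # illustrative value
directory_path = os.path.join("cmf_artifacts/dataslices", execution_uuid)
print(os.path.join(directory_path, "slice-a"))
# cmf_artifacts/dataslices/0d1457ca-aaaa-bbbb-cccc-000000000000/slice-a
```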

@@ -1700,6 +1743,8 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict):
         Returns:
             None
         """
+        directory_path = os.path.join("cmf_artifacts/dataslices", self.execution.properties["Execution_uuid"].string_value)
+        name = os.path.join(directory_path, name)
         df = pd.read_parquet(name)
         temp_dict = df.to_dict("index")
         temp_dict[record].update(custom_properties)
@@ -1739,7 +1784,7 @@ def add_data(
             """

            self.props[path] = {}
-           # self.props[path]['hash'] = dvc_get_hash(path)
+           self.props[path]['hash'] = dvc_get_hash(path)
            parent_path = path.rsplit("/", 1)[0]
            self.data_parent = parent_path.rsplit("/", 1)[1]
            if custom_properties:
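The uncommented hash line means every slice entry now records its dvc hash at add time. The two `rsplit` calls derive the entry's immediate parent directory name; a worked example (the path is hypothetical):

```python
path = "artifacts/raw_data/train/file1.csv"   # hypothetical slice entry
parent_path = path.rsplit("/", 1)[0]          # "artifacts/raw_data/train"
data_parent = parent_path.rsplit("/", 1)[1]   # "train"
```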
@@ -1765,71 +1810,107 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:
                 custom_properties: Dictionary to store key value pairs associated with Dataslice
                 Example{"mean":2.5, "median":2.6}
             """
-
+            logging_dir = change_dir(self.writer.cmf_init_path)
+            # code for nano cmf
+            # Assigning current file name as stage and execution name
+            current_script = sys.argv[0]
+            file_name = os.path.basename(current_script)
+            name_without_extension = os.path.splitext(file_name)[0]
+            # create context if not already created
+            if not self.writer.child_context:
+                self.writer.create_context(pipeline_stage=name_without_extension)
+                assert self.writer.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"
+
+            # create execution if not already created
+            if not self.writer.execution:
+                self.writer.create_execution(execution_type=name_without_extension)
+                assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"
+
+            directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value)
+            os.makedirs(directory_path, exist_ok=True)
             custom_props = {} if custom_properties is None else custom_properties
             git_repo = git_get_repo()
             dataslice_df = pd.DataFrame.from_dict(self.props, orient="index")
             dataslice_df.index.names = ["Path"]
-            dataslice_df.to_parquet(self.name)
+            dataslice_path = os.path.join(directory_path,self.name)
+            dataslice_df.to_parquet(dataslice_path)
             existing_artifact = []

-            commit_output(self.name, self.writer.execution.id)
-            c_hash = dvc_get_hash(self.name)
+            commit_output(dataslice_path, self.writer.execution.id)
+            c_hash = dvc_get_hash(dataslice_path)
             if c_hash == "":
                 print("Error in getting the dvc hash,return without logging")
-                return null
+                return

             dataslice_commit = c_hash
-            remote = dvc_get_url(self.name)
+            url = dvc_get_url(dataslice_path)
+            dvc_url_with_pipeline = f"{self.writer.parent_context.name}:{url}"
             if c_hash and c_hash.strip():
                 existing_artifact.extend(
                     self.writer.store.get_artifacts_by_uri(c_hash))
             if existing_artifact and len(existing_artifact) != 0:
                 print("Adding to existing data slice")
+                # Haven't added event type in this if cond, is it not needed??
                 slice = link_execution_to_input_artifact(
                     store=self.writer.store,
                     execution_id=self.writer.execution.id,
                     uri=c_hash,
-                    input_name=self.name + ":" + c_hash,
+                    input_name=dataslice_path + ":" + c_hash,
                 )
             else:
-                props = {
-                    "Commit": dataslice_commit,  # passing c_hash value to commit
-                    "git_repo": git_repo,
-                    "Remote": remote,
-                }
-                props.update(custom_props)
+                props={
+                    "git_repo": str(git_repo),
+                    # passing c_hash value to commit
+                    "Commit": str(dataslice_commit),
+                    "url": str(dvc_url_with_pipeline),
+                },
                 slice = create_new_artifact_event_and_attribution(
                     store=self.writer.store,
                     execution_id=self.writer.execution.id,
                     context_id=self.writer.child_context.id,
                     uri=c_hash,
-                    name=self.name + ":" + c_hash,
+                    name=dataslice_path + ":" + c_hash,
                     type_name="Dataslice",
                     event_type=mlpb.Event.Type.OUTPUT,
-                    custom_properties=props,
+                    properties={
+                        "git_repo": str(git_repo),
+                        # passing c_hash value to commit
+                        "Commit": str(dataslice_commit),
+                        "url": str(dvc_url_with_pipeline),
+                    },
+                    artifact_type_properties={
+                        "git_repo": mlpb.STRING,
+                        "Commit": mlpb.STRING,
+                        "url": mlpb.STRING,
+                    },
+                    custom_properties=custom_props,
                     milliseconds_since_epoch=int(time.time() * 1000),
                 )
             if self.writer.graph:
                 self.writer.driver.create_dataslice_node(
-                    self.name, self.name + ":" + c_hash, c_hash, self.data_parent, props
+                    self.name, dataslice_path + ":" + c_hash, c_hash, self.data_parent, props
                 )
+            os.chdir(logging_dir)
             return slice
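End to end, the dataslice changes mean a commit now produces an execution-scoped parquet file plus a Dataslice artifact carrying git_repo/Commit/url as typed properties. A minimal flow sketch (method names are from this diff; the arguments are assumptions):

```python
dataslice = metawriter.create_dataslice("slice-a")          # returns Cmf.DataSlice
dataslice.add_data("artifacts/raw_data/train/file1.csv", {"split": "train"})
dataslice.commit()
# parquet at cmf_artifacts/dataslices/<Execution_uuid>/slice-a, dvc-committed,
# and logged as a "Dataslice" artifact named "<path>:<dvc-hash>"
```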

-        def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None) -> None:
+        # commit existing dataslice to server
+        def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None) -> None:
             custom_props = {} if custom_properties is None else custom_properties
-            c_hash = uri
+            c_hash = uri.strip()
             dataslice_commit = c_hash
             existing_artifact = []
             if c_hash and c_hash.strip():
                 existing_artifact.extend(
                     self.writer.store.get_artifacts_by_uri(c_hash))
             if existing_artifact and len(existing_artifact) != 0:
                 print("Adding to existing data slice")
+                # Haven't added event type in this if cond, is it not needed??
                 slice = link_execution_to_input_artifact(
                     store=self.writer.store,
                     execution_id=self.writer.execution.id,
                     uri=c_hash,
-                    input_name=self.name
+                    input_name=self.name,
                 )
             else:
                 slice = create_new_artifact_event_and_attribution(
@@ -1840,6 +1921,16 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None
                     name=self.name,
                     type_name="Dataslice",
                     event_type=mlpb.Event.Type.OUTPUT,
+                    properties={
+                        "git_repo": props.get("git_repo", ""),
+                        "Commit": props.get("Commit", ""),
+                        "url": props.get("url", " "),
+                    },
+                    artifact_type_properties={
+                        "git_repo": mlpb.STRING,
+                        "Commit": mlpb.STRING,
+                        "url": mlpb.STRING,
+                    },
                     custom_properties=custom_properties,
                     milliseconds_since_epoch=int(time.time() * 1000),
                 )
4 changes: 2 additions & 2 deletions cmflib/cmf_merger.py
@@ -108,9 +108,9 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id):
                 cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props)
             elif artifact_type == "Dataslice":
                 dataslice = cmf_class.create_dataslice(event["artifact"]["name"])
-                dataslice.commit_existing(uri, custom_props)
+                dataslice.commit_existing(uri, props, custom_props)
             elif artifact_type == "Step_Metrics":
-                cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props)
+                cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, props, custom_props)
             else:
                 pass
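For context, `props` here is presumably assembled from the incoming artifact record earlier in `parse_json_to_mlmd` (that code is outside this hunk). A sketch of the assumed shape, with key names taken from the new `properties` blocks above:

```python
# Assumed shape of the dispatch inputs above; the JSON layout is illustrative.
props = {
    "git_repo": event["artifact"]["properties"].get("git_repo", ""),
    "Commit": event["artifact"]["properties"].get("Commit", ""),
    "url": event["artifact"]["properties"].get("url", ""),
}
custom_props = event["artifact"].get("custom_properties", {})
```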
4 changes: 2 additions & 2 deletions cmflib/commands/artifact/push.py
@@ -39,7 +39,7 @@ def run(self):
         cmf_config=CmfConfig.read_config(cmf_config_file)
         out_msg = check_minio_server(dvc_config_op)
         if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS":
-            return out_msg
+            return "MinioS3 server failed to start!!!"
         if dvc_config_op["core.remote"] == "osdf":
             #print("key_id="+cmf_config["osdf-key_id"])
             dynamic_password = generate_osdf_token(cmf_config["osdf-key_id"],cmf_config["osdf-key_path"],cmf_config["osdf-key_issuer"])
@@ -100,7 +100,7 @@ def run(self):
file_set = set(names)
result = dvc_push(list(file_set))
return result

def add_parser(subparsers, parent_parser):
HELP = "Push artifacts to the user configured artifact repo."
