Adding cmf init path (#177)
* adding cmf_init_path

* update

* revert docker compose file

* removed debugging lines

* update

---------

Co-authored-by: Abhinav Chobey <[email protected]>
abhinavchobey and Abhinav Chobey authored May 30, 2024
1 parent 8ecdd7e commit c56ed92
Showing 4 changed files with 75 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmflib/bin/cmf_dvc_ingest.py
@@ -148,7 +148,7 @@ def find_location(string, elements):


pipeline_name = "Pipeline"+"-"+str(uuid_) if not pipeline_name else pipeline_name
-metawriter = cmf.Cmf(filename="mlmd", pipeline_name=pipeline_name, graph=True)
+metawriter = cmf.Cmf(filepath = "mlmd", pipeline_name=pipeline_name, graph=True)

"""
Parse the dvc.lock dictionary and get the command section
100 changes: 67 additions & 33 deletions cmflib/cmf.py
@@ -53,7 +53,7 @@
link_execution_to_input_artifact,
)
from cmflib.utils.cmf_config import CmfConfig
-from cmflib.utils.helper_functions import get_python_env
+from cmflib.utils.helper_functions import get_python_env, change_dir
from cmflib.cmf_commands_wrapper import (
_metadata_push,
_metadata_pull,
@@ -75,14 +75,14 @@ class Cmf:
The user has to provide the name of the pipeline, that needs to be recorded with CMF.
```python
cmflib.cmf.Cmf(
-filename="mlmd",
+filepath="mlmd",
pipeline_name="test_pipeline",
custom_properties={"owner": "user_a"},
graph=False
)
```
Args:
-filename: Path to the sqlite file to store the metadata
+filepath: Path to the sqlite file to store the metadata
pipeline_name: Name to uniquely identify the pipeline.
Note that name is the unique identifier for a pipeline.
If a pipeline already exist with the same name, the existing pipeline object is reused.
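With the rename, the path argument now also tells CMF which directory to log from. A minimal sketch of the updated constructor call, assuming an initialized cmf repo (the nested path "artifacts/mlmd" is hypothetical):

```python
from cmflib.cmf import Cmf

# CMF derives its init path ("artifacts") from the filepath and
# temporarily changes into that directory while logging.
metawriter = Cmf(
    filepath="artifacts/mlmd",
    pipeline_name="test_pipeline",
    custom_properties={"owner": "user_a"},
    graph=False,
)
```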
@@ -108,12 +108,18 @@ class Cmf:

def __init__(
self,
-filename: str = "mlmd",
+filepath: str = "mlmd",
pipeline_name: str = "",
custom_properties: t.Optional[t.Dict] = None,
graph: bool = False,
is_server: bool = False,
):
+#path to directory
+self.cmf_init_path = filepath.rsplit("/",1)[0] \
+    if len(filepath.rsplit("/",1)) > 1 \
+    else os.getcwd()
+
+logging_dir = change_dir(self.cmf_init_path)
if is_server is False:
Cmf.__prechecks()
if custom_properties is None:
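The init path is simply everything before the last "/" in filepath, with the current working directory as the fallback for a bare file name. A standalone sketch of that derivation (derive_init_path is a hypothetical name, not part of the commit):

```python
import os

def derive_init_path(filepath: str) -> str:
    # "server/store/mlmd" -> "server/store"; "mlmd" -> os.getcwd()
    # (an absolute path like "/mlmd" would yield "", a quirk of rsplit)
    parts = filepath.rsplit("/", 1)
    return parts[0] if len(parts) > 1 else os.getcwd()

assert derive_init_path("server/store/mlmd") == "server/store"
assert derive_init_path("mlmd") == os.getcwd()
```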
@@ -123,9 +129,9 @@ def __init__(
cur_folder = os.path.basename(os.getcwd())
pipeline_name = cur_folder
config = mlpb.ConnectionConfig()
-config.sqlite.filename_uri = filename
+config.sqlite.filename_uri = filepath
self.store = metadata_store.MetadataStore(config)
-self.filename = filename
+self.filepath = filepath
self.child_context = None
self.execution = None
self.execution_name = ""
@@ -134,7 +140,8 @@ def __init__(
self.input_artifacts = []
self.execution_label_props = {}
self.graph = graph
-self.branch_name = filename.rsplit("/", 1)[-1]
+#last token in filepath
+self.branch_name = filepath.rsplit("/", 1)[-1]

if is_server is False:
git_checkout_new_branch(self.branch_name)
@@ -153,6 +160,7 @@ def __init__(
self.driver.create_pipeline_node(
pipeline_name, self.parent_context.id, custom_properties
)
+os.chdir(logging_dir)

@staticmethod
def __load_neo4j_params():
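Taken together, __init__ now saves the caller's working directory via change_dir, runs against the cmf init path, and restores the old directory with os.chdir(logging_dir) once the pipeline context is created. A sketch of the same save/switch/restore shape as a context manager (not part of this commit; an equivalent that also restores the directory if logging raises):

```python
import os
from contextlib import contextmanager

@contextmanager
def working_dir(path: str):
    prev = os.getcwd()       # remember where the caller was
    if prev != path:
        os.chdir(path)       # switch to the cmf init path
    try:
        yield
    finally:
        os.chdir(prev)       # restore even on exceptions
```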
@@ -231,7 +239,7 @@ def create_context(
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
-cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create context
context: mlmd.proto.Context = cmf.create_context(
pipeline_stage="prepare",
@@ -274,7 +282,7 @@ def merge_created_context(
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
-cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create context
context: mlmd.proto.Context = cmf.merge_created_context(
pipeline_stage="Test-env/prepare",
@@ -317,7 +325,7 @@ def create_execution(
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
-cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create or reuse context for this stage
context: mlmd.proto.Context = cmf.create_context(
pipeline_stage="prepare",
@@ -358,6 +366,8 @@ def create_execution(
assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"

# Initializing the execution related fields
+
+logging_dir = change_dir(self.cmf_init_path)
self.metrics = {}
self.input_artifacts = []
self.execution_label_props = {}
@@ -408,7 +418,7 @@ def create_execution(
self.execution.id,
custom_props,
)
-
+os.chdir(logging_dir)
return self.execution

def update_execution(
@@ -423,7 +433,7 @@ def update_execution(
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
-cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Update a execution
execution: mlmd.proto.Execution = cmf.update_execution(
execution_id=8,
@@ -505,7 +515,7 @@ def merge_created_execution(
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
-cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create or reuse context for this stage
context: mlmd.proto.Context = cmf.merge_created_context(
pipeline_stage="prepare",
@@ -655,6 +665,7 @@ def log_dataset(
# If the dataset already exist , then we just link the existing dataset to the execution
# We do not update the dataset properties .
# We need to append the new properties to the existing dataset properties
+logging_dir = change_dir(self.cmf_init_path)
custom_props = {} if custom_properties is None else custom_properties
git_repo = git_get_repo()
name = re.split("/", url)[-1]
@@ -665,6 +676,11 @@

commit_output(url, self.execution.id)
c_hash = dvc_get_hash(url)
+
+if c_hash == "":
+    print("Error in getting the dvc hash, returning without logging")
+    return None
+
dataset_commit = c_hash
dvc_url = dvc_get_url(url)
dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
@@ -761,6 +777,7 @@ def log_dataset(
self.driver.create_artifact_relationships(
self.input_artifacts, child_artifact, self.execution_label_props
)
+os.chdir(logging_dir)
return artifact

def update_dataset_url(self, artifact: mlpb.Artifact, updated_url: str):
@@ -1001,7 +1018,7 @@ def log_model(
# If the model already exist , then we just link the existing model to the execution
# We do not update the model properties .
# We need to append the new properties to the existing model properties
-
+logging_dir = change_dir(self.cmf_init_path)
if custom_properties is None:
custom_properties = {}
custom_props = {} if custom_properties is None else custom_properties
@@ -1013,6 +1030,11 @@

commit_output(path, self.execution.id)
c_hash = dvc_get_hash(path)
+
+if c_hash == "":
+    print("Error in getting the dvc hash, returning without logging")
+    return None
+
model_commit = c_hash

# If connecting to an existing artifact - The name of the artifact is
@@ -1114,7 +1136,7 @@ def log_model(
self.driver.create_artifact_relationships(
self.input_artifacts, child_artifact, self.execution_label_props
)
-
+os.chdir(logging_dir)
return artifact

# Add the model to dvc do a git commit and store the commit id in MLMD
@@ -1357,7 +1379,6 @@ def log_execution_metrics(
Returns:
Artifact object from ML Metadata library associated with the new coarse-grained metrics artifact.
"""

# Assigning current file name as stage and execution name
current_script = sys.argv[0]
file_name = os.path.basename(current_script)
@@ -1373,6 +1394,7 @@ def log_execution_metrics(
assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"


+logging_dir = change_dir(self.cmf_init_path)
custom_props = {} if custom_properties is None else custom_properties
uri = str(uuid.uuid1())
metrics_name = metrics_name + ":" + uri + ":" + str(self.execution.id)
@@ -1412,6 +1434,7 @@ def log_execution_metrics(
self.driver.create_artifact_relationships(
self.input_artifacts, child_artifact, self.execution_label_props
)
+os.chdir(logging_dir)
return metrics

def log_metric(
Expand Down Expand Up @@ -1455,12 +1478,17 @@ def commit_metrics(self, metrics_name: str):
Returns:
Artifact object from the ML Protocol Buffers library associated with the new metrics artifact.
"""
+logging_dir = change_dir(self.cmf_init_path)
metrics_df = pd.DataFrame.from_dict(
self.metrics[metrics_name], orient="index")
metrics_df.index.names = ["SequenceNumber"]
metrics_df.to_parquet(metrics_name)
commit_output(metrics_name, self.execution.id)
uri = dvc_get_hash(metrics_name)
+
+if uri == "":
+    print("Error in getting the dvc hash, returning without logging")
+    return None
metrics_commit = uri
name = (
metrics_name
@@ -1505,6 +1533,8 @@ def commit_metrics(self, metrics_name: str):
self.driver.create_artifact_relationships(
self.input_artifacts, child_artifact, self.execution_label_props
)
+
+os.chdir(logging_dir)
return metrics

def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
@@ -1574,7 +1604,7 @@ def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties

def log_validation_output(
self, version: str, custom_properties: t.Optional[t.Dict] = None
-) -> object:
+) -> object:
uri = str(uuid.uuid1())
return create_new_artifact_event_and_attribution(
store=self.store,
@@ -1744,6 +1774,10 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:

commit_output(self.name, self.writer.execution.id)
c_hash = dvc_get_hash(self.name)
+if c_hash == "":
+    print("Error in getting the dvc hash, returning without logging")
+    return None
+
dataslice_commit = c_hash
remote = dvc_get_url(self.name)
if c_hash and c_hash.strip():
@@ -1828,44 +1862,44 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None
# print(last)
# os.symlink(str(index), slicedir + "/ " + last)

-def metadata_push(pipeline_name,filename,execution_id: str = ""):
+def metadata_push(pipeline_name,filepath,execution_id: str = ""):
""" Pushes MLMD file to CMF-server.
Example:
```python
result = metadata_push("example_pipeline", "mlmd_file", "3")
```
Args:
pipeline_name: Name of the pipeline.
-filename: Path to the MLMD file.
+filepath: Path to the MLMD file.
execution_id: Optional execution ID.
Returns:
Response output from the _metadata_push function.
"""
-# Required arguments: pipeline_name, filename (mlmd file path)
+# Required arguments: pipeline_name, filepath (mlmd file path)
#Optional arguments: Execution_ID
-output = _metadata_push(pipeline_name,filename, execution_id)
+output = _metadata_push(pipeline_name,filepath, execution_id)
return output

-def metadata_pull(pipeline_name,filename ="./mlmd", execution_id: str = ""):
+def metadata_pull(pipeline_name,filepath ="./mlmd", execution_id: str = ""):
""" Pulls MLMD file from CMF-server.
Example:
```python
result = metadata_pull("example_pipeline", "./mlmd_directory", "execution_123")
```
Args:
pipeline_name: Name of the pipeline.
-filename: File path to store the MLMD file.
+filepath: File path to store the MLMD file.
execution_id: Optional execution ID.
Returns:
Message from the _metadata_pull function.
"""
-# Required arguments: pipeline_name, filename(file path to store mlmd file)
+# Required arguments: pipeline_name, filepath(file path to store mlmd file)
#Optional arguments: Execution_ID
-output = _metadata_pull(pipeline_name,filename, execution_id)
+output = _metadata_pull(pipeline_name,filepath, execution_id)
return output

-def artifact_pull(pipeline_name,filename="./mlmd"):
+def artifact_pull(pipeline_name,filepath="./mlmd"):
""" Pulls artifacts from the initialized repository.
Example:
@@ -1875,34 +1909,34 @@ def artifact_pull(pipeline_name,filename="./mlmd"):
Args:
pipeline_name: Name of the pipeline.
-filename: Path to store artifacts.
+filepath: Path to store artifacts.
Returns:
Output from the _artifact_pull function.
"""

# Required arguments: Pipeline_name
-# Optional arguments: filename( path to store artifacts)
-output = _artifact_pull(pipeline_name,filename)
+# Optional arguments: filepath( path to store artifacts)
+output = _artifact_pull(pipeline_name,filepath)
return output

-def artifact_pull_single(pipeline_name,filename,artifact_name):
+def artifact_pull_single(pipeline_name,filepath,artifact_name):
""" Pulls a single artifact from the initialized repository.
Example:
```python
result = artifact_pull_single("example_pipeline", "./mlmd_directory", "example_artifact")
```
Args:
pipeline_name: Name of the pipeline.
-filename: Path to store the artifact.
+filepath: Path to store the artifact.
artifact_name: Name of the artifact.
Returns:
Output from the _artifact_pull_single function.
"""

# Required arguments: Pipeline_name
-# Optional arguments: filename( path to store artifacts)
-output = _artifact_pull_single(pipeline_name,filename,artifact_name)
+# Optional arguments: filepath( path to store artifacts)
+output = _artifact_pull_single(pipeline_name,filepath,artifact_name)
return output

def artifact_push():
2 changes: 1 addition & 1 deletion cmflib/cmf_merger.py
@@ -37,7 +37,7 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id):
graph = False
if os.getenv('NEO4J_URI', "") != "":
graph = True
-cmf_class = cmf.Cmf(filename=path_to_store, pipeline_name=pipeline_name,
+cmf_class = cmf.Cmf(filepath=path_to_store, pipeline_name=pipeline_name,
graph=graph, is_server=True)
for stage in data["Pipeline"][0]["stages"]: # Iterates over all the stages
if exec_id is None:
6 changes: 6 additions & 0 deletions cmflib/utils/helper_functions.py
@@ -54,6 +54,12 @@ def get_python_env()-> str:
print("Pip is not installed.")
return packages

+def change_dir(cmf_init_path):
+    logging_dir = os.getcwd()
+    if not logging_dir == cmf_init_path:
+        os.chdir(cmf_init_path)
+    return logging_dir
+
def is_conda_installed():
try:
import conda
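The helper itself just records the current directory, switches only when needed, and returns the previous location so the caller can undo the move. A quick usage sketch (the path is hypothetical):

```python
import os
from cmflib.utils.helper_functions import change_dir

prev = change_dir("/tmp/my_project")   # cwd is now /tmp/my_project
try:
    ...  # log metadata relative to the cmf init path
finally:
    os.chdir(prev)                     # back to where we started
```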
