diff --git a/cmflib/bin/cmf_dvc_ingest.py b/cmflib/bin/cmf_dvc_ingest.py
index 459c84ee..ff8bc3c1 100644
--- a/cmflib/bin/cmf_dvc_ingest.py
+++ b/cmflib/bin/cmf_dvc_ingest.py
@@ -148,7 +148,7 @@ def find_location(string, elements):
 
 pipeline_name = "Pipeline"+"-"+str(uuid_) if not pipeline_name else pipeline_name
 
-metawriter = cmf.Cmf(filename="mlmd", pipeline_name=pipeline_name, graph=True)
+metawriter = cmf.Cmf(filepath="mlmd", pipeline_name=pipeline_name, graph=True)
 
 """
 Parse the dvc.lock dictionary and get the command section
diff --git a/cmflib/cmf.py b/cmflib/cmf.py
index 18e7943f..4b795bb5 100644
--- a/cmflib/cmf.py
+++ b/cmflib/cmf.py
@@ -53,7 +53,7 @@
     link_execution_to_input_artifact,
 )
 from cmflib.utils.cmf_config import CmfConfig
-from cmflib.utils.helper_functions import get_python_env
+from cmflib.utils.helper_functions import get_python_env, change_dir
 from cmflib.cmf_commands_wrapper import (
     _metadata_push,
     _metadata_pull,
@@ -75,14 +75,14 @@ class Cmf:
     The user has to provide the name of the pipeline, that needs to be recorded with CMF.
     ```python
     cmflib.cmf.Cmf(
-        filename="mlmd",
+        filepath="mlmd",
         pipeline_name="test_pipeline",
         custom_properties={"owner": "user_a"},
         graph=False
     )
     ```
     Args:
-        filename: Path to the sqlite file to store the metadata
+        filepath: Path to the sqlite file to store the metadata
         pipeline_name: Name to uniquely identify the pipeline.
             Note that name is the unique identifier for a pipeline.
             If a pipeline already exist with the same name, the existing pipeline object is reused.
@@ -108,12 +108,18 @@ class Cmf:
 
     def __init__(
         self,
-        filename: str = "mlmd",
+        filepath: str = "mlmd",
         pipeline_name: str = "",
         custom_properties: t.Optional[t.Dict] = None,
         graph: bool = False,
         is_server: bool = False,
     ):
+        # Path to the directory that holds the metadata file
+        self.cmf_init_path = filepath.rsplit("/", 1)[0] \
+            if len(filepath.rsplit("/", 1)) > 1 \
+            else os.getcwd()
+
+        logging_dir = change_dir(self.cmf_init_path)
         if is_server is False:
             Cmf.__prechecks()
         if custom_properties is None:
@@ -123,9 +129,9 @@ def __init__(
             cur_folder = os.path.basename(os.getcwd())
             pipeline_name = cur_folder
         config = mlpb.ConnectionConfig()
-        config.sqlite.filename_uri = filename
+        config.sqlite.filename_uri = filepath
         self.store = metadata_store.MetadataStore(config)
-        self.filename = filename
+        self.filepath = filepath
         self.child_context = None
         self.execution = None
         self.execution_name = ""
@@ -134,7 +140,8 @@ def __init__(
         self.input_artifacts = []
         self.execution_label_props = {}
         self.graph = graph
-        self.branch_name = filename.rsplit("/", 1)[-1]
+        # The last token of filepath doubles as the git branch name
+        self.branch_name = filepath.rsplit("/", 1)[-1]
 
         if is_server is False:
             git_checkout_new_branch(self.branch_name)
@@ -153,6 +160,7 @@ def __init__(
             self.driver.create_pipeline_node(
                 pipeline_name, self.parent_context.id, custom_properties
             )
+        os.chdir(logging_dir)
 
     @staticmethod
     def __load_neo4j_params():
@@ -231,7 +239,7 @@ def create_context(
         from cmflib.cmf import Cmf
         from ml_metadata.proto import metadata_store_pb2 as mlpb
         # Create CMF logger
-        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+        cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
         # Create context
         context: mlmd.proto.Context = cmf.create_context(
             pipeline_stage="prepare",
@@ -274,7 +282,7 @@ def merge_created_context(
         from cmflib.cmf import Cmf
         from ml_metadata.proto import metadata_store_pb2 as mlpb
         # Create CMF logger
-        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+        cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
         # Create context
         context: mlmd.proto.Context = cmf.merge_created_context(
             pipeline_stage="Test-env/prepare",
@@ -317,7 +325,7 @@ def create_execution(
         from cmflib.cmf import Cmf
         from ml_metadata.proto import metadata_store_pb2 as mlpb
         # Create CMF logger
-        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+        cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
         # Create or reuse context for this stage
         context: mlmd.proto.Context = cmf.create_context(
             pipeline_stage="prepare",
@@ -358,6 +366,8 @@ def create_execution(
         assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"
 
         # Initializing the execution related fields
+
+        logging_dir = change_dir(self.cmf_init_path)
         self.metrics = {}
         self.input_artifacts = []
         self.execution_label_props = {}
@@ -408,7 +418,7 @@ def create_execution(
             self.execution.id,
             custom_props,
         )
-
+        os.chdir(logging_dir)
         return self.execution
 
     def update_execution(
@@ -423,7 +433,7 @@ def update_execution(
         from cmflib.cmf import Cmf
         from ml_metadata.proto import metadata_store_pb2 as mlpb
         # Create CMF logger
-        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+        cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
         # Update a execution
         execution: mlmd.proto.Execution = cmf.update_execution(
             execution_id=8,
@@ -505,7 +515,7 @@ def merge_created_execution(
         from cmflib.cmf import Cmf
         from ml_metadata.proto import metadata_store_pb2 as mlpb
         # Create CMF logger
-        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
+        cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
         # Create or reuse context for this stage
         context: mlmd.proto.Context = cmf.merge_created_context(
             pipeline_stage="prepare",
@@ -655,6 +665,7 @@ def log_dataset(
         # If the dataset already exist , then we just link the existing dataset to the execution
        # We do not update the dataset properties .
         # We need to append the new properties to the existing dataset properties
+        logging_dir = change_dir(self.cmf_init_path)
         custom_props = {} if custom_properties is None else custom_properties
         git_repo = git_get_repo()
         name = re.split("/", url)[-1]
@@ -665,6 +676,11 @@ def log_dataset(
             commit_output(url, self.execution.id)
         c_hash = dvc_get_hash(url)
+
+        if c_hash == "":
+            print("Error in getting the dvc hash, returning without logging")
+            return None
+
         dataset_commit = c_hash
         dvc_url = dvc_get_url(url)
         dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
@@ -761,6 +777,7 @@ def log_dataset(
             self.driver.create_artifact_relationships(
                 self.input_artifacts, child_artifact, self.execution_label_props
             )
+        os.chdir(logging_dir)
         return artifact
 
     def update_dataset_url(self, artifact: mlpb.Artifact, updated_url: str):
@@ -1001,7 +1018,7 @@ def log_model(
         # If the model already exist , then we just link the existing model to the execution
         # We do not update the model properties .
         # We need to append the new properties to the existing model properties
-
+        logging_dir = change_dir(self.cmf_init_path)
         if custom_properties is None:
             custom_properties = {}
         custom_props = {} if custom_properties is None else custom_properties
@@ -1013,6 +1030,11 @@ def log_model(
             commit_output(path, self.execution.id)
         c_hash = dvc_get_hash(path)
+
+        if c_hash == "":
+            print("Error in getting the dvc hash, returning without logging")
+            return None
+
         model_commit = c_hash
 
         # If connecting to an existing artifact - The name of the artifact is
@@ -1114,7 +1136,7 @@ def log_model(
             self.driver.create_artifact_relationships(
                 self.input_artifacts, child_artifact, self.execution_label_props
             )
-
+        os.chdir(logging_dir)
         return artifact
 
     # Add the model to dvc do a git commit and store the commit id in MLMD
@@ -1357,7 +1379,6 @@ def log_execution_metrics(
         Returns:
             Artifact object from ML Metadata library associated with the new coarse-grained metrics artifact.
         """
-
         # Assigning current file name as stage and execution name
         current_script = sys.argv[0]
         file_name = os.path.basename(current_script)
@@ -1373,6 +1394,7 @@ def log_execution_metrics(
         assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"
 
+        logging_dir = change_dir(self.cmf_init_path)
         custom_props = {} if custom_properties is None else custom_properties
         uri = str(uuid.uuid1())
         metrics_name = metrics_name + ":" + uri + ":" + str(self.execution.id)
@@ -1412,6 +1434,7 @@ def log_execution_metrics(
             self.driver.create_artifact_relationships(
                 self.input_artifacts, child_artifact, self.execution_label_props
             )
+        os.chdir(logging_dir)
         return metrics
 
     def log_metric(
@@ -1455,12 +1478,17 @@ def commit_metrics(self, metrics_name: str):
         Returns:
             Artifact object from the ML Protocol Buffers library associated with the new metrics artifact.
""" + logging_dir = change_dir(self.cmf_init_path) metrics_df = pd.DataFrame.from_dict( self.metrics[metrics_name], orient="index") metrics_df.index.names = ["SequenceNumber"] metrics_df.to_parquet(metrics_name) commit_output(metrics_name, self.execution.id) uri = dvc_get_hash(metrics_name) + + if uri == "": + print("Error in getting the dvc hash,return without logging") + return null metrics_commit = uri name = ( metrics_name @@ -1505,6 +1533,8 @@ def commit_metrics(self, metrics_name: str): self.driver.create_artifact_relationships( self.input_artifacts, child_artifact, self.execution_label_props ) + + os.chdir(logging_dir) return metrics def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None): @@ -1574,7 +1604,7 @@ def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties def log_validation_output( self, version: str, custom_properties: t.Optional[t.Dict] = None - ) -> object: + ) -> object: uri = str(uuid.uuid1()) return create_new_artifact_event_and_attribution( store=self.store, @@ -1744,6 +1774,10 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: commit_output(self.name, self.writer.execution.id) c_hash = dvc_get_hash(self.name) + if c_hash == "": + print("Error in getting the dvc hash,return without logging") + return null + dataslice_commit = c_hash remote = dvc_get_url(self.name) if c_hash and c_hash.strip(): @@ -1828,7 +1862,7 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None # print(last) # os.symlink(str(index), slicedir + "/ " + last) -def metadata_push(pipeline_name,filename,execution_id: str = ""): +def metadata_push(pipeline_name,filepath,execution_id: str = ""): """ Pushes MLMD file to CMF-server. Example: ```python @@ -1836,18 +1870,18 @@ def metadata_push(pipeline_name,filename,execution_id: str = ""): ``` Args: pipeline_name: Name of the pipeline. - filename: Path to the MLMD file. + filepath: Path to the MLMD file. execution_id: Optional execution ID. Returns: Response output from the _metadata_push function. """ - # Required arguments: pipeline_name, filename (mlmd file path) + # Required arguments: pipeline_name, filepath (mlmd file path) #Optional arguments: Execution_ID - output = _metadata_push(pipeline_name,filename, execution_id) + output = _metadata_push(pipeline_name,filepath, execution_id) return output -def metadata_pull(pipeline_name,filename ="./mlmd", execution_id: str = ""): +def metadata_pull(pipeline_name,filepath ="./mlmd", execution_id: str = ""): """ Pulls MLMD file from CMF-server. Example: ```python @@ -1855,17 +1889,17 @@ def metadata_pull(pipeline_name,filename ="./mlmd", execution_id: str = ""): ``` Args: pipeline_name: Name of the pipeline. - filename: File path to store the MLMD file. + filepath: File path to store the MLMD file. execution_id: Optional execution ID. Returns: Message from the _metadata_pull function. """ - # Required arguments: pipeline_name, filename(file path to store mlmd file) + # Required arguments: pipeline_name, filepath(file path to store mlmd file) #Optional arguments: Execution_ID - output = _metadata_pull(pipeline_name,filename, execution_id) + output = _metadata_pull(pipeline_name,filepath, execution_id) return output -def artifact_pull(pipeline_name,filename="./mlmd"): +def artifact_pull(pipeline_name,filepath="./mlmd"): """ Pulls artifacts from the initialized repository. 
     Example:
@@ -1875,18 +1909,18 @@
     Args:
         pipeline_name: Name of the pipeline.
-        filename: Path to store artifacts.
+        filepath: Path to store artifacts.
     Returns:
         Output from the _artifact_pull function.
     """
     # Required arguments: Pipeline_name
-    # Optional arguments: filename( path to store artifacts)
-    output = _artifact_pull(pipeline_name,filename)
+    # Optional arguments: filepath (path to store artifacts)
+    output = _artifact_pull(pipeline_name, filepath)
     return output
 
-def artifact_pull_single(pipeline_name,filename,artifact_name):
+def artifact_pull_single(pipeline_name, filepath, artifact_name):
     """ Pulls a single artifact from the initialized repository.
     Example:
     ```python
@@ -1894,15 +1928,15 @@
     ```
     Args:
         pipeline_name: Name of the pipeline.
-        filename: Path to store the artifact.
+        filepath: Path to store the artifact.
         artifact_name: Name of the artifact.
     Returns:
         Output from the _artifact_pull_single function.
     """
     # Required arguments: Pipeline_name
-    # Optional arguments: filename( path to store artifacts)
-    output = _artifact_pull_single(pipeline_name,filename,artifact_name)
+    # Optional arguments: filepath (path to store artifacts)
+    output = _artifact_pull_single(pipeline_name, filepath, artifact_name)
     return output
 
 def artifact_push():
diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py
index 030de6bc..c8519673 100644
--- a/cmflib/cmf_merger.py
+++ b/cmflib/cmf_merger.py
@@ -37,7 +37,7 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id):
         graph = False
         if os.getenv('NEO4J_URI', "") != "":
             graph = True
-        cmf_class = cmf.Cmf(filename=path_to_store, pipeline_name=pipeline_name,
+        cmf_class = cmf.Cmf(filepath=path_to_store, pipeline_name=pipeline_name,
                             graph=graph, is_server=True)
         for stage in data["Pipeline"][0]["stages"]:  # Iterates over all the stages
             if exec_id is None:
diff --git a/cmflib/utils/helper_functions.py b/cmflib/utils/helper_functions.py
index 3e5012ab..16872721 100644
--- a/cmflib/utils/helper_functions.py
+++ b/cmflib/utils/helper_functions.py
@@ -54,6 +54,12 @@ def get_python_env()-> str:
             print("Pip is not installed.")
     return packages
 
+def change_dir(cmf_init_path):
+    logging_dir = os.getcwd()
+    if logging_dir != cmf_init_path:
+        os.chdir(cmf_init_path)
+    return logging_dir
+
 def is_conda_installed():
     try:
         import conda
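
The thread running through these changes is a save/restore pattern around the current working directory: `Cmf.__init__` derives `cmf_init_path` from the directory portion of `filepath`, each logging method first calls `change_dir` to switch there (so the underlying git/DVC commands run where `cmf init` was performed), and an `os.chdir(logging_dir)` before each return puts the caller back where it started. Below is a minimal sketch of that pattern using the `change_dir` helper from this diff; `do_logging_work` is a hypothetical stand-in for a method body such as `log_dataset`, not part of the library.

```python
import os

def change_dir(cmf_init_path):
    # Remember the caller's directory, then move to the cmf init path.
    logging_dir = os.getcwd()
    if logging_dir != cmf_init_path:
        os.chdir(cmf_init_path)
    return logging_dir

def do_logging_work(filepath):
    # Same derivation as Cmf.__init__: directory part of filepath, else cwd.
    parts = filepath.rsplit("/", 1)
    cmf_init_path = parts[0] if len(parts) > 1 else os.getcwd()

    logging_dir = change_dir(cmf_init_path)  # switch to the metadata directory
    try:
        pass  # run dvc/git commands and MLMD writes relative to cmf_init_path
    finally:
        os.chdir(logging_dir)  # restore the caller's working directory

if __name__ == "__main__":
    do_logging_work("mlmd")        # no directory part -> stays in the cwd
    # do_logging_work("runs/mlmd") # would switch into "runs/" if it exists
```

Note that the methods in the diff restore the directory with a bare `os.chdir(logging_dir)` rather than a `try/finally`, so an exception, or the new early `return None` on a missing DVC hash, leaves the process in `cmf_init_path`; the `finally` above is used only to make the pairing explicit.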